summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/transport
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java44
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java346
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java158
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java678
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java1244
5 files changed, 2470 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
new file mode 100644
index 0000000000..3ca22b60c8
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.NetworkDriver;
+
+public class QpidAcceptor
+{
+ NetworkDriver _driver;
+ String _protocol;
+ public QpidAcceptor(NetworkDriver driver, String protocol)
+ {
+ _driver = driver;
+ _protocol = protocol;
+ }
+
+ public NetworkDriver getNetworkDriver()
+ {
+ return _driver;
+ }
+
+ public String toString()
+ {
+ return _protocol;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
new file mode 100644
index 0000000000..54cd709af3
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -0,0 +1,346 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*;
+
+import java.text.MessageFormat;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.configuration.ConnectionConfig;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.GenericActor;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionCloseCode;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.transport.ExecutionException;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.Session;
+
+public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject
+{
+ private ConnectionConfig _config;
+ private Runnable _onOpenTask;
+ private AtomicBoolean _logClosed = new AtomicBoolean(false);
+ private LogActor _actor = GenericActor.getInstance(this);
+
+ private ApplicationRegistry _registry;
+ private boolean _statisticsEnabled = false;
+ private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+
+ public ServerConnection()
+ {
+
+ }
+
+ public UUID getId()
+ {
+ return _config.getId();
+ }
+
+ @Override
+ protected void invoke(Method method)
+ {
+ super.invoke(method);
+ }
+
+ @Override
+ protected void setState(State state)
+ {
+ super.setState(state);
+
+ if (state == State.OPEN)
+ {
+ if (_onOpenTask != null)
+ {
+ _onOpenTask.run();
+ }
+ _actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", true, true));
+
+ getVirtualHost().getConnectionRegistry().registerConnection(this);
+ }
+
+ if (state == State.CLOSE_RCVD || state == State.CLOSED || state == State.CLOSING)
+ {
+ if(_virtualHost != null)
+ {
+ _virtualHost.getConnectionRegistry().deregisterConnection(this);
+ }
+ }
+
+ if (state == State.CLOSED)
+ {
+ logClosed();
+ }
+ }
+
+ protected void logClosed()
+ {
+ if(_logClosed.compareAndSet(false, true))
+ {
+ CurrentActor.get().message(this, ConnectionMessages.CLOSE());
+ }
+ }
+
+ @Override
+ public ServerConnectionDelegate getConnectionDelegate()
+ {
+ return (ServerConnectionDelegate) super.getConnectionDelegate();
+ }
+
+ public void setConnectionDelegate(ServerConnectionDelegate delegate)
+ {
+ super.setConnectionDelegate(delegate);
+ }
+
+ private VirtualHost _virtualHost;
+
+
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public void setVirtualHost(VirtualHost virtualHost)
+ {
+ _virtualHost = virtualHost;
+
+ initialiseStatistics();
+ }
+
+ public void setConnectionConfig(final ConnectionConfig config)
+ {
+ _config = config;
+ }
+
+ public ConnectionConfig getConfig()
+ {
+ return _config;
+ }
+
+ public void onOpen(final Runnable task)
+ {
+ _onOpenTask = task;
+ }
+
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+ {
+ ExecutionException ex = new ExecutionException();
+ ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR;
+ try
+ {
+ code = ExecutionErrorCode.get(cause.getCode());
+ }
+ catch (IllegalArgumentException iae)
+ {
+ // Ignore, already set to INTERNAL_ERROR
+ }
+ ex.setErrorCode(code);
+ ex.setDescription(message);
+ ((ServerSession)session).invoke(ex);
+
+ ((ServerSession)session).close();
+ }
+
+ public LogSubject getLogSubject()
+ {
+ return (LogSubject) this;
+ }
+
+ @Override
+ public void received(ProtocolEvent event)
+ {
+ if (event.isConnectionControl())
+ {
+ CurrentActor.set(_actor);
+ }
+ else
+ {
+ ServerSession channel = (ServerSession) getSession(event.getChannel());
+ LogActor channelActor = null;
+
+ if (channel != null)
+ {
+ channelActor = channel.getLogActor();
+ }
+
+ CurrentActor.set(channelActor == null ? _actor : channelActor);
+ }
+
+ try
+ {
+ super.received(event);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+ }
+
+ public String toLogString()
+ {
+ boolean hasVirtualHost = (null != this.getVirtualHost());
+ boolean hasPrincipal = (null != getAuthorizationID());
+
+ if (hasPrincipal && hasVirtualHost)
+ {
+ return "[" +
+ MessageFormat.format(CONNECTION_FORMAT,
+ getConnectionId(),
+ getClientId(),
+ getConfig().getAddress(),
+ getVirtualHost().getName())
+ + "] ";
+ }
+ else if (hasPrincipal)
+ {
+ return "[" +
+ MessageFormat.format(USER_FORMAT,
+ getConnectionId(),
+ getClientId(),
+ getConfig().getAddress())
+ + "] ";
+
+ }
+ else
+ {
+ return "[" +
+ MessageFormat.format(SOCKET_FORMAT,
+ getConnectionId(),
+ getConfig().getAddress())
+ + "] ";
+ }
+ }
+
+ public LogActor getLogActor()
+ {
+ return _actor;
+ }
+
+ @Override
+ public void close(AMQConstant cause, String message) throws AMQException
+ {
+ ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
+ try
+ {
+ replyCode = ConnectionCloseCode.get(cause.getCode());
+ }
+ catch (IllegalArgumentException iae)
+ {
+ // Ignore
+ }
+ close(replyCode, message);
+ }
+
+ @Override
+ public List<AMQSessionModel> getSessionModels()
+ {
+ List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
+ for (Session ssn : getChannels())
+ {
+ sessions.add((AMQSessionModel) ssn);
+ }
+ return sessions;
+ }
+
+ public void registerMessageDelivered(long messageSize)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesDelivered.registerEvent(1L);
+ _dataDelivered.registerEvent(messageSize);
+ }
+ _virtualHost.registerMessageDelivered(messageSize);
+ }
+
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesReceived.registerEvent(1L, timestamp);
+ _dataReceived.registerEvent(messageSize, timestamp);
+ }
+ _virtualHost.registerMessageReceived(messageSize, timestamp);
+ }
+
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messagesReceived;
+ }
+
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceived;
+ }
+
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messagesDelivered;
+ }
+
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDelivered;
+ }
+
+ public void resetStatistics()
+ {
+ _messagesDelivered.reset();
+ _dataDelivered.reset();
+ _messagesReceived.reset();
+ _dataReceived.reset();
+ }
+
+ public void initialiseStatistics()
+ {
+ setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+ _virtualHost.getApplicationRegistry().getConfiguration().isStatisticsGenerationConnectionsEnabled());
+
+ _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
+ _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
+ _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
+ _dataReceived = new StatisticsCounter("data-received-" + getConnectionId());
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _statisticsEnabled = enabled;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
new file mode 100644
index 0000000000..174dcbfa69
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
@@ -0,0 +1,158 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.*;
+
+public class ServerConnectionDelegate extends ServerDelegate
+{
+ private String _localFQDN;
+ private final IApplicationRegistry _appRegistry;
+
+ public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN)
+ {
+ this(new HashMap<String,Object>(Collections.singletonMap("qpid.federation_tag",appRegistry.getBroker().getFederationTag())), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
+ }
+
+
+ public ServerConnectionDelegate(Map<String, Object> properties,
+ List<Object> locales,
+ IApplicationRegistry appRegistry,
+ String localFQDN)
+ {
+ super(properties, parseToList(appRegistry.getAuthenticationManager().getMechanisms()), locales);
+
+ _appRegistry = appRegistry;
+ _localFQDN = localFQDN;
+ }
+
+ private static List<Object> parseToList(String mechanisms)
+ {
+ List<Object> list = new ArrayList<Object>();
+ StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
+ while(tokenizer.hasMoreTokens())
+ {
+ list.add(tokenizer.nextToken());
+ }
+ return list;
+ }
+
+ @Override
+ public ServerSession getSession(Connection conn, SessionAttach atc)
+ {
+ SessionDelegate serverSessionDelegate = new ServerSessionDelegate(_appRegistry);
+
+ ServerSession ssn = new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0);
+
+ return ssn;
+ }
+
+ @Override
+ protected SaslServer createSaslServer(String mechanism) throws SaslException
+ {
+ return _appRegistry.getAuthenticationManager().createSaslServer(mechanism, _localFQDN);
+
+ }
+
+ @Override
+ public void connectionClose(Connection conn, ConnectionClose close)
+ {
+ try
+ {
+ ((ServerConnection) conn).logClosed();
+ }
+ finally
+ {
+ super.connectionClose(conn, close);
+ }
+
+ }
+
+ @Override
+ public void connectionOpen(Connection conn, ConnectionOpen open)
+ {
+ ServerConnection sconn = (ServerConnection) conn;
+
+ VirtualHost vhost;
+ String vhostName;
+ if(open.hasVirtualHost())
+ {
+ vhostName = open.getVirtualHost();
+ }
+ else
+ {
+ vhostName = "";
+ }
+ vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName);
+
+ SecurityManager.setThreadPrincipal(conn.getAuthorizationID());
+
+ if(vhost != null)
+ {
+ sconn.setVirtualHost(vhost);
+
+ if (!vhost.getSecurityManager().accessVirtualhost(vhostName, ((ProtocolEngine) sconn.getConfig()).getRemoteAddress()))
+ {
+ sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Permission denied '"+vhostName+"'"));
+ sconn.setState(Connection.State.CLOSING);
+ }
+ else
+ {
+ sconn.invoke(new ConnectionOpenOk(Collections.emptyList()));
+ sconn.setState(Connection.State.OPEN);
+ }
+ }
+ else
+ {
+ sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'"));
+ sconn.setState(Connection.State.CLOSING);
+ }
+
+ }
+
+ @Override
+ protected int getHeartbeatMax()
+ {
+ //TODO: implement broker support for actually sending heartbeats
+ return 0;
+ }
+
+ @Override
+ protected int getChannelMax()
+ {
+ return ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
new file mode 100644
index 0000000000..60c94b43c0
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -0,0 +1,678 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*;
+import static org.apache.qpid.util.Serial.*;
+
+import java.lang.ref.WeakReference;
+import java.security.Principal;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.ConnectionConfig;
+import org.apache.qpid.server.configuration.SessionConfig;
+import org.apache.qpid.server.configuration.SessionConfigType;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.GenericActor;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionDelegate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sun.security.auth.UserPrincipal;
+
+public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel, LogSubject
+{
+ private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
+
+ private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
+
+ private final UUID _id;
+ private ConnectionConfig _connectionConfig;
+ private long _createTime = System.currentTimeMillis();
+ private LogActor _actor = GenericActor.getInstance(this);
+
+ public static interface MessageDispositionChangeListener
+ {
+ public void onAccept();
+
+ public void onRelease();
+
+ public void onReject();
+
+ public boolean acquire();
+
+
+ }
+
+ public static interface Task
+ {
+ public void doTask(ServerSession session);
+ }
+
+
+ private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
+ new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
+
+ private ServerTransaction _transaction;
+
+ private final AtomicLong _txnStarts = new AtomicLong(0);
+ private final AtomicLong _txnCommits = new AtomicLong(0);
+ private final AtomicLong _txnRejects = new AtomicLong(0);
+ private final AtomicLong _txnCount = new AtomicLong(0);
+ private final AtomicLong _txnUpdateTime = new AtomicLong(0);
+
+ private Principal _principal;
+
+ private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
+
+ private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+
+ private final WeakReference<Session> _reference;
+
+ ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
+ {
+ this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
+ }
+
+ protected void setState(State state)
+ {
+ super.setState(state);
+
+ if (state == State.OPEN)
+ {
+ _actor.message(ChannelMessages.CREATE());
+ }
+ }
+
+ public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig)
+ {
+ super(connection, delegate, name, expiry);
+ _connectionConfig = connConfig;
+ _transaction = new AutoCommitTransaction(this.getMessageStore());
+ _principal = new UserPrincipal(connection.getAuthorizationID());
+ _reference = new WeakReference<Session>(this);
+ _id = getConfigStore().createId();
+ getConfigStore().addConfiguredObject(this);
+ }
+
+ private ConfigStore getConfigStore()
+ {
+ return getConnectionConfig().getConfigStore();
+ }
+
+
+ @Override
+ protected boolean isFull(int id)
+ {
+ return isCommandsFull(id);
+ }
+
+ public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues)
+ {
+ getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
+ _transaction.enqueue(queues,message, new ServerTransaction.Action()
+ {
+
+ BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
+
+ public void postCommit()
+ {
+ for(int i = 0; i < _queues.length; i++)
+ {
+ try
+ {
+ _queues[i].enqueue(message);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public void onRollback()
+ {
+ // NO-OP
+ }
+ });
+
+ incrementOutstandingTxnsIfNecessary();
+ updateTransactionalActivity();
+ }
+
+
+ public void sendMessage(MessageTransfer xfr,
+ Runnable postIdSettingAction)
+ {
+ invoke(xfr, postIdSettingAction);
+ getConnectionModel().registerMessageDelivered(xfr.getBodySize());
+ }
+
+ public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener)
+ {
+ _messageDispositionListenerMap.put(xfr.getId(), acceptListener);
+ }
+
+
+ private static interface MessageDispositionAction
+ {
+ void performAction(MessageDispositionChangeListener listener);
+ }
+
+ public void accept(RangeSet ranges)
+ {
+ dispositionChange(ranges, new MessageDispositionAction()
+ {
+ public void performAction(MessageDispositionChangeListener listener)
+ {
+ listener.onAccept();
+ }
+ });
+ }
+
+
+ public void release(RangeSet ranges)
+ {
+ dispositionChange(ranges, new MessageDispositionAction()
+ {
+ public void performAction(MessageDispositionChangeListener listener)
+ {
+ listener.onRelease();
+ }
+ });
+ }
+
+ public void reject(RangeSet ranges)
+ {
+ dispositionChange(ranges, new MessageDispositionAction()
+ {
+ public void performAction(MessageDispositionChangeListener listener)
+ {
+ listener.onReject();
+ }
+ });
+ }
+
+ public RangeSet acquire(RangeSet transfers)
+ {
+ RangeSet acquired = new RangeSet();
+
+ if(!_messageDispositionListenerMap.isEmpty())
+ {
+ Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
+ Iterator<Range> rangeIter = transfers.iterator();
+
+ if(rangeIter.hasNext())
+ {
+ Range range = rangeIter.next();
+
+ while(range != null && unacceptedMessages.hasNext())
+ {
+ int next = unacceptedMessages.next();
+ while(gt(next, range.getUpper()))
+ {
+ if(rangeIter.hasNext())
+ {
+ range = rangeIter.next();
+ }
+ else
+ {
+ range = null;
+ break;
+ }
+ }
+ if(range != null && range.includes(next))
+ {
+ MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.get(next);
+ if(changeListener != null && changeListener.acquire())
+ {
+ acquired.add(next);
+ }
+ }
+
+
+ }
+
+ }
+
+
+ }
+
+ return acquired;
+ }
+
+ public void dispositionChange(RangeSet ranges, MessageDispositionAction action)
+ {
+ if(ranges != null && !_messageDispositionListenerMap.isEmpty())
+ {
+ Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
+ Iterator<Range> rangeIter = ranges.iterator();
+
+ if(rangeIter.hasNext())
+ {
+ Range range = rangeIter.next();
+
+ while(range != null && unacceptedMessages.hasNext())
+ {
+ int next = unacceptedMessages.next();
+ while(gt(next, range.getUpper()))
+ {
+ if(rangeIter.hasNext())
+ {
+ range = rangeIter.next();
+ }
+ else
+ {
+ range = null;
+ break;
+ }
+ }
+ if(range != null && range.includes(next))
+ {
+ MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next);
+ action.performAction(changeListener);
+ }
+
+
+ }
+
+ }
+
+ }
+ }
+
+ public void removeDispositionListener(Method method)
+ {
+ _messageDispositionListenerMap.remove(method.getId());
+ }
+
+ public void onClose()
+ {
+ _transaction.rollback();
+ for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values())
+ {
+ listener.onRelease();
+ }
+ _messageDispositionListenerMap.clear();
+
+ getConfigStore().removeConfiguredObject(this);
+
+ for (Task task : _taskList)
+ {
+ task.doTask(this);
+ }
+
+ CurrentActor.get().message(getLogSubject(), ChannelMessages.CLOSE());
+ }
+
+ @Override
+ protected void awaitClose()
+ {
+ // Broker shouldn't block awaiting close - thus do override this method to do nothing
+ }
+
+ public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry)
+ {
+ _transaction.dequeue(entry.getQueue(), entry.getMessage(),
+ new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ sub.acknowledge(entry);
+ }
+
+ public void onRollback()
+ {
+ entry.release();
+ }
+ });
+ updateTransactionalActivity();
+ }
+
+ public Collection<Subscription_0_10> getSubscriptions()
+ {
+ return _subscriptions.values();
+ }
+
+ public void register(String destination, Subscription_0_10 sub)
+ {
+ _subscriptions.put(destination == null ? NULL_DESTINTATION : destination, sub);
+ }
+
+ public Subscription_0_10 getSubscription(String destination)
+ {
+ return _subscriptions.get(destination == null ? NULL_DESTINTATION : destination);
+ }
+
+ public void unregister(Subscription_0_10 sub)
+ {
+ _subscriptions.remove(sub.getConsumerTag().toString());
+ try
+ {
+ sub.getSendLock();
+ AMQQueue queue = sub.getQueue();
+ if(queue != null)
+ {
+ queue.unregisterSubscription(sub);
+ }
+
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ finally
+ {
+ sub.releaseSendLock();
+ }
+ }
+
+ public boolean isTransactional()
+ {
+ // this does not look great but there should only be one "non-transactional"
+ // transactional context, while there could be several transactional ones in
+ // theory
+ return !(_transaction instanceof AutoCommitTransaction);
+ }
+
+ public boolean inTransaction()
+ {
+ return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
+ }
+
+ public void selectTx()
+ {
+ _transaction = new LocalTransaction(this.getMessageStore());
+ _txnStarts.incrementAndGet();
+ }
+
+ public void commit()
+ {
+ _transaction.commit();
+
+ _txnCommits.incrementAndGet();
+ _txnStarts.incrementAndGet();
+ decrementOutstandingTxnsIfNecessary();
+ }
+
+ public void rollback()
+ {
+ _transaction.rollback();
+
+ _txnRejects.incrementAndGet();
+ _txnStarts.incrementAndGet();
+ decrementOutstandingTxnsIfNecessary();
+ }
+
+
+ private void incrementOutstandingTxnsIfNecessary()
+ {
+ if(isTransactional())
+ {
+ //There can currently only be at most one outstanding transaction
+ //due to only having LocalTransaction support. Set value to 1 if 0.
+ _txnCount.compareAndSet(0,1);
+ }
+ }
+
+ private void decrementOutstandingTxnsIfNecessary()
+ {
+ if(isTransactional())
+ {
+ //There can currently only be at most one outstanding transaction
+ //due to only having LocalTransaction support. Set value to 0 if 1.
+ _txnCount.compareAndSet(1,0);
+ }
+ }
+
+ /**
+ * Update last transaction activity timestamp
+ */
+ public void updateTransactionalActivity()
+ {
+ if (isTransactional())
+ {
+ _txnUpdateTime.set(System.currentTimeMillis());
+ }
+ }
+
+ public Long getTxnStarts()
+ {
+ return _txnStarts.get();
+ }
+
+ public Long getTxnCommits()
+ {
+ return _txnCommits.get();
+ }
+
+ public Long getTxnRejects()
+ {
+ return _txnRejects.get();
+ }
+
+ public Long getTxnCount()
+ {
+ return _txnCount.get();
+ }
+
+ public Principal getPrincipal()
+ {
+ return _principal;
+ }
+
+ public void addSessionCloseTask(Task task)
+ {
+ _taskList.add(task);
+ }
+
+ public void removeSessionCloseTask(Task task)
+ {
+ _taskList.remove(task);
+ }
+
+ public WeakReference<Session> getReference()
+ {
+ return _reference;
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return getVirtualHost().getMessageStore();
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return (VirtualHost) _connectionConfig.getVirtualHost();
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public SessionConfigType getConfigType()
+ {
+ return SessionConfigType.getInstance();
+ }
+
+ public ConfiguredObject getParent()
+ {
+ return getVirtualHost();
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ public boolean isAttached()
+ {
+ return true;
+ }
+
+ public long getDetachedLifespan()
+ {
+ return 0;
+ }
+
+ public Long getExpiryTime()
+ {
+ return null;
+ }
+
+ public Long getMaxClientRate()
+ {
+ return null;
+ }
+
+ public ConnectionConfig getConnectionConfig()
+ {
+ return _connectionConfig;
+ }
+
+ public String getSessionName()
+ {
+ return getName().toString();
+ }
+
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+
+ public void mgmtClose()
+ {
+ close();
+ }
+
+ public Object getID()
+ {
+ return getName();
+ }
+
+ public AMQConnectionModel getConnectionModel()
+ {
+ return (ServerConnection) getConnection();
+ }
+
+ public String getClientID()
+ {
+ return getConnection().getClientId();
+ }
+
+ public LogActor getLogActor()
+ {
+ return _actor;
+ }
+
+ public LogSubject getLogSubject()
+ {
+ return (LogSubject) this;
+ }
+
+ public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
+ {
+ if (inTransaction())
+ {
+ long currentTime = System.currentTimeMillis();
+ long openTime = currentTime - _transaction.getTransactionStartTime();
+ long idleTime = currentTime - _txnUpdateTime.get();
+
+ // Log a warning on idle or open transactions
+ if (idleWarn > 0L && idleTime > idleWarn)
+ {
+ CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(openTime));
+ _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms");
+ }
+ else if (openWarn > 0L && openTime > openWarn)
+ {
+ CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime));
+ _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms");
+ }
+
+ // Close connection for idle or open transactions that have timed out
+ if (idleClose > 0L && idleTime > idleClose)
+ {
+ getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
+ }
+ else if (openClose > 0L && openTime > openClose)
+ {
+ getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
+ }
+ }
+ }
+
+ @Override
+ public String toLogString()
+ {
+ return "[" +
+ MessageFormat.format(CHANNEL_FORMAT,
+ getConnection().getConnectionId(),
+ getClientID(),
+ ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(),
+ getVirtualHost().getName(),
+ getChannel())
+ + "] ";
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
new file mode 100644
index 0000000000..be659c87ae
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -0,0 +1,1244 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQUnknownExchangeType;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.*;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.flow.FlowCreditManager_0_10;
+import org.apache.qpid.server.flow.WindowCreditManager;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.GenericActor;
+import org.apache.qpid.server.message.MessageMetaData_0_10;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.Acquired;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.ExchangeBind;
+import org.apache.qpid.transport.ExchangeBound;
+import org.apache.qpid.transport.ExchangeBoundResult;
+import org.apache.qpid.transport.ExchangeDeclare;
+import org.apache.qpid.transport.ExchangeDelete;
+import org.apache.qpid.transport.ExchangeQuery;
+import org.apache.qpid.transport.ExchangeQueryResult;
+import org.apache.qpid.transport.ExchangeUnbind;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.transport.ExecutionException;
+import org.apache.qpid.transport.MessageAccept;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquire;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageCancel;
+import org.apache.qpid.transport.MessageFlow;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageFlush;
+import org.apache.qpid.transport.MessageReject;
+import org.apache.qpid.transport.MessageRejectCode;
+import org.apache.qpid.transport.MessageRelease;
+import org.apache.qpid.transport.MessageResume;
+import org.apache.qpid.transport.MessageSetFlowMode;
+import org.apache.qpid.transport.MessageStop;
+import org.apache.qpid.transport.MessageSubscribe;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.QueueDeclare;
+import org.apache.qpid.transport.QueueDelete;
+import org.apache.qpid.transport.QueuePurge;
+import org.apache.qpid.transport.QueueQuery;
+import org.apache.qpid.transport.QueueQueryResult;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionDelegate;
+import org.apache.qpid.transport.TxCommit;
+import org.apache.qpid.transport.TxRollback;
+import org.apache.qpid.transport.TxSelect;
+
+public class ServerSessionDelegate extends SessionDelegate
+{
+ private final IApplicationRegistry _appRegistry;
+
+ public ServerSessionDelegate(IApplicationRegistry appRegistry)
+ {
+ _appRegistry = appRegistry;
+ }
+
+ @Override
+ public void command(Session session, Method method)
+ {
+ SecurityManager.setThreadPrincipal(session.getConnection().getAuthorizationID());
+
+ if(!session.isClosing())
+ {
+ super.command(session, method);
+ if (method.isSync())
+ {
+ session.flushProcessed();
+ }
+ }
+ }
+
+ @Override
+ public void messageAccept(Session session, MessageAccept method)
+ {
+ ((ServerSession)session).accept(method.getTransfers());
+ }
+
+
+
+ @Override
+ public void messageReject(Session session, MessageReject method)
+ {
+ ((ServerSession)session).reject(method.getTransfers());
+ }
+
+ @Override
+ public void messageRelease(Session session, MessageRelease method)
+ {
+ ((ServerSession)session).release(method.getTransfers());
+ }
+
+ @Override
+ public void messageAcquire(Session session, MessageAcquire method)
+ {
+ RangeSet acquiredRanges = ((ServerSession)session).acquire(method.getTransfers());
+
+ Acquired result = new Acquired(acquiredRanges);
+
+
+ session.executionResult((int) method.getId(), result);
+
+
+ }
+
+ @Override
+ public void messageResume(Session session, MessageResume method)
+ {
+ super.messageResume(session, method);
+ }
+
+ @Override
+ public void messageSubscribe(Session session, MessageSubscribe method)
+ {
+
+ //TODO - work around broken Python tests
+ if(!method.hasAcceptMode())
+ {
+ method.setAcceptMode(MessageAcceptMode.EXPLICIT);
+ }
+ if(!method.hasAcquireMode())
+ {
+ method.setAcquireMode(MessageAcquireMode.PRE_ACQUIRED);
+
+ }
+
+ /* if(!method.hasAcceptMode())
+ {
+ exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "Accept-mode not supplied");
+ }
+ else if(!method.hasAcquireMode())
+ {
+ exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "Acquire-mode not supplied");
+ }
+ else */if(!method.hasQueue())
+ {
+ exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not supplied");
+ }
+ else
+ {
+ String destination = method.getDestination();
+
+ if(((ServerSession)session).getSubscription(destination)!=null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Subscription already exists with destaination: '"+destination+"'");
+ }
+ else
+ {
+ String queueName = method.getQueue();
+ QueueRegistry queueRegistry = getQueueRegistry(session);
+
+
+ final AMQQueue queue = queueRegistry.getQueue(queueName);
+
+ if(queue == null)
+ {
+ exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
+ }
+ else if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session)
+ {
+ exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
+ }
+ else
+ {
+ if(queue.isExclusive())
+ {
+ ServerSession s = (ServerSession) session;
+ queue.setExclusiveOwningSession(s);
+ if(queue.getPrincipalHolder() == null)
+ {
+ queue.setPrincipalHolder(s);
+ queue.setExclusiveOwningSession(s);
+ ((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
+ {
+ public void doTask(ServerSession session)
+ {
+ if(queue.getPrincipalHolder() == session)
+ {
+ queue.setPrincipalHolder(null);
+ queue.setExclusiveOwningSession(null);
+ }
+ }
+ });
+ }
+
+ }
+
+ FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
+
+ FilterManager filterManager = null;
+ try
+ {
+ filterManager = FilterManagerFactory.createManager(method.getArguments());
+ }
+ catch (AMQException amqe)
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Exception Creating FilterManager");
+ return;
+ }
+
+ Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
+ destination,
+ method.getAcceptMode(),
+ method.getAcquireMode(),
+ MessageFlowMode.WINDOW,
+ creditManager,
+ filterManager,
+ method.getArguments());
+
+ ((ServerSession)session).register(destination, sub);
+ try
+ {
+ queue.registerSubscription(sub, method.getExclusive());
+ }
+ catch (AMQQueue.ExistingExclusiveSubscription existing)
+ {
+ exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer");
+ }
+ catch (AMQQueue.ExistingSubscriptionPreventsExclusive exclusive)
+ {
+ exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively");
+ }
+ catch (AMQException e)
+ {
+ exception(session, method, e, "Cannot subscribe to '" + destination);
+ }
+ }
+ }
+ }
+ }
+
+
+ @Override
+ public void messageTransfer(Session ssn, MessageTransfer xfr)
+ {
+ ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn);
+ Exchange exchange;
+ if(xfr.hasDestination())
+ {
+ exchange = exchangeRegistry.getExchange(xfr.getDestination());
+ if(exchange == null)
+ {
+ exchange = exchangeRegistry.getDefaultExchange();
+ }
+ }
+ else
+ {
+ exchange = exchangeRegistry.getDefaultExchange();
+ }
+
+
+ DeliveryProperties delvProps = null;
+ if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
+ {
+ delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
+ }
+
+ MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
+
+ if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName()))
+ {
+ ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS;
+ String description = "Permission denied: exchange-name '" + exchange.getName() + "'";
+ exception(ssn, xfr, errorCode, description);
+
+ return;
+ }
+
+ final MessageStore store = getVirtualHost(ssn).getMessageStore();
+ StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
+ ByteBuffer body = xfr.getBody();
+ if(body != null)
+ {
+ storeMessage.addContent(0, body);
+ }
+ storeMessage.flushToStore();
+ MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
+
+ ArrayList<? extends BaseQueue> queues = exchange.route(message);
+
+
+
+ if(queues != null && queues.size() != 0)
+ {
+ ((ServerSession) ssn).enqueue(message, queues);
+ }
+ else
+ {
+ if(delvProps == null || !delvProps.hasDiscardUnroutable() || !delvProps.getDiscardUnroutable())
+ {
+ if(xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+ {
+ RangeSet rejects = new RangeSet();
+ rejects.add(xfr.getId());
+ MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
+ ssn.invoke(reject);
+ }
+ else
+ {
+ Exchange alternate = exchange.getAlternateExchange();
+ if(alternate != null)
+ {
+ queues = alternate.route(message);
+ if(queues != null && queues.size() != 0)
+ {
+ ((ServerSession) ssn).enqueue(message, queues);
+ }
+ else
+ {
+ //TODO - log the message discard
+ }
+ }
+ else
+ {
+ //TODO - log the message discard
+ }
+
+
+ }
+ }
+
+
+ }
+
+ ssn.processed(xfr);
+ }
+
+ @Override
+ public void messageCancel(Session session, MessageCancel method)
+ {
+ String destination = method.getDestination();
+
+ Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+
+ if(sub == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'");
+ }
+ else
+ {
+ AMQQueue queue = sub.getQueue();
+ ((ServerSession)session).unregister(sub);
+ if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0)
+ {
+ queue.setPrincipalHolder(null);
+ }
+ }
+ }
+
+ @Override
+ public void messageFlush(Session session, MessageFlush method)
+ {
+ String destination = method.getDestination();
+
+ Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+
+ if(sub == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'");
+ }
+ else
+ {
+
+ try
+ {
+ sub.flush();
+ }
+ catch (AMQException e)
+ {
+ exception(session, method, e, "Cannot flush subscription '" + destination);
+ }
+ }
+ }
+
+ @Override
+ public void txSelect(Session session, TxSelect method)
+ {
+ // TODO - check current tx mode
+ ((ServerSession)session).selectTx();
+ }
+
+ @Override
+ public void txCommit(Session session, TxCommit method)
+ {
+ // TODO - check current tx mode
+ ((ServerSession)session).commit();
+ }
+
+ @Override
+ public void txRollback(Session session, TxRollback method)
+ {
+ // TODO - check current tx mode
+ ((ServerSession)session).rollback();
+ }
+
+
+ @Override
+ public void exchangeDeclare(Session session, ExchangeDeclare method)
+ {
+ String exchangeName = method.getExchange();
+ VirtualHost virtualHost = getVirtualHost(session);
+ Exchange exchange = getExchange(session, exchangeName);
+
+ if(method.getPassive())
+ {
+ if(exchange == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '"+exchangeName+"'");
+
+ }
+ else
+ {
+ // TODO - check exchange has same properties
+ if(!exchange.getTypeShortString().toString().equals(method.getType()))
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type");
+ }
+ }
+
+ }
+ else
+ {
+ if (exchange == null)
+ {
+ ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
+ ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+
+
+
+ try
+ {
+
+ exchange = exchangeFactory.createExchange(method.getExchange(),
+ method.getType(),
+ method.getDurable(),
+ method.getAutoDelete());
+
+ String alternateExchangeName = method.getAlternateExchange();
+ if(alternateExchangeName != null && alternateExchangeName.length() != 0)
+ {
+ Exchange alternate = getExchange(session, alternateExchangeName);
+ exchange.setAlternateExchange(alternate);
+ }
+
+ if (exchange.isDurable())
+ {
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+ store.createExchange(exchange);
+ }
+
+ exchangeRegistry.registerExchange(exchange);
+ }
+ catch(AMQUnknownExchangeType e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
+ }
+ catch (AMQException e)
+ {
+ exception(session, method, e, "Cannot declare exchange '" + exchangeName);
+ }
+ }
+ else
+ {
+ if(!exchange.getTypeShortString().toString().equals(method.getType()))
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type");
+ }
+ }
+
+ }
+ }
+
+ // TODO decouple AMQException and AMQConstant error codes
+ private void exception(Session session, Method method, AMQException exception, String message)
+ {
+ ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+ if (exception.getErrorCode() != null)
+ {
+ try
+ {
+ errorCode = ExecutionErrorCode.get(exception.getErrorCode().getCode());
+ }
+ catch (IllegalArgumentException iae)
+ {
+ // ignore, already set to INTERNAL_ERROR
+ }
+ }
+ String description = message + "': " + exception.getMessage();
+
+ exception(session, method, errorCode, description);
+ }
+
+ private void exception(Session session, Method method, ExecutionErrorCode errorCode, String description)
+ {
+ ExecutionException ex = new ExecutionException();
+ ex.setErrorCode(errorCode);
+ ex.setCommandId(method.getId());
+ ex.setDescription(description);
+
+ session.invoke(ex);
+
+ session.close();
+ }
+
+ private Exchange getExchange(Session session, String exchangeName)
+ {
+ ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
+ return exchangeRegistry.getExchange(exchangeName);
+ }
+
+ private ExchangeRegistry getExchangeRegistry(Session session)
+ {
+ VirtualHost virtualHost = getVirtualHost(session);
+ return virtualHost.getExchangeRegistry();
+
+ }
+
+ private VirtualHost getVirtualHost(Session session)
+ {
+ ServerConnection conn = getServerConnection(session);
+ VirtualHost vhost = conn.getVirtualHost();
+ return vhost;
+ }
+
+ private ServerConnection getServerConnection(Session session)
+ {
+ ServerConnection conn = (ServerConnection) session.getConnection();
+ return conn;
+ }
+
+ @Override
+ public void exchangeDelete(Session session, ExchangeDelete method)
+ {
+ VirtualHost virtualHost = getVirtualHost(session);
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+
+ try
+ {
+ Exchange exchange = getExchange(session, method.getExchange());
+
+ if(exchange == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "No such exchange '" + method.getExchange() + "'");
+ }
+ else if(exchange.hasReferrers())
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate exchange");
+ }
+ else if(isStandardExchange(exchange, virtualHost.getExchangeFactory().getRegisteredTypes()))
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange '"+method.getExchange()+"' cannot be deleted");
+ }
+ else
+ {
+ exchangeRegistry.unregisterExchange(method.getExchange(), method.getIfUnused());
+
+ if (exchange.isDurable() && !exchange.isAutoDelete())
+ {
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+ store.removeExchange(exchange);
+ }
+ }
+ }
+ catch (ExchangeInUseException e)
+ {
+ exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Exchange in use");
+ }
+ catch (AMQException e)
+ {
+ exception(session, method, e, "Cannot delete exchange '" + method.getExchange() );
+ }
+ }
+
+ private boolean isStandardExchange(Exchange exchange, Collection<ExchangeType<? extends Exchange>> registeredTypes)
+ {
+ for(ExchangeType type : registeredTypes)
+ {
+ if(type.getDefaultExchangeName().toString().equals( exchange.getName() ))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void exchangeQuery(Session session, ExchangeQuery method)
+ {
+
+ ExchangeQueryResult result = new ExchangeQueryResult();
+
+ Exchange exchange = getExchange(session, method.getName());
+
+ if(exchange != null)
+ {
+ result.setDurable(exchange.isDurable());
+ result.setType(exchange.getTypeShortString().toString());
+ result.setNotFound(false);
+ }
+ else
+ {
+ result.setNotFound(true);
+ }
+
+ session.executionResult((int) method.getId(), result);
+ }
+
+ @Override
+ public void exchangeBind(Session session, ExchangeBind method)
+ {
+
+ VirtualHost virtualHost = getVirtualHost(session);
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ if (!method.hasQueue())
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set");
+ }
+ else if (!method.hasExchange())
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "exchange not set");
+ }
+/*
+ else if (!method.hasBindingKey())
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "binding-key not set");
+ }
+*/
+ else
+ {
+ //TODO - here because of non-compiant python tests
+ if (!method.hasBindingKey())
+ {
+ method.setBindingKey(method.getQueue());
+ }
+ AMQQueue queue = queueRegistry.getQueue(method.getQueue());
+ Exchange exchange = exchangeRegistry.getExchange(method.getExchange());
+ if(queue == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found");
+ }
+ else if(exchange == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found");
+ }
+ else if(exchange.getTypeShortString().equals(HeadersExchange.TYPE.getName()) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match")))
+ {
+ exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getName() + " require an x-match header");
+ }
+ else
+ {
+ AMQShortString routingKey = new AMQShortString(method.getBindingKey());
+ FieldTable fieldTable = FieldTable.convertToFieldTable(method.getArguments());
+
+ if (!exchange.isBound(routingKey, fieldTable, queue))
+ {
+ try
+ {
+ virtualHost.getBindingFactory().addBinding(method.getBindingKey(), queue, exchange, method.getArguments());
+ }
+ catch (AMQException e)
+ {
+ exception(session, method, e, "Cannot add binding '" + method.getBindingKey());
+ }
+ }
+ else
+ {
+ // todo
+ }
+ }
+
+
+ }
+
+
+
+ }
+
+ @Override
+ public void exchangeUnbind(Session session, ExchangeUnbind method)
+ {
+ VirtualHost virtualHost = getVirtualHost(session);
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ if (!method.hasQueue())
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set");
+ }
+ else if (!method.hasExchange())
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "exchange not set");
+ }
+ else if (!method.hasBindingKey())
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "binding-key not set");
+ }
+ else
+ {
+ AMQQueue queue = queueRegistry.getQueue(method.getQueue());
+ Exchange exchange = exchangeRegistry.getExchange(method.getExchange());
+ if(queue == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found");
+ }
+ else if(exchange == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found");
+ }
+ else
+ {
+ try
+ {
+ virtualHost.getBindingFactory().removeBinding(method.getBindingKey(), queue, exchange, null);
+ }
+ catch (AMQException e)
+ {
+ exception(session, method, e, "Cannot remove binding '" + method.getBindingKey());
+ }
+ }
+ }
+
+
+ super.exchangeUnbind(session, method);
+ }
+
+ @Override
+ public void exchangeBound(Session session, ExchangeBound method)
+ {
+
+ ExchangeBoundResult result = new ExchangeBoundResult();
+ Exchange exchange;
+ AMQQueue queue;
+ if(method.hasExchange())
+ {
+ exchange = getExchange(session, method.getExchange());
+
+ if(exchange == null)
+ {
+ result.setExchangeNotFound(true);
+ }
+ }
+ else
+ {
+ exchange = getExchangeRegistry(session).getDefaultExchange();
+ }
+
+
+ if(method.hasQueue())
+ {
+
+ queue = getQueue(session, method.getQueue());
+ if(queue == null)
+ {
+ result.setQueueNotFound(true);
+ }
+
+
+ if(exchange != null && queue != null)
+ {
+
+ boolean queueMatched = exchange.isBound(queue);
+
+ result.setQueueNotMatched(!queueMatched);
+
+
+ if(method.hasBindingKey())
+ {
+
+ if(method.hasArguments())
+ {
+ FieldTable args = FieldTable.convertToFieldTable(method.getArguments());
+
+ result.setArgsNotMatched(!exchange.isBound(new AMQShortString(method.getBindingKey()), args, queue));
+ }
+ if(queueMatched)
+ {
+ result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue));
+ }
+ else
+ {
+ result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
+ }
+ }
+ else if (method.hasArguments())
+ {
+ // TODO
+
+ }
+
+ result.setQueueNotMatched(!exchange.isBound(queue));
+
+ }
+ else if(exchange != null && method.hasBindingKey())
+ {
+ if(method.hasArguments())
+ {
+ // TODO
+ }
+ result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
+
+ }
+
+ }
+ else if(exchange != null && method.hasBindingKey())
+ {
+ if(method.hasArguments())
+ {
+ // TODO
+ }
+ result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
+
+ }
+
+
+ session.executionResult((int) method.getId(), result);
+
+
+ }
+
+ private AMQQueue getQueue(Session session, String queue)
+ {
+ QueueRegistry queueRegistry = getQueueRegistry(session);
+ return queueRegistry.getQueue(queue);
+ }
+
+ private QueueRegistry getQueueRegistry(Session session)
+ {
+ return getVirtualHost(session).getQueueRegistry();
+ }
+
+ @Override
+ public void queueDeclare(Session session, final QueueDeclare method)
+ {
+
+ VirtualHost virtualHost = getVirtualHost(session);
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+
+ String queueName = method.getQueue();
+ AMQQueue queue;
+ QueueRegistry queueRegistry = getQueueRegistry(session);
+ //TODO: do we need to check that the queue already exists with exactly the same "configuration"?
+
+ synchronized (queueRegistry)
+ {
+
+ if (((queue = queueRegistry.getQueue(queueName)) == null))
+ {
+
+ if (method.getPassive())
+ {
+ String description = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
+ ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_FOUND;
+
+ exception(session, method, errorCode, description);
+
+ return;
+ }
+ else
+ {
+ try
+ {
+ queue = createQueue(queueName, method, virtualHost, (ServerSession)session);
+ if(method.getExclusive())
+ {
+ queue.setExclusive(true);
+ }
+ else if(method.getAutoDelete())
+ {
+ queue.setDeleteOnNoConsumers(true);
+ }
+
+ final String alternateExchangeName = method.getAlternateExchange();
+ if(alternateExchangeName != null && alternateExchangeName.length() != 0)
+ {
+ Exchange alternate = getExchange(session, alternateExchangeName);
+ queue.setAlternateExchange(alternate);
+ }
+
+ if(method.hasArguments() && method.getArguments() != null)
+ {
+ if(method.getArguments().containsKey("no-local"))
+ {
+ Object no_local = method.getArguments().get("no-local");
+ if(no_local instanceof Boolean && ((Boolean)no_local))
+ {
+ queue.setNoLocal(true);
+ }
+ }
+ }
+
+
+ if (queue.isDurable() && !queue.isAutoDelete())
+ {
+ if(method.hasArguments() && method.getArguments() != null)
+ {
+ Map<String,Object> args = method.getArguments();
+ FieldTable ftArgs = new FieldTable();
+ for(Map.Entry<String, Object> entry : args.entrySet())
+ {
+ ftArgs.put(new AMQShortString(entry.getKey()), entry.getValue());
+ }
+ store.createQueue(queue, ftArgs);
+ }
+ else
+ {
+ store.createQueue(queue);
+ }
+ }
+ queueRegistry.registerQueue(queue);
+ boolean autoRegister = ApplicationRegistry.getInstance().getConfiguration().getQueueAutoRegister();
+
+ if (autoRegister)
+ {
+
+ ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
+
+ Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
+
+ virtualHost.getBindingFactory().addBinding(queueName, queue, defaultExchange, null);
+
+ }
+
+ if (method.hasAutoDelete()
+ && method.getAutoDelete()
+ && method.hasExclusive()
+ && method.getExclusive())
+ {
+ final AMQQueue q = queue;
+ final ServerSession.Task deleteQueueTask = new ServerSession.Task()
+ {
+ public void doTask(ServerSession session)
+ {
+ try
+ {
+ q.delete();
+ }
+ catch (AMQException e)
+ {
+ exception(session, method, e, "Cannot delete '" + method.getQueue());
+ }
+ }
+ };
+ final ServerSession s = (ServerSession) session;
+ s.addSessionCloseTask(deleteQueueTask);
+ queue.addQueueDeleteTask(new AMQQueue.Task()
+ {
+ public void doTask(AMQQueue queue) throws AMQException
+ {
+ s.removeSessionCloseTask(deleteQueueTask);
+ }
+ });
+ }
+ if (method.hasExclusive()
+ && method.getExclusive())
+ {
+ final AMQQueue q = queue;
+ final ServerSession.Task removeExclusive = new ServerSession.Task()
+ {
+ public void doTask(ServerSession session)
+ {
+ q.setPrincipalHolder(null);
+ q.setExclusiveOwningSession(null);
+ }
+ };
+ final ServerSession s = (ServerSession) session;
+ q.setExclusiveOwningSession(s);
+ s.addSessionCloseTask(removeExclusive);
+ queue.addQueueDeleteTask(new AMQQueue.Task()
+ {
+ public void doTask(AMQQueue queue) throws AMQException
+ {
+ s.removeSessionCloseTask(removeExclusive);
+ }
+ });
+ }
+ }
+ catch (AMQException e)
+ {
+ exception(session, method, e, "Cannot declare queue '" + queueName);
+ }
+ }
+ }
+ else if (method.getExclusive() && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+ {
+ String description = "Cannot declare queue('" + queueName + "'),"
+ + " as exclusive queue with same name "
+ + "declared on another session";
+ ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED;
+
+ exception(session, method, errorCode, description);
+
+ return;
+ }
+ }
+ }
+
+ protected AMQQueue createQueue(final String queueName,
+ final QueueDeclare body,
+ VirtualHost virtualHost,
+ final ServerSession session)
+ throws AMQException
+ {
+ String owner = body.getExclusive() ? session.getClientID() : null;
+
+ final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(),
+ body.getExclusive(), virtualHost, body.getArguments());
+
+ return queue;
+ }
+
+ @Override
+ public void queueDelete(Session session, QueueDelete method)
+ {
+ String queueName = method.getQueue();
+ if(queueName == null || queueName.length()==0)
+ {
+ exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "No queue name supplied");
+
+ }
+ else
+ {
+ AMQQueue queue = getQueue(session, queueName);
+
+
+ if (queue == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "No queue " + queueName + " found");
+ }
+ else
+ {
+ if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session)
+ {
+ exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
+ }
+ else if (method.getIfEmpty() && !queue.isEmpty())
+ {
+ exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Queue " + queueName + " not empty");
+ }
+ else if (method.getIfUnused() && !queue.isUnused())
+ {
+ // TODO - Error code
+ exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Queue " + queueName + " in use");
+
+ }
+ else
+ {
+ VirtualHost virtualHost = getVirtualHost(session);
+
+ try
+ {
+ queue.delete();
+ if (queue.isDurable() && !queue.isAutoDelete())
+ {
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+ store.removeQueue(queue);
+ }
+ }
+ catch (AMQException e)
+ {
+ exception(session, method, e, "Cannot delete queue '" + queueName);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void queuePurge(Session session, QueuePurge method)
+ {
+ String queueName = method.getQueue();
+ if(queueName == null || queueName.length()==0)
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "No queue name supplied");
+ }
+ else
+ {
+ AMQQueue queue = getQueue(session, queueName);
+
+ if (queue == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "No queue " + queueName + " found");
+ }
+ else
+ {
+ try
+ {
+ queue.clearQueue();
+ }
+ catch (AMQException e)
+ {
+ exception(session, method, e, "Cannot purge queue '" + queueName);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void queueQuery(Session session, QueueQuery method)
+ {
+ QueueQueryResult result = new QueueQueryResult();
+
+ AMQQueue queue = getQueue(session, method.getQueue());
+
+ if(queue != null)
+ {
+ result.setQueue(queue.getNameShortString().toString());
+ result.setDurable(queue.isDurable());
+ result.setExclusive(queue.isExclusive());
+ result.setAutoDelete(queue.isAutoDelete());
+ result.setArguments(queue.getArguments());
+ result.setMessageCount(queue.getMessageCount());
+ result.setSubscriberCount(queue.getConsumerCount());
+
+ }
+
+
+ session.executionResult((int) method.getId(), result);
+
+ }
+
+ @Override
+ public void messageSetFlowMode(Session session, MessageSetFlowMode sfm)
+ {
+ String destination = sfm.getDestination();
+
+ Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+
+ if(sub == null)
+ {
+ exception(session, sfm, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'");
+ }
+ else if(sub.isStopped())
+ {
+ sub.setFlowMode(sfm.getFlowMode());
+ }
+ }
+
+ @Override
+ public void messageStop(Session session, MessageStop stop)
+ {
+ String destination = stop.getDestination();
+
+ Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+
+ if(sub == null)
+ {
+ exception(session, stop, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'");
+ }
+ else
+ {
+ sub.stop();
+ }
+
+ }
+
+ @Override
+ public void messageFlow(Session session, MessageFlow flow)
+ {
+ String destination = flow.getDestination();
+
+ Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+
+ if(sub == null)
+ {
+ exception(session, flow, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'");
+ }
+ else
+ {
+ sub.addCredit(flow.getUnit(), flow.getValue());
+ }
+
+ }
+
+ @Override
+ public void closed(Session session)
+ {
+ for(Subscription_0_10 sub : getSubscriptions(session))
+ {
+ ((ServerSession)session).unregister(sub);
+ }
+ ((ServerSession)session).onClose();
+ }
+
+ @Override
+ public void detached(Session session)
+ {
+ closed(session);
+ }
+
+ public Collection<Subscription_0_10> getSubscriptions(Session session)
+ {
+ return ((ServerSession)session).getSubscriptions();
+ }
+
+}