diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport')
5 files changed, 2470 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java new file mode 100644 index 0000000000..3ca22b60c8 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import org.apache.qpid.transport.NetworkDriver; + +public class QpidAcceptor +{ + NetworkDriver _driver; + String _protocol; + public QpidAcceptor(NetworkDriver driver, String protocol) + { + _driver = driver; + _protocol = protocol; + } + + public NetworkDriver getNetworkDriver() + { + return _driver; + } + + public String toString() + { + return _protocol; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java new file mode 100644 index 0000000000..54cd709af3 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -0,0 +1,346 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*; + +import java.text.MessageFormat; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.configuration.ConnectionConfig; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.server.logging.messages.ConnectionMessages; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionCloseCode; +import org.apache.qpid.transport.ExecutionErrorCode; +import org.apache.qpid.transport.ExecutionException; +import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.ProtocolEvent; +import org.apache.qpid.transport.Session; + +public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject +{ + private ConnectionConfig _config; + private Runnable _onOpenTask; + private AtomicBoolean _logClosed = new AtomicBoolean(false); + private LogActor _actor = GenericActor.getInstance(this); + + private ApplicationRegistry _registry; + private boolean _statisticsEnabled = false; + private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; + + public ServerConnection() + { + + } + + public UUID getId() + { + return _config.getId(); + } + + @Override + protected void invoke(Method method) + { + super.invoke(method); + } + + @Override + protected void setState(State state) + { + super.setState(state); + + if (state == State.OPEN) + { + if (_onOpenTask != null) + { + _onOpenTask.run(); + } + _actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", true, true)); + + getVirtualHost().getConnectionRegistry().registerConnection(this); + } + + if (state == State.CLOSE_RCVD || state == State.CLOSED || state == State.CLOSING) + { + if(_virtualHost != null) + { + _virtualHost.getConnectionRegistry().deregisterConnection(this); + } + } + + if (state == State.CLOSED) + { + logClosed(); + } + } + + protected void logClosed() + { + if(_logClosed.compareAndSet(false, true)) + { + CurrentActor.get().message(this, ConnectionMessages.CLOSE()); + } + } + + @Override + public ServerConnectionDelegate getConnectionDelegate() + { + return (ServerConnectionDelegate) super.getConnectionDelegate(); + } + + public void setConnectionDelegate(ServerConnectionDelegate delegate) + { + super.setConnectionDelegate(delegate); + } + + private VirtualHost _virtualHost; + + + public VirtualHost getVirtualHost() + { + return _virtualHost; + } + + public void setVirtualHost(VirtualHost virtualHost) + { + _virtualHost = virtualHost; + + initialiseStatistics(); + } + + public void setConnectionConfig(final ConnectionConfig config) + { + _config = config; + } + + public ConnectionConfig getConfig() + { + return _config; + } + + public void onOpen(final Runnable task) + { + _onOpenTask = task; + } + + public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException + { + ExecutionException ex = new ExecutionException(); + ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR; + try + { + code = ExecutionErrorCode.get(cause.getCode()); + } + catch (IllegalArgumentException iae) + { + // Ignore, already set to INTERNAL_ERROR + } + ex.setErrorCode(code); + ex.setDescription(message); + ((ServerSession)session).invoke(ex); + + ((ServerSession)session).close(); + } + + public LogSubject getLogSubject() + { + return (LogSubject) this; + } + + @Override + public void received(ProtocolEvent event) + { + if (event.isConnectionControl()) + { + CurrentActor.set(_actor); + } + else + { + ServerSession channel = (ServerSession) getSession(event.getChannel()); + LogActor channelActor = null; + + if (channel != null) + { + channelActor = channel.getLogActor(); + } + + CurrentActor.set(channelActor == null ? _actor : channelActor); + } + + try + { + super.received(event); + } + finally + { + CurrentActor.remove(); + } + } + + public String toLogString() + { + boolean hasVirtualHost = (null != this.getVirtualHost()); + boolean hasPrincipal = (null != getAuthorizationID()); + + if (hasPrincipal && hasVirtualHost) + { + return "[" + + MessageFormat.format(CONNECTION_FORMAT, + getConnectionId(), + getClientId(), + getConfig().getAddress(), + getVirtualHost().getName()) + + "] "; + } + else if (hasPrincipal) + { + return "[" + + MessageFormat.format(USER_FORMAT, + getConnectionId(), + getClientId(), + getConfig().getAddress()) + + "] "; + + } + else + { + return "[" + + MessageFormat.format(SOCKET_FORMAT, + getConnectionId(), + getConfig().getAddress()) + + "] "; + } + } + + public LogActor getLogActor() + { + return _actor; + } + + @Override + public void close(AMQConstant cause, String message) throws AMQException + { + ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; + try + { + replyCode = ConnectionCloseCode.get(cause.getCode()); + } + catch (IllegalArgumentException iae) + { + // Ignore + } + close(replyCode, message); + } + + @Override + public List<AMQSessionModel> getSessionModels() + { + List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>(); + for (Session ssn : getChannels()) + { + sessions.add((AMQSessionModel) ssn); + } + return sessions; + } + + public void registerMessageDelivered(long messageSize) + { + if (isStatisticsEnabled()) + { + _messagesDelivered.registerEvent(1L); + _dataDelivered.registerEvent(messageSize); + } + _virtualHost.registerMessageDelivered(messageSize); + } + + public void registerMessageReceived(long messageSize, long timestamp) + { + if (isStatisticsEnabled()) + { + _messagesReceived.registerEvent(1L, timestamp); + _dataReceived.registerEvent(messageSize, timestamp); + } + _virtualHost.registerMessageReceived(messageSize, timestamp); + } + + public StatisticsCounter getMessageReceiptStatistics() + { + return _messagesReceived; + } + + public StatisticsCounter getDataReceiptStatistics() + { + return _dataReceived; + } + + public StatisticsCounter getMessageDeliveryStatistics() + { + return _messagesDelivered; + } + + public StatisticsCounter getDataDeliveryStatistics() + { + return _dataDelivered; + } + + public void resetStatistics() + { + _messagesDelivered.reset(); + _dataDelivered.reset(); + _messagesReceived.reset(); + _dataReceived.reset(); + } + + public void initialiseStatistics() + { + setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && + _virtualHost.getApplicationRegistry().getConfiguration().isStatisticsGenerationConnectionsEnabled()); + + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId()); + _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId()); + _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId()); + _dataReceived = new StatisticsCounter("data-received-" + getConnectionId()); + } + + public boolean isStatisticsEnabled() + { + return _statisticsEnabled; + } + + public void setStatisticsEnabled(boolean enabled) + { + _statisticsEnabled = enabled; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java new file mode 100644 index 0000000000..174dcbfa69 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -0,0 +1,158 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.StringTokenizer; + +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.*; + +public class ServerConnectionDelegate extends ServerDelegate +{ + private String _localFQDN; + private final IApplicationRegistry _appRegistry; + + public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN) + { + this(new HashMap<String,Object>(Collections.singletonMap("qpid.federation_tag",appRegistry.getBroker().getFederationTag())), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN); + } + + + public ServerConnectionDelegate(Map<String, Object> properties, + List<Object> locales, + IApplicationRegistry appRegistry, + String localFQDN) + { + super(properties, parseToList(appRegistry.getAuthenticationManager().getMechanisms()), locales); + + _appRegistry = appRegistry; + _localFQDN = localFQDN; + } + + private static List<Object> parseToList(String mechanisms) + { + List<Object> list = new ArrayList<Object>(); + StringTokenizer tokenizer = new StringTokenizer(mechanisms, " "); + while(tokenizer.hasMoreTokens()) + { + list.add(tokenizer.nextToken()); + } + return list; + } + + @Override + public ServerSession getSession(Connection conn, SessionAttach atc) + { + SessionDelegate serverSessionDelegate = new ServerSessionDelegate(_appRegistry); + + ServerSession ssn = new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0); + + return ssn; + } + + @Override + protected SaslServer createSaslServer(String mechanism) throws SaslException + { + return _appRegistry.getAuthenticationManager().createSaslServer(mechanism, _localFQDN); + + } + + @Override + public void connectionClose(Connection conn, ConnectionClose close) + { + try + { + ((ServerConnection) conn).logClosed(); + } + finally + { + super.connectionClose(conn, close); + } + + } + + @Override + public void connectionOpen(Connection conn, ConnectionOpen open) + { + ServerConnection sconn = (ServerConnection) conn; + + VirtualHost vhost; + String vhostName; + if(open.hasVirtualHost()) + { + vhostName = open.getVirtualHost(); + } + else + { + vhostName = ""; + } + vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName); + + SecurityManager.setThreadPrincipal(conn.getAuthorizationID()); + + if(vhost != null) + { + sconn.setVirtualHost(vhost); + + if (!vhost.getSecurityManager().accessVirtualhost(vhostName, ((ProtocolEngine) sconn.getConfig()).getRemoteAddress())) + { + sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Permission denied '"+vhostName+"'")); + sconn.setState(Connection.State.CLOSING); + } + else + { + sconn.invoke(new ConnectionOpenOk(Collections.emptyList())); + sconn.setState(Connection.State.OPEN); + } + } + else + { + sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'")); + sconn.setState(Connection.State.CLOSING); + } + + } + + @Override + protected int getHeartbeatMax() + { + //TODO: implement broker support for actually sending heartbeats + return 0; + } + + @Override + protected int getChannelMax() + { + return ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java new file mode 100644 index 0000000000..60c94b43c0 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -0,0 +1,678 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*; +import static org.apache.qpid.util.Serial.*; + +import java.lang.ref.WeakReference; +import java.security.Principal; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.server.configuration.ConfigStore; +import org.apache.qpid.server.configuration.ConfiguredObject; +import org.apache.qpid.server.configuration.ConnectionConfig; +import org.apache.qpid.server.configuration.SessionConfig; +import org.apache.qpid.server.configuration.SessionConfigType; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.security.PrincipalHolder; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.subscription.Subscription_0_10; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.LocalTransaction; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.Binary; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.Range; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionDelegate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.sun.security.auth.UserPrincipal; + +public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel, LogSubject +{ + private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); + + private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); + + private final UUID _id; + private ConnectionConfig _connectionConfig; + private long _createTime = System.currentTimeMillis(); + private LogActor _actor = GenericActor.getInstance(this); + + public static interface MessageDispositionChangeListener + { + public void onAccept(); + + public void onRelease(); + + public void onReject(); + + public boolean acquire(); + + + } + + public static interface Task + { + public void doTask(ServerSession session); + } + + + private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap = + new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>(); + + private ServerTransaction _transaction; + + private final AtomicLong _txnStarts = new AtomicLong(0); + private final AtomicLong _txnCommits = new AtomicLong(0); + private final AtomicLong _txnRejects = new AtomicLong(0); + private final AtomicLong _txnCount = new AtomicLong(0); + private final AtomicLong _txnUpdateTime = new AtomicLong(0); + + private Principal _principal; + + private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>(); + + private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); + + private final WeakReference<Session> _reference; + + ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry) + { + this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig()); + } + + protected void setState(State state) + { + super.setState(state); + + if (state == State.OPEN) + { + _actor.message(ChannelMessages.CREATE()); + } + } + + public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig) + { + super(connection, delegate, name, expiry); + _connectionConfig = connConfig; + _transaction = new AutoCommitTransaction(this.getMessageStore()); + _principal = new UserPrincipal(connection.getAuthorizationID()); + _reference = new WeakReference<Session>(this); + _id = getConfigStore().createId(); + getConfigStore().addConfiguredObject(this); + } + + private ConfigStore getConfigStore() + { + return getConnectionConfig().getConfigStore(); + } + + + @Override + protected boolean isFull(int id) + { + return isCommandsFull(id); + } + + public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues) + { + getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); + _transaction.enqueue(queues,message, new ServerTransaction.Action() + { + + BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]); + + public void postCommit() + { + for(int i = 0; i < _queues.length; i++) + { + try + { + _queues[i].enqueue(message); + } + catch (AMQException e) + { + // TODO + throw new RuntimeException(e); + } + } + } + + public void onRollback() + { + // NO-OP + } + }); + + incrementOutstandingTxnsIfNecessary(); + updateTransactionalActivity(); + } + + + public void sendMessage(MessageTransfer xfr, + Runnable postIdSettingAction) + { + invoke(xfr, postIdSettingAction); + getConnectionModel().registerMessageDelivered(xfr.getBodySize()); + } + + public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener) + { + _messageDispositionListenerMap.put(xfr.getId(), acceptListener); + } + + + private static interface MessageDispositionAction + { + void performAction(MessageDispositionChangeListener listener); + } + + public void accept(RangeSet ranges) + { + dispositionChange(ranges, new MessageDispositionAction() + { + public void performAction(MessageDispositionChangeListener listener) + { + listener.onAccept(); + } + }); + } + + + public void release(RangeSet ranges) + { + dispositionChange(ranges, new MessageDispositionAction() + { + public void performAction(MessageDispositionChangeListener listener) + { + listener.onRelease(); + } + }); + } + + public void reject(RangeSet ranges) + { + dispositionChange(ranges, new MessageDispositionAction() + { + public void performAction(MessageDispositionChangeListener listener) + { + listener.onReject(); + } + }); + } + + public RangeSet acquire(RangeSet transfers) + { + RangeSet acquired = new RangeSet(); + + if(!_messageDispositionListenerMap.isEmpty()) + { + Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator(); + Iterator<Range> rangeIter = transfers.iterator(); + + if(rangeIter.hasNext()) + { + Range range = rangeIter.next(); + + while(range != null && unacceptedMessages.hasNext()) + { + int next = unacceptedMessages.next(); + while(gt(next, range.getUpper())) + { + if(rangeIter.hasNext()) + { + range = rangeIter.next(); + } + else + { + range = null; + break; + } + } + if(range != null && range.includes(next)) + { + MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.get(next); + if(changeListener != null && changeListener.acquire()) + { + acquired.add(next); + } + } + + + } + + } + + + } + + return acquired; + } + + public void dispositionChange(RangeSet ranges, MessageDispositionAction action) + { + if(ranges != null && !_messageDispositionListenerMap.isEmpty()) + { + Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator(); + Iterator<Range> rangeIter = ranges.iterator(); + + if(rangeIter.hasNext()) + { + Range range = rangeIter.next(); + + while(range != null && unacceptedMessages.hasNext()) + { + int next = unacceptedMessages.next(); + while(gt(next, range.getUpper())) + { + if(rangeIter.hasNext()) + { + range = rangeIter.next(); + } + else + { + range = null; + break; + } + } + if(range != null && range.includes(next)) + { + MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next); + action.performAction(changeListener); + } + + + } + + } + + } + } + + public void removeDispositionListener(Method method) + { + _messageDispositionListenerMap.remove(method.getId()); + } + + public void onClose() + { + _transaction.rollback(); + for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values()) + { + listener.onRelease(); + } + _messageDispositionListenerMap.clear(); + + getConfigStore().removeConfiguredObject(this); + + for (Task task : _taskList) + { + task.doTask(this); + } + + CurrentActor.get().message(getLogSubject(), ChannelMessages.CLOSE()); + } + + @Override + protected void awaitClose() + { + // Broker shouldn't block awaiting close - thus do override this method to do nothing + } + + public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry) + { + _transaction.dequeue(entry.getQueue(), entry.getMessage(), + new ServerTransaction.Action() + { + + public void postCommit() + { + sub.acknowledge(entry); + } + + public void onRollback() + { + entry.release(); + } + }); + updateTransactionalActivity(); + } + + public Collection<Subscription_0_10> getSubscriptions() + { + return _subscriptions.values(); + } + + public void register(String destination, Subscription_0_10 sub) + { + _subscriptions.put(destination == null ? NULL_DESTINTATION : destination, sub); + } + + public Subscription_0_10 getSubscription(String destination) + { + return _subscriptions.get(destination == null ? NULL_DESTINTATION : destination); + } + + public void unregister(Subscription_0_10 sub) + { + _subscriptions.remove(sub.getConsumerTag().toString()); + try + { + sub.getSendLock(); + AMQQueue queue = sub.getQueue(); + if(queue != null) + { + queue.unregisterSubscription(sub); + } + + } + catch (AMQException e) + { + // TODO + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + finally + { + sub.releaseSendLock(); + } + } + + public boolean isTransactional() + { + // this does not look great but there should only be one "non-transactional" + // transactional context, while there could be several transactional ones in + // theory + return !(_transaction instanceof AutoCommitTransaction); + } + + public boolean inTransaction() + { + return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; + } + + public void selectTx() + { + _transaction = new LocalTransaction(this.getMessageStore()); + _txnStarts.incrementAndGet(); + } + + public void commit() + { + _transaction.commit(); + + _txnCommits.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); + } + + public void rollback() + { + _transaction.rollback(); + + _txnRejects.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); + } + + + private void incrementOutstandingTxnsIfNecessary() + { + if(isTransactional()) + { + //There can currently only be at most one outstanding transaction + //due to only having LocalTransaction support. Set value to 1 if 0. + _txnCount.compareAndSet(0,1); + } + } + + private void decrementOutstandingTxnsIfNecessary() + { + if(isTransactional()) + { + //There can currently only be at most one outstanding transaction + //due to only having LocalTransaction support. Set value to 0 if 1. + _txnCount.compareAndSet(1,0); + } + } + + /** + * Update last transaction activity timestamp + */ + public void updateTransactionalActivity() + { + if (isTransactional()) + { + _txnUpdateTime.set(System.currentTimeMillis()); + } + } + + public Long getTxnStarts() + { + return _txnStarts.get(); + } + + public Long getTxnCommits() + { + return _txnCommits.get(); + } + + public Long getTxnRejects() + { + return _txnRejects.get(); + } + + public Long getTxnCount() + { + return _txnCount.get(); + } + + public Principal getPrincipal() + { + return _principal; + } + + public void addSessionCloseTask(Task task) + { + _taskList.add(task); + } + + public void removeSessionCloseTask(Task task) + { + _taskList.remove(task); + } + + public WeakReference<Session> getReference() + { + return _reference; + } + + public MessageStore getMessageStore() + { + return getVirtualHost().getMessageStore(); + } + + public VirtualHost getVirtualHost() + { + return (VirtualHost) _connectionConfig.getVirtualHost(); + } + + public UUID getId() + { + return _id; + } + + public SessionConfigType getConfigType() + { + return SessionConfigType.getInstance(); + } + + public ConfiguredObject getParent() + { + return getVirtualHost(); + } + + public boolean isDurable() + { + return false; + } + + public boolean isAttached() + { + return true; + } + + public long getDetachedLifespan() + { + return 0; + } + + public Long getExpiryTime() + { + return null; + } + + public Long getMaxClientRate() + { + return null; + } + + public ConnectionConfig getConnectionConfig() + { + return _connectionConfig; + } + + public String getSessionName() + { + return getName().toString(); + } + + public long getCreateTime() + { + return _createTime; + } + + public void mgmtClose() + { + close(); + } + + public Object getID() + { + return getName(); + } + + public AMQConnectionModel getConnectionModel() + { + return (ServerConnection) getConnection(); + } + + public String getClientID() + { + return getConnection().getClientId(); + } + + public LogActor getLogActor() + { + return _actor; + } + + public LogSubject getLogSubject() + { + return (LogSubject) this; + } + + public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException + { + if (inTransaction()) + { + long currentTime = System.currentTimeMillis(); + long openTime = currentTime - _transaction.getTransactionStartTime(); + long idleTime = currentTime - _txnUpdateTime.get(); + + // Log a warning on idle or open transactions + if (idleWarn > 0L && idleTime > idleWarn) + { + CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(openTime)); + _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms"); + } + else if (openWarn > 0L && openTime > openWarn) + { + CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime)); + _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms"); + } + + // Close connection for idle or open transactions that have timed out + if (idleClose > 0L && idleTime > idleClose) + { + getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); + } + else if (openClose > 0L && openTime > openClose) + { + getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); + } + } + } + + @Override + public String toLogString() + { + return "[" + + MessageFormat.format(CHANNEL_FORMAT, + getConnection().getConnectionId(), + getClientID(), + ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(), + getVirtualHost().getName(), + getChannel()) + + "] "; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java new file mode 100644 index 0000000000..be659c87ae --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -0,0 +1,1244 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; + +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQUnknownExchangeType; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.exchange.*; +import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.filter.FilterManagerFactory; +import org.apache.qpid.server.flow.FlowCreditManager_0_10; +import org.apache.qpid.server.flow.WindowCreditManager; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.server.message.MessageMetaData_0_10; +import org.apache.qpid.server.message.MessageTransferMessage; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.subscription.Subscription_0_10; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.Acquired; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.ExchangeBind; +import org.apache.qpid.transport.ExchangeBound; +import org.apache.qpid.transport.ExchangeBoundResult; +import org.apache.qpid.transport.ExchangeDeclare; +import org.apache.qpid.transport.ExchangeDelete; +import org.apache.qpid.transport.ExchangeQuery; +import org.apache.qpid.transport.ExchangeQueryResult; +import org.apache.qpid.transport.ExchangeUnbind; +import org.apache.qpid.transport.ExecutionErrorCode; +import org.apache.qpid.transport.ExecutionException; +import org.apache.qpid.transport.MessageAccept; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquire; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCancel; +import org.apache.qpid.transport.MessageFlow; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageFlush; +import org.apache.qpid.transport.MessageReject; +import org.apache.qpid.transport.MessageRejectCode; +import org.apache.qpid.transport.MessageRelease; +import org.apache.qpid.transport.MessageResume; +import org.apache.qpid.transport.MessageSetFlowMode; +import org.apache.qpid.transport.MessageStop; +import org.apache.qpid.transport.MessageSubscribe; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.QueueDeclare; +import org.apache.qpid.transport.QueueDelete; +import org.apache.qpid.transport.QueuePurge; +import org.apache.qpid.transport.QueueQuery; +import org.apache.qpid.transport.QueueQueryResult; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionDelegate; +import org.apache.qpid.transport.TxCommit; +import org.apache.qpid.transport.TxRollback; +import org.apache.qpid.transport.TxSelect; + +public class ServerSessionDelegate extends SessionDelegate +{ + private final IApplicationRegistry _appRegistry; + + public ServerSessionDelegate(IApplicationRegistry appRegistry) + { + _appRegistry = appRegistry; + } + + @Override + public void command(Session session, Method method) + { + SecurityManager.setThreadPrincipal(session.getConnection().getAuthorizationID()); + + if(!session.isClosing()) + { + super.command(session, method); + if (method.isSync()) + { + session.flushProcessed(); + } + } + } + + @Override + public void messageAccept(Session session, MessageAccept method) + { + ((ServerSession)session).accept(method.getTransfers()); + } + + + + @Override + public void messageReject(Session session, MessageReject method) + { + ((ServerSession)session).reject(method.getTransfers()); + } + + @Override + public void messageRelease(Session session, MessageRelease method) + { + ((ServerSession)session).release(method.getTransfers()); + } + + @Override + public void messageAcquire(Session session, MessageAcquire method) + { + RangeSet acquiredRanges = ((ServerSession)session).acquire(method.getTransfers()); + + Acquired result = new Acquired(acquiredRanges); + + + session.executionResult((int) method.getId(), result); + + + } + + @Override + public void messageResume(Session session, MessageResume method) + { + super.messageResume(session, method); + } + + @Override + public void messageSubscribe(Session session, MessageSubscribe method) + { + + //TODO - work around broken Python tests + if(!method.hasAcceptMode()) + { + method.setAcceptMode(MessageAcceptMode.EXPLICIT); + } + if(!method.hasAcquireMode()) + { + method.setAcquireMode(MessageAcquireMode.PRE_ACQUIRED); + + } + + /* if(!method.hasAcceptMode()) + { + exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "Accept-mode not supplied"); + } + else if(!method.hasAcquireMode()) + { + exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "Acquire-mode not supplied"); + } + else */if(!method.hasQueue()) + { + exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not supplied"); + } + else + { + String destination = method.getDestination(); + + if(((ServerSession)session).getSubscription(destination)!=null) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Subscription already exists with destaination: '"+destination+"'"); + } + else + { + String queueName = method.getQueue(); + QueueRegistry queueRegistry = getQueueRegistry(session); + + + final AMQQueue queue = queueRegistry.getQueue(queueName); + + if(queue == null) + { + exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found"); + } + else if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session) + { + exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); + } + else + { + if(queue.isExclusive()) + { + ServerSession s = (ServerSession) session; + queue.setExclusiveOwningSession(s); + if(queue.getPrincipalHolder() == null) + { + queue.setPrincipalHolder(s); + queue.setExclusiveOwningSession(s); + ((ServerSession) session).addSessionCloseTask(new ServerSession.Task() + { + public void doTask(ServerSession session) + { + if(queue.getPrincipalHolder() == session) + { + queue.setPrincipalHolder(null); + queue.setExclusiveOwningSession(null); + } + } + }); + } + + } + + FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L); + + FilterManager filterManager = null; + try + { + filterManager = FilterManagerFactory.createManager(method.getArguments()); + } + catch (AMQException amqe) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Exception Creating FilterManager"); + return; + } + + Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, + destination, + method.getAcceptMode(), + method.getAcquireMode(), + MessageFlowMode.WINDOW, + creditManager, + filterManager, + method.getArguments()); + + ((ServerSession)session).register(destination, sub); + try + { + queue.registerSubscription(sub, method.getExclusive()); + } + catch (AMQQueue.ExistingExclusiveSubscription existing) + { + exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer"); + } + catch (AMQQueue.ExistingSubscriptionPreventsExclusive exclusive) + { + exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively"); + } + catch (AMQException e) + { + exception(session, method, e, "Cannot subscribe to '" + destination); + } + } + } + } + } + + + @Override + public void messageTransfer(Session ssn, MessageTransfer xfr) + { + ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn); + Exchange exchange; + if(xfr.hasDestination()) + { + exchange = exchangeRegistry.getExchange(xfr.getDestination()); + if(exchange == null) + { + exchange = exchangeRegistry.getDefaultExchange(); + } + } + else + { + exchange = exchangeRegistry.getDefaultExchange(); + } + + + DeliveryProperties delvProps = null; + if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration()) + { + delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl()); + } + + MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr); + + if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName())) + { + ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS; + String description = "Permission denied: exchange-name '" + exchange.getName() + "'"; + exception(ssn, xfr, errorCode, description); + + return; + } + + final MessageStore store = getVirtualHost(ssn).getMessageStore(); + StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData); + ByteBuffer body = xfr.getBody(); + if(body != null) + { + storeMessage.addContent(0, body); + } + storeMessage.flushToStore(); + MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference()); + + ArrayList<? extends BaseQueue> queues = exchange.route(message); + + + + if(queues != null && queues.size() != 0) + { + ((ServerSession) ssn).enqueue(message, queues); + } + else + { + if(delvProps == null || !delvProps.hasDiscardUnroutable() || !delvProps.getDiscardUnroutable()) + { + if(xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT) + { + RangeSet rejects = new RangeSet(); + rejects.add(xfr.getId()); + MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable"); + ssn.invoke(reject); + } + else + { + Exchange alternate = exchange.getAlternateExchange(); + if(alternate != null) + { + queues = alternate.route(message); + if(queues != null && queues.size() != 0) + { + ((ServerSession) ssn).enqueue(message, queues); + } + else + { + //TODO - log the message discard + } + } + else + { + //TODO - log the message discard + } + + + } + } + + + } + + ssn.processed(xfr); + } + + @Override + public void messageCancel(Session session, MessageCancel method) + { + String destination = method.getDestination(); + + Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + + if(sub == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); + } + else + { + AMQQueue queue = sub.getQueue(); + ((ServerSession)session).unregister(sub); + if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0) + { + queue.setPrincipalHolder(null); + } + } + } + + @Override + public void messageFlush(Session session, MessageFlush method) + { + String destination = method.getDestination(); + + Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + + if(sub == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); + } + else + { + + try + { + sub.flush(); + } + catch (AMQException e) + { + exception(session, method, e, "Cannot flush subscription '" + destination); + } + } + } + + @Override + public void txSelect(Session session, TxSelect method) + { + // TODO - check current tx mode + ((ServerSession)session).selectTx(); + } + + @Override + public void txCommit(Session session, TxCommit method) + { + // TODO - check current tx mode + ((ServerSession)session).commit(); + } + + @Override + public void txRollback(Session session, TxRollback method) + { + // TODO - check current tx mode + ((ServerSession)session).rollback(); + } + + + @Override + public void exchangeDeclare(Session session, ExchangeDeclare method) + { + String exchangeName = method.getExchange(); + VirtualHost virtualHost = getVirtualHost(session); + Exchange exchange = getExchange(session, exchangeName); + + if(method.getPassive()) + { + if(exchange == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '"+exchangeName+"'"); + + } + else + { + // TODO - check exchange has same properties + if(!exchange.getTypeShortString().toString().equals(method.getType())) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type"); + } + } + + } + else + { + if (exchange == null) + { + ExchangeRegistry exchangeRegistry = getExchangeRegistry(session); + ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); + + + + try + { + + exchange = exchangeFactory.createExchange(method.getExchange(), + method.getType(), + method.getDurable(), + method.getAutoDelete()); + + String alternateExchangeName = method.getAlternateExchange(); + if(alternateExchangeName != null && alternateExchangeName.length() != 0) + { + Exchange alternate = getExchange(session, alternateExchangeName); + exchange.setAlternateExchange(alternate); + } + + if (exchange.isDurable()) + { + DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); + store.createExchange(exchange); + } + + exchangeRegistry.registerExchange(exchange); + } + catch(AMQUnknownExchangeType e) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType()); + } + catch (AMQException e) + { + exception(session, method, e, "Cannot declare exchange '" + exchangeName); + } + } + else + { + if(!exchange.getTypeShortString().toString().equals(method.getType())) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type"); + } + } + + } + } + + // TODO decouple AMQException and AMQConstant error codes + private void exception(Session session, Method method, AMQException exception, String message) + { + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (exception.getErrorCode() != null) + { + try + { + errorCode = ExecutionErrorCode.get(exception.getErrorCode().getCode()); + } + catch (IllegalArgumentException iae) + { + // ignore, already set to INTERNAL_ERROR + } + } + String description = message + "': " + exception.getMessage(); + + exception(session, method, errorCode, description); + } + + private void exception(Session session, Method method, ExecutionErrorCode errorCode, String description) + { + ExecutionException ex = new ExecutionException(); + ex.setErrorCode(errorCode); + ex.setCommandId(method.getId()); + ex.setDescription(description); + + session.invoke(ex); + + session.close(); + } + + private Exchange getExchange(Session session, String exchangeName) + { + ExchangeRegistry exchangeRegistry = getExchangeRegistry(session); + return exchangeRegistry.getExchange(exchangeName); + } + + private ExchangeRegistry getExchangeRegistry(Session session) + { + VirtualHost virtualHost = getVirtualHost(session); + return virtualHost.getExchangeRegistry(); + + } + + private VirtualHost getVirtualHost(Session session) + { + ServerConnection conn = getServerConnection(session); + VirtualHost vhost = conn.getVirtualHost(); + return vhost; + } + + private ServerConnection getServerConnection(Session session) + { + ServerConnection conn = (ServerConnection) session.getConnection(); + return conn; + } + + @Override + public void exchangeDelete(Session session, ExchangeDelete method) + { + VirtualHost virtualHost = getVirtualHost(session); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + + try + { + Exchange exchange = getExchange(session, method.getExchange()); + + if(exchange == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "No such exchange '" + method.getExchange() + "'"); + } + else if(exchange.hasReferrers()) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate exchange"); + } + else if(isStandardExchange(exchange, virtualHost.getExchangeFactory().getRegisteredTypes())) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange '"+method.getExchange()+"' cannot be deleted"); + } + else + { + exchangeRegistry.unregisterExchange(method.getExchange(), method.getIfUnused()); + + if (exchange.isDurable() && !exchange.isAutoDelete()) + { + DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); + store.removeExchange(exchange); + } + } + } + catch (ExchangeInUseException e) + { + exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Exchange in use"); + } + catch (AMQException e) + { + exception(session, method, e, "Cannot delete exchange '" + method.getExchange() ); + } + } + + private boolean isStandardExchange(Exchange exchange, Collection<ExchangeType<? extends Exchange>> registeredTypes) + { + for(ExchangeType type : registeredTypes) + { + if(type.getDefaultExchangeName().toString().equals( exchange.getName() )) + { + return true; + } + } + return false; + } + + @Override + public void exchangeQuery(Session session, ExchangeQuery method) + { + + ExchangeQueryResult result = new ExchangeQueryResult(); + + Exchange exchange = getExchange(session, method.getName()); + + if(exchange != null) + { + result.setDurable(exchange.isDurable()); + result.setType(exchange.getTypeShortString().toString()); + result.setNotFound(false); + } + else + { + result.setNotFound(true); + } + + session.executionResult((int) method.getId(), result); + } + + @Override + public void exchangeBind(Session session, ExchangeBind method) + { + + VirtualHost virtualHost = getVirtualHost(session); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + if (!method.hasQueue()) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set"); + } + else if (!method.hasExchange()) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "exchange not set"); + } +/* + else if (!method.hasBindingKey()) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "binding-key not set"); + } +*/ + else + { + //TODO - here because of non-compiant python tests + if (!method.hasBindingKey()) + { + method.setBindingKey(method.getQueue()); + } + AMQQueue queue = queueRegistry.getQueue(method.getQueue()); + Exchange exchange = exchangeRegistry.getExchange(method.getExchange()); + if(queue == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found"); + } + else if(exchange == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found"); + } + else if(exchange.getTypeShortString().equals(HeadersExchange.TYPE.getName()) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match"))) + { + exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getName() + " require an x-match header"); + } + else + { + AMQShortString routingKey = new AMQShortString(method.getBindingKey()); + FieldTable fieldTable = FieldTable.convertToFieldTable(method.getArguments()); + + if (!exchange.isBound(routingKey, fieldTable, queue)) + { + try + { + virtualHost.getBindingFactory().addBinding(method.getBindingKey(), queue, exchange, method.getArguments()); + } + catch (AMQException e) + { + exception(session, method, e, "Cannot add binding '" + method.getBindingKey()); + } + } + else + { + // todo + } + } + + + } + + + + } + + @Override + public void exchangeUnbind(Session session, ExchangeUnbind method) + { + VirtualHost virtualHost = getVirtualHost(session); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + if (!method.hasQueue()) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set"); + } + else if (!method.hasExchange()) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "exchange not set"); + } + else if (!method.hasBindingKey()) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "binding-key not set"); + } + else + { + AMQQueue queue = queueRegistry.getQueue(method.getQueue()); + Exchange exchange = exchangeRegistry.getExchange(method.getExchange()); + if(queue == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found"); + } + else if(exchange == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found"); + } + else + { + try + { + virtualHost.getBindingFactory().removeBinding(method.getBindingKey(), queue, exchange, null); + } + catch (AMQException e) + { + exception(session, method, e, "Cannot remove binding '" + method.getBindingKey()); + } + } + } + + + super.exchangeUnbind(session, method); + } + + @Override + public void exchangeBound(Session session, ExchangeBound method) + { + + ExchangeBoundResult result = new ExchangeBoundResult(); + Exchange exchange; + AMQQueue queue; + if(method.hasExchange()) + { + exchange = getExchange(session, method.getExchange()); + + if(exchange == null) + { + result.setExchangeNotFound(true); + } + } + else + { + exchange = getExchangeRegistry(session).getDefaultExchange(); + } + + + if(method.hasQueue()) + { + + queue = getQueue(session, method.getQueue()); + if(queue == null) + { + result.setQueueNotFound(true); + } + + + if(exchange != null && queue != null) + { + + boolean queueMatched = exchange.isBound(queue); + + result.setQueueNotMatched(!queueMatched); + + + if(method.hasBindingKey()) + { + + if(method.hasArguments()) + { + FieldTable args = FieldTable.convertToFieldTable(method.getArguments()); + + result.setArgsNotMatched(!exchange.isBound(new AMQShortString(method.getBindingKey()), args, queue)); + } + if(queueMatched) + { + result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue)); + } + else + { + result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); + } + } + else if (method.hasArguments()) + { + // TODO + + } + + result.setQueueNotMatched(!exchange.isBound(queue)); + + } + else if(exchange != null && method.hasBindingKey()) + { + if(method.hasArguments()) + { + // TODO + } + result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); + + } + + } + else if(exchange != null && method.hasBindingKey()) + { + if(method.hasArguments()) + { + // TODO + } + result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); + + } + + + session.executionResult((int) method.getId(), result); + + + } + + private AMQQueue getQueue(Session session, String queue) + { + QueueRegistry queueRegistry = getQueueRegistry(session); + return queueRegistry.getQueue(queue); + } + + private QueueRegistry getQueueRegistry(Session session) + { + return getVirtualHost(session).getQueueRegistry(); + } + + @Override + public void queueDeclare(Session session, final QueueDeclare method) + { + + VirtualHost virtualHost = getVirtualHost(session); + DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); + + String queueName = method.getQueue(); + AMQQueue queue; + QueueRegistry queueRegistry = getQueueRegistry(session); + //TODO: do we need to check that the queue already exists with exactly the same "configuration"? + + synchronized (queueRegistry) + { + + if (((queue = queueRegistry.getQueue(queueName)) == null)) + { + + if (method.getPassive()) + { + String description = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; + ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_FOUND; + + exception(session, method, errorCode, description); + + return; + } + else + { + try + { + queue = createQueue(queueName, method, virtualHost, (ServerSession)session); + if(method.getExclusive()) + { + queue.setExclusive(true); + } + else if(method.getAutoDelete()) + { + queue.setDeleteOnNoConsumers(true); + } + + final String alternateExchangeName = method.getAlternateExchange(); + if(alternateExchangeName != null && alternateExchangeName.length() != 0) + { + Exchange alternate = getExchange(session, alternateExchangeName); + queue.setAlternateExchange(alternate); + } + + if(method.hasArguments() && method.getArguments() != null) + { + if(method.getArguments().containsKey("no-local")) + { + Object no_local = method.getArguments().get("no-local"); + if(no_local instanceof Boolean && ((Boolean)no_local)) + { + queue.setNoLocal(true); + } + } + } + + + if (queue.isDurable() && !queue.isAutoDelete()) + { + if(method.hasArguments() && method.getArguments() != null) + { + Map<String,Object> args = method.getArguments(); + FieldTable ftArgs = new FieldTable(); + for(Map.Entry<String, Object> entry : args.entrySet()) + { + ftArgs.put(new AMQShortString(entry.getKey()), entry.getValue()); + } + store.createQueue(queue, ftArgs); + } + else + { + store.createQueue(queue); + } + } + queueRegistry.registerQueue(queue); + boolean autoRegister = ApplicationRegistry.getInstance().getConfiguration().getQueueAutoRegister(); + + if (autoRegister) + { + + ExchangeRegistry exchangeRegistry = getExchangeRegistry(session); + + Exchange defaultExchange = exchangeRegistry.getDefaultExchange(); + + virtualHost.getBindingFactory().addBinding(queueName, queue, defaultExchange, null); + + } + + if (method.hasAutoDelete() + && method.getAutoDelete() + && method.hasExclusive() + && method.getExclusive()) + { + final AMQQueue q = queue; + final ServerSession.Task deleteQueueTask = new ServerSession.Task() + { + public void doTask(ServerSession session) + { + try + { + q.delete(); + } + catch (AMQException e) + { + exception(session, method, e, "Cannot delete '" + method.getQueue()); + } + } + }; + final ServerSession s = (ServerSession) session; + s.addSessionCloseTask(deleteQueueTask); + queue.addQueueDeleteTask(new AMQQueue.Task() + { + public void doTask(AMQQueue queue) throws AMQException + { + s.removeSessionCloseTask(deleteQueueTask); + } + }); + } + if (method.hasExclusive() + && method.getExclusive()) + { + final AMQQueue q = queue; + final ServerSession.Task removeExclusive = new ServerSession.Task() + { + public void doTask(ServerSession session) + { + q.setPrincipalHolder(null); + q.setExclusiveOwningSession(null); + } + }; + final ServerSession s = (ServerSession) session; + q.setExclusiveOwningSession(s); + s.addSessionCloseTask(removeExclusive); + queue.addQueueDeleteTask(new AMQQueue.Task() + { + public void doTask(AMQQueue queue) throws AMQException + { + s.removeSessionCloseTask(removeExclusive); + } + }); + } + } + catch (AMQException e) + { + exception(session, method, e, "Cannot declare queue '" + queueName); + } + } + } + else if (method.getExclusive() && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) + { + String description = "Cannot declare queue('" + queueName + "')," + + " as exclusive queue with same name " + + "declared on another session"; + ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED; + + exception(session, method, errorCode, description); + + return; + } + } + } + + protected AMQQueue createQueue(final String queueName, + final QueueDeclare body, + VirtualHost virtualHost, + final ServerSession session) + throws AMQException + { + String owner = body.getExclusive() ? session.getClientID() : null; + + final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), + body.getExclusive(), virtualHost, body.getArguments()); + + return queue; + } + + @Override + public void queueDelete(Session session, QueueDelete method) + { + String queueName = method.getQueue(); + if(queueName == null || queueName.length()==0) + { + exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "No queue name supplied"); + + } + else + { + AMQQueue queue = getQueue(session, queueName); + + + if (queue == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "No queue " + queueName + " found"); + } + else + { + if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session) + { + exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); + } + else if (method.getIfEmpty() && !queue.isEmpty()) + { + exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Queue " + queueName + " not empty"); + } + else if (method.getIfUnused() && !queue.isUnused()) + { + // TODO - Error code + exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Queue " + queueName + " in use"); + + } + else + { + VirtualHost virtualHost = getVirtualHost(session); + + try + { + queue.delete(); + if (queue.isDurable() && !queue.isAutoDelete()) + { + DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); + store.removeQueue(queue); + } + } + catch (AMQException e) + { + exception(session, method, e, "Cannot delete queue '" + queueName); + } + } + } + } + } + + @Override + public void queuePurge(Session session, QueuePurge method) + { + String queueName = method.getQueue(); + if(queueName == null || queueName.length()==0) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "No queue name supplied"); + } + else + { + AMQQueue queue = getQueue(session, queueName); + + if (queue == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "No queue " + queueName + " found"); + } + else + { + try + { + queue.clearQueue(); + } + catch (AMQException e) + { + exception(session, method, e, "Cannot purge queue '" + queueName); + } + } + } + } + + @Override + public void queueQuery(Session session, QueueQuery method) + { + QueueQueryResult result = new QueueQueryResult(); + + AMQQueue queue = getQueue(session, method.getQueue()); + + if(queue != null) + { + result.setQueue(queue.getNameShortString().toString()); + result.setDurable(queue.isDurable()); + result.setExclusive(queue.isExclusive()); + result.setAutoDelete(queue.isAutoDelete()); + result.setArguments(queue.getArguments()); + result.setMessageCount(queue.getMessageCount()); + result.setSubscriberCount(queue.getConsumerCount()); + + } + + + session.executionResult((int) method.getId(), result); + + } + + @Override + public void messageSetFlowMode(Session session, MessageSetFlowMode sfm) + { + String destination = sfm.getDestination(); + + Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + + if(sub == null) + { + exception(session, sfm, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); + } + else if(sub.isStopped()) + { + sub.setFlowMode(sfm.getFlowMode()); + } + } + + @Override + public void messageStop(Session session, MessageStop stop) + { + String destination = stop.getDestination(); + + Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + + if(sub == null) + { + exception(session, stop, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); + } + else + { + sub.stop(); + } + + } + + @Override + public void messageFlow(Session session, MessageFlow flow) + { + String destination = flow.getDestination(); + + Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + + if(sub == null) + { + exception(session, flow, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); + } + else + { + sub.addCredit(flow.getUnit(), flow.getValue()); + } + + } + + @Override + public void closed(Session session) + { + for(Subscription_0_10 sub : getSubscriptions(session)) + { + ((ServerSession)session).unregister(sub); + } + ((ServerSession)session).onClose(); + } + + @Override + public void detached(Session session) + { + closed(session); + } + + public Collection<Subscription_0_10> getSubscriptions(Session session) + { + return ((ServerSession)session).getSubscriptions(); + } + +} |