summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java72
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java46
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java1361
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java50
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java235
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java417
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java54
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java425
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java75
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java201
10 files changed, 2936 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
new file mode 100644
index 0000000000..061ebf50cd
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.stats.StatisticsGatherer;
+
+public interface AMQConnectionModel extends StatisticsGatherer
+{
+ /**
+ * get a unique id for this connection.
+ *
+ * @return a {@link UUID} representing the connection
+ */
+ public UUID getId();
+
+ /**
+ * Close the underlying Connection
+ *
+ * @param cause
+ * @param message
+ * @throws org.apache.qpid.AMQException
+ */
+ public void close(AMQConstant cause, String message) throws AMQException;
+
+ /**
+ * Close the given requested Session
+ *
+ * @param session
+ * @param cause
+ * @param message
+ * @throws org.apache.qpid.AMQException
+ */
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException;
+
+ public long getConnectionId();
+
+ /**
+ * Get a list of all sessions using this connection.
+ *
+ * @return a list of {@link AMQSessionModel}s
+ */
+ public List<AMQSessionModel> getSessionModels();
+
+ /**
+ * Return a {@link LogSubject} for the connection.
+ */
+ public LogSubject getLogSubject();
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java
new file mode 100644
index 0000000000..a7599a3e0d
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+
+/**
+ * AMQNoMethodHandlerException represents the case where no method handler exists to handle an AQMP method.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represents failure to handle an AMQP method.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo Missing method handler. Unlikely to ever happen, and if it does its a coding error. Consider replacing with a
+ * Runtime.
+ */
+public class AMQNoMethodHandlerException extends AMQException
+{
+ public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt)
+ {
+ super("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
new file mode 100644
index 0000000000..449f698c48
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -0,0 +1,1361 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.JMException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.codec.AMQDecoder;
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQProtocolHeaderException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.pool.Job;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.ConnectionConfig;
+import org.apache.qpid.server.configuration.ConnectionConfigType;
+import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
+import org.apache.qpid.server.management.Managable;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.Sender;
+
+public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
+{
+ private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
+
+ private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
+
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+
+ // to save boxing the channelId and looking up in a map... cache in an array the low numbered
+ // channels. This value must be of the form 2^x - 1.
+ private static final int CHANNEL_CACHE_SIZE = 0xff;
+
+ private AMQShortString _contextKey;
+
+ private AMQShortString _clientVersion = null;
+
+ private VirtualHost _virtualHost;
+
+ private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
+
+ private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
+
+ private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
+
+ private final AMQStateManager _stateManager;
+
+ private AMQCodecFactory _codecFactory;
+
+ private AMQProtocolSessionMBean _managedObject;
+
+ private SaslServer _saslServer;
+
+ private Object _lastReceived;
+
+ private Object _lastSent;
+
+ protected volatile boolean _closed;
+
+ // maximum number of channels this session should have
+ private long _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
+
+ /* AMQP Version for this session */
+ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
+
+ private FieldTable _clientProperties;
+ private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+
+ private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
+ private ProtocolOutputConverter _protocolOutputConverter;
+ private Principal _authorizedID;
+ private MethodDispatcher _dispatcher;
+ private ProtocolSessionIdentifier _sessionIdentifier;
+
+ // Create a simple ID that increments for ever new Session
+ private final long _sessionID = idGenerator.getAndIncrement();
+
+ private AMQPConnectionActor _actor;
+ private LogSubject _logSubject;
+
+ private NetworkDriver _networkDriver;
+
+ private long _lastIoTime;
+
+ private long _writtenBytes;
+ private long _readBytes;
+
+ private Job _readJob;
+ private Job _writeJob;
+
+ private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+ private long _maxFrameSize;
+ private final AtomicBoolean _closing = new AtomicBoolean(false);
+ private final UUID _id;
+ private final ConfigStore _configStore;
+ private long _createTime = System.currentTimeMillis();
+
+ private ApplicationRegistry _registry;
+ private boolean _statisticsEnabled = false;
+ private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+
+ public ManagedObject getManagedObject()
+ {
+ return _managedObject;
+ }
+
+ public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver)
+ {
+ _stateManager = new AMQStateManager(virtualHostRegistry, this);
+ _networkDriver = driver;
+
+ _codecFactory = new AMQCodecFactory(true, this);
+ _poolReference.acquireExecutorService();
+ _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
+ _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
+
+ _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
+
+ _logSubject = new ConnectionLogSubject(this);
+
+ _configStore = virtualHostRegistry.getConfigStore();
+ _id = _configStore.createId();
+
+ _actor.message(ConnectionMessages.OPEN(null, null, false, false));
+
+ _registry = virtualHostRegistry.getApplicationRegistry();
+ initialiseStatistics();
+ }
+
+ private AMQProtocolSessionMBean createMBean() throws JMException
+ {
+ return new AMQProtocolSessionMBean(this);
+ }
+
+ public long getSessionID()
+ {
+ return _sessionID;
+ }
+
+ public LogActor getLogActor()
+ {
+ return _actor;
+ }
+
+ public void setMaxFrameSize(long frameMax)
+ {
+ _maxFrameSize = frameMax;
+ }
+
+ public long getMaxFrameSize()
+ {
+ return _maxFrameSize;
+ }
+
+ public boolean isClosing()
+ {
+ return _closing.get();
+ }
+
+ public void received(final ByteBuffer msg)
+ {
+ _lastIoTime = System.currentTimeMillis();
+ try
+ {
+ final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
+ {
+ public void run()
+ {
+ // Decode buffer
+
+ for (AMQDataBlock dataBlock : dataBlocks)
+ {
+ try
+ {
+ dataBlockReceived(dataBlock);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unexpected exception when processing datablock", e);
+ closeProtocolSession();
+ }
+ }
+ }
+ });
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unexpected exception when processing datablock", e);
+ closeProtocolSession();
+ }
+ }
+
+ public void dataBlockReceived(AMQDataBlock message) throws Exception
+ {
+ _lastReceived = message;
+ if (message instanceof ProtocolInitiation)
+ {
+ protocolInitiationReceived((ProtocolInitiation) message);
+
+ }
+ else if (message instanceof AMQFrame)
+ {
+ AMQFrame frame = (AMQFrame) message;
+ frameReceived(frame);
+
+ }
+ else
+ {
+ throw new AMQException("Unknown message type: " + message.getClass().getName() + ": " + message);
+ }
+ }
+
+ private void frameReceived(AMQFrame frame) throws AMQException
+ {
+ int channelId = frame.getChannel();
+ AMQBody body = frame.getBodyFrame();
+
+ //Look up the Channel's Actor and set that as the current actor
+ // If that is not available then we can use the ConnectionActor
+ // that is associated with this AMQMPSession.
+ LogActor channelActor = null;
+ if (_channelMap.get(channelId) != null)
+ {
+ channelActor = _channelMap.get(channelId).getLogActor();
+ }
+ CurrentActor.set(channelActor == null ? _actor : channelActor);
+
+ try
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Frame Received: " + frame);
+ }
+
+ // Check that this channel is not closing
+ if (channelAwaitingClosure(channelId))
+ {
+ if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
+ }
+ }
+ else
+ {
+ // The channel has been told to close, we don't process any more frames until
+ // it's closed.
+ return;
+ }
+ }
+
+ try
+ {
+ body.handle(channelId, this);
+ }
+ catch (AMQException e)
+ {
+ closeChannel(channelId);
+ throw e;
+ }
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+ }
+
+ private void protocolInitiationReceived(ProtocolInitiation pi)
+ {
+ // this ensures the codec never checks for a PI message again
+ ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+ try
+ {
+ // Log incomming protocol negotiation request
+ _actor.message(ConnectionMessages.OPEN(null, pi._protocolMajor + "-" + pi._protocolMinor, false, true));
+
+ ProtocolVersion pv = pi.checkVersion(); // Fails if not correct
+
+ // This sets the protocol version (and hence framing classes) for this session.
+ setProtocolVersion(pv);
+
+ String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
+
+ String locales = "en_US";
+
+ AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(),
+ (short) pv.getActualMinorVersion(),
+ null,
+ mechanisms.getBytes(),
+ locales.getBytes());
+ _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer());
+
+ }
+ catch (AMQException e)
+ {
+ _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
+
+ _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer());
+ }
+ }
+
+ public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
+ {
+ final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
+
+ try
+ {
+ try
+ {
+ boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
+
+ if (!_frameListeners.isEmpty())
+ {
+ for (AMQMethodListener listener : _frameListeners)
+ {
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ }
+ }
+
+ if (!wasAnyoneInterested)
+ {
+ throw new AMQNoMethodHandlerException(evt);
+ }
+ }
+ catch (AMQChannelException e)
+ {
+ if (getChannel(channelId) != null)
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing channel due to: " + e.getMessage());
+ }
+
+ writeFrame(e.getCloseFrame(channelId));
+ closeChannel(channelId);
+ }
+ else
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("ChannelException occured on non-existent channel:" + e.getMessage());
+ }
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing connection due to: " + e.getMessage());
+ }
+
+ AMQConnectionException ce =
+ evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
+ AMQConstant.CHANNEL_ERROR.getName().toString());
+
+ _logger.info(e.getMessage() + " whilst processing:" + methodBody);
+ closeConnection(channelId, ce, false);
+ }
+ }
+ catch (AMQConnectionException e)
+ {
+ _logger.info(e.getMessage() + " whilst processing:" + methodBody);
+ closeConnection(channelId, e, false);
+ }
+ catch (AMQSecurityException e)
+ {
+ AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ _logger.info(e.getMessage() + " whilst processing:" + methodBody);
+ closeConnection(channelId, ce, false);
+ }
+ }
+ catch (Exception e)
+ {
+ for (AMQMethodListener listener : _frameListeners)
+ {
+ listener.error(e);
+ }
+
+ _logger.error("Unexpected exception while processing frame. Closing connection.", e);
+
+ closeProtocolSession();
+ }
+ }
+
+ public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
+ {
+
+ AMQChannel channel = getAndAssertChannel(channelId);
+
+ channel.publishContentHeader(body);
+
+ }
+
+ public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
+ {
+ AMQChannel channel = getAndAssertChannel(channelId);
+
+ channel.publishContentBody(body);
+ }
+
+ public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
+ {
+ // NO - OP
+ }
+
+ /**
+ * Convenience method that writes a frame to the protocol session. Equivalent to calling
+ * getProtocolSession().write().
+ *
+ * @param frame the frame to write
+ */
+ public void writeFrame(AMQDataBlock frame)
+ {
+ _lastSent = frame;
+ final ByteBuffer buf = frame.toNioByteBuffer();
+ _lastIoTime = System.currentTimeMillis();
+ _writtenBytes += buf.remaining();
+ Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
+ {
+ public void run()
+ {
+ _networkDriver.send(buf);
+ }
+ });
+ }
+
+ public AMQShortString getContextKey()
+ {
+ return _contextKey;
+ }
+
+ public void setContextKey(AMQShortString contextKey)
+ {
+ _contextKey = contextKey;
+ }
+
+ public List<AMQChannel> getChannels()
+ {
+ return new ArrayList<AMQChannel>(_channelMap.values());
+ }
+
+ public AMQChannel getAndAssertChannel(int channelId) throws AMQException
+ {
+ AMQChannel channel = getChannel(channelId);
+ if (channel == null)
+ {
+ throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
+ }
+
+ return channel;
+ }
+
+ public AMQChannel getChannel(int channelId)
+ {
+ final AMQChannel channel =
+ ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
+ if ((channel == null) || channel.isClosing())
+ {
+ return null;
+ }
+ else
+ {
+ return channel;
+ }
+ }
+
+ public boolean channelAwaitingClosure(int channelId)
+ {
+ return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId);
+ }
+
+ public void addChannel(AMQChannel channel) throws AMQException
+ {
+ if (_closed)
+ {
+ throw new AMQException("Session is closed");
+ }
+
+ final int channelId = channel.getChannelId();
+
+ if (_closingChannelsList.containsKey(channelId))
+ {
+ throw new AMQException("Session is marked awaiting channel close");
+ }
+
+ if (_channelMap.size() == _maxNoOfChannels)
+ {
+ String errorMessage =
+ toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
+ + "); can't create channel";
+ _logger.error(errorMessage);
+ throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage);
+ }
+ else
+ {
+ _channelMap.put(channel.getChannelId(), channel);
+ }
+
+ if (((channelId & CHANNEL_CACHE_SIZE) == channelId))
+ {
+ _cachedChannels[channelId] = channel;
+ }
+
+ checkForNotification();
+ }
+
+ private void checkForNotification()
+ {
+ int channelsCount = _channelMap.size();
+ if (_managedObject != null && channelsCount >= _maxNoOfChannels)
+ {
+ _managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value");
+ }
+ }
+
+ public Long getMaximumNumberOfChannels()
+ {
+ return _maxNoOfChannels;
+ }
+
+ public void setMaximumNumberOfChannels(Long value)
+ {
+ _maxNoOfChannels = value;
+ }
+
+ public void commitTransactions(AMQChannel channel) throws AMQException
+ {
+ if ((channel != null) && channel.isTransactional())
+ {
+ channel.commit();
+ }
+ }
+
+ public void rollbackTransactions(AMQChannel channel) throws AMQException
+ {
+ if ((channel != null) && channel.isTransactional())
+ {
+ channel.rollback();
+ }
+ }
+
+ /**
+ * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
+ * subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
+ *
+ * @param channelId id of the channel to close
+ *
+ * @throws AMQException if an error occurs closing the channel
+ * @throws IllegalArgumentException if the channel id is not valid
+ */
+ public void closeChannel(int channelId) throws AMQException
+ {
+ final AMQChannel channel = getChannel(channelId);
+ if (channel == null)
+ {
+ throw new IllegalArgumentException("Unknown channel id");
+ }
+ else
+ {
+ try
+ {
+ channel.close();
+ markChannelAwaitingCloseOk(channelId);
+ }
+ finally
+ {
+ removeChannel(channelId);
+ }
+ }
+ }
+
+ public void closeChannelOk(int channelId)
+ {
+ // todo QPID-847 - This is called from two lcoations ChannelCloseHandler and ChannelCloseOkHandler.
+ // When it is the CC_OK_Handler then it makes sence to remove the channel else we will leak memory.
+ // We do it from the Close Handler as we are sending the OK back to the client.
+ // While this is AMQP spec compliant. The Java client in the event of an IllegalArgumentException
+ // will send a close-ok.. Where we should call removeChannel.
+ // However, due to the poor exception handling on the client. The client-user will be notified of the
+ // InvalidArgument and if they then decide to close the session/connection then the there will be time
+ // for that to occur i.e. a new close method be sent before the exeption handling can mark the session closed.
+ //removeChannel(channelId);
+ _closingChannelsList.remove(channelId);
+ }
+
+ private void markChannelAwaitingCloseOk(int channelId)
+ {
+ _closingChannelsList.put(channelId, System.currentTimeMillis());
+ }
+
+ /**
+ * In our current implementation this is used by the clustering code.
+ *
+ * @param channelId The channel to remove
+ */
+ public void removeChannel(int channelId)
+ {
+ _channelMap.remove(channelId);
+ if ((channelId & CHANNEL_CACHE_SIZE) == channelId)
+ {
+ _cachedChannels[channelId] = null;
+ }
+ }
+
+ /**
+ * Initialise heartbeats on the session.
+ *
+ * @param delay delay in seconds (not ms)
+ */
+ public void initHeartbeats(int delay)
+ {
+ if (delay > 0)
+ {
+ _networkDriver.setMaxWriteIdle(delay);
+ _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
+ }
+ }
+
+ /**
+ * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel.
+ *
+ * @throws AMQException if an error occurs while closing any channel
+ */
+ private void closeAllChannels() throws AMQException
+ {
+ for (AMQChannel channel : _channelMap.values())
+ {
+ channel.close();
+ }
+
+ _channelMap.clear();
+ for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
+ {
+ _cachedChannels[i] = null;
+ }
+ }
+
+ /** This must be called when the session is _closed in order to free up any resources managed by the session. */
+ public void closeSession() throws AMQException
+ {
+ if(_closing.compareAndSet(false,true))
+ {
+ // REMOVE THIS SHOULD NOT BE HERE.
+ if (CurrentActor.get() == null)
+ {
+ CurrentActor.set(_actor);
+ }
+ if (!_closed)
+ {
+ if (_virtualHost != null)
+ {
+ _virtualHost.getConnectionRegistry().deregisterConnection(this);
+ }
+
+ closeAllChannels();
+
+ getConfigStore().removeConfiguredObject(this);
+
+ if (_managedObject != null)
+ {
+ _managedObject.unregister();
+ // Ensure we only do this once.
+ _managedObject = null;
+ }
+
+ for (Task task : _taskList)
+ {
+ task.doTask(this);
+ }
+
+ synchronized(this)
+ {
+ _closed = true;
+ notifyAll();
+ }
+ _poolReference.releaseExecutorService();
+ CurrentActor.get().message(_logSubject, ConnectionMessages.CLOSE());
+ }
+ }
+ else
+ {
+ synchronized(this)
+ {
+ while(!_closed)
+ {
+ try
+ {
+ wait(1000);
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+ }
+ }
+ }
+ }
+
+ public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing connection due to: " + e);
+ }
+
+ markChannelAwaitingCloseOk(channelId);
+ closeSession();
+ _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ writeFrame(e.getCloseFrame(channelId));
+
+ if (closeProtocolSession)
+ {
+ closeProtocolSession();
+ }
+ }
+
+ public void closeProtocolSession()
+ {
+ _networkDriver.close();
+ try
+ {
+ _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ }
+ catch (AMQException e)
+ {
+ _logger.info(e.getMessage());
+ }
+ }
+
+ public String toString()
+ {
+ return getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")");
+ }
+
+ public String dump()
+ {
+ return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived;
+ }
+
+ /** @return an object that can be used to identity */
+ public Object getKey()
+ {
+ return getRemoteAddress();
+ }
+
+ /**
+ * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may
+ * be bound to multiple addresses this could vary depending on the acceptor this session was created from.
+ *
+ * @return a String FQDN
+ */
+ public String getLocalFQDN()
+ {
+ SocketAddress address = _networkDriver.getLocalAddress();
+ // we use the vmpipe address in some tests hence the need for this rather ugly test. The host
+ // information is used by SASL primary.
+ if (address instanceof InetSocketAddress)
+ {
+ return ((InetSocketAddress) address).getHostName();
+ }
+ else if (address instanceof VmPipeAddress)
+ {
+ return "vmpipe:" + ((VmPipeAddress) address).getPort();
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unsupported socket address class: " + address);
+ }
+ }
+
+ public SaslServer getSaslServer()
+ {
+ return _saslServer;
+ }
+
+ public void setSaslServer(SaslServer saslServer)
+ {
+ _saslServer = saslServer;
+ }
+
+ public FieldTable getClientProperties()
+ {
+ return _clientProperties;
+ }
+
+ public void setClientProperties(FieldTable clientProperties)
+ {
+ _clientProperties = clientProperties;
+ if (_clientProperties != null)
+ {
+ if (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null)
+ {
+ String clientID = _clientProperties.getString(CLIENT_PROPERTIES_INSTANCE);
+ setContextKey(new AMQShortString(clientID));
+
+ // Log the Opening of the connection for this client
+ _actor.message(ConnectionMessages.OPEN(clientID, _protocolVersion.toString(), true, true));
+ }
+
+ if (_clientProperties.getString(ClientProperties.version.toString()) != null)
+ {
+ _clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString()));
+ }
+ }
+ _sessionIdentifier = new ProtocolSessionIdentifier(this);
+ }
+
+ private void setProtocolVersion(ProtocolVersion pv)
+ {
+ _protocolVersion = pv;
+
+ _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this);
+ _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion);
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return _protocolVersion.getMajorVersion();
+ }
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return _protocolVersion;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return _protocolVersion.getMinorVersion();
+ }
+
+ public boolean isProtocolVersion(byte major, byte minor)
+ {
+ return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor);
+ }
+
+ public MethodRegistry getRegistry()
+ {
+ return getMethodRegistry();
+ }
+
+ public Object getClientIdentifier()
+ {
+ return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null;
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public void setVirtualHost(VirtualHost virtualHost) throws AMQException
+ {
+ _virtualHost = virtualHost;
+
+ _virtualHost.getConnectionRegistry().registerConnection(this);
+
+ _configStore.addConfiguredObject(this);
+
+ try
+ {
+ _managedObject = createMBean();
+ _managedObject.register();
+ }
+ catch (JMException e)
+ {
+ _logger.error(e);
+ }
+ }
+
+ public void addSessionCloseTask(Task task)
+ {
+ _taskList.add(task);
+ }
+
+ public void removeSessionCloseTask(Task task)
+ {
+ _taskList.remove(task);
+ }
+
+ public ProtocolOutputConverter getProtocolOutputConverter()
+ {
+ return _protocolOutputConverter;
+ }
+
+ public void setAuthorizedID(Principal authorizedID)
+ {
+ _authorizedID = authorizedID;
+ }
+
+ public Principal getAuthorizedID()
+ {
+ return _authorizedID;
+ }
+
+ public Principal getPrincipal()
+ {
+ return _authorizedID;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
+ }
+
+ public MethodRegistry getMethodRegistry()
+ {
+ return MethodRegistry.getMethodRegistry(getProtocolVersion());
+ }
+
+ public MethodDispatcher getMethodDispatcher()
+ {
+ return _dispatcher;
+ }
+
+ public void closed()
+ {
+ try
+ {
+ closeSession();
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Could not close protocol engine", e);
+ }
+ }
+
+ public void readerIdle()
+ {
+ // Nothing
+ }
+
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ }
+
+ public void writerIdle()
+ {
+ _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer());
+ }
+
+ public void exception(Throwable throwable)
+ {
+ if (throwable instanceof AMQProtocolHeaderException)
+ {
+ writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
+ _networkDriver.close();
+
+ _logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable);
+ }
+ else if (throwable instanceof IOException)
+ {
+ _logger.error("IOException caught in" + this + ", session closed implictly: " + throwable);
+ }
+ else
+ {
+ _logger.error("Exception caught in" + this + ", closing session explictly: " + throwable, throwable);
+
+
+ MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion());
+ ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
+
+ writeFrame(closeBody.generateFrame(0));
+
+ _networkDriver.close();
+ }
+ }
+
+ public void init()
+ {
+ // Do nothing
+ }
+
+ public void setSender(Sender<ByteBuffer> sender)
+ {
+ // Do nothing
+ }
+
+ public long getReadBytes()
+ {
+ return _readBytes;
+ }
+
+ public long getWrittenBytes()
+ {
+ return _writtenBytes;
+ }
+
+ public long getLastIoTime()
+ {
+ return _lastIoTime;
+ }
+
+ public ProtocolSessionIdentifier getSessionIdentifier()
+ {
+ return _sessionIdentifier;
+ }
+
+ public String getClientVersion()
+ {
+ return (_clientVersion == null) ? null : _clientVersion.toString();
+ }
+
+ public Boolean isIncoming()
+ {
+ return true;
+ }
+
+ public Boolean isSystemConnection()
+ {
+ return false;
+ }
+
+ public Boolean isFederationLink()
+ {
+ return false;
+ }
+
+ public String getAuthId()
+ {
+ return getAuthorizedID().getName();
+ }
+
+ public Integer getRemotePID()
+ {
+ return null;
+ }
+
+ public String getRemoteProcessName()
+ {
+ return null;
+ }
+
+ public Integer getRemoteParentPID()
+ {
+ return null;
+ }
+
+ public ConfigStore getConfigStore()
+ {
+ return _configStore;
+ }
+
+ public ConnectionConfigType getConfigType()
+ {
+ return ConnectionConfigType.getInstance();
+ }
+
+ public ConfiguredObject getParent()
+ {
+ return getVirtualHost();
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public long getConnectionId()
+ {
+ return getSessionID();
+ }
+
+ public String getAddress()
+ {
+ return String.valueOf(getRemoteAddress());
+ }
+
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+
+ public Boolean isShadow()
+ {
+ return false;
+ }
+
+ public void mgmtClose()
+ {
+ MethodRegistry methodRegistry = getMethodRegistry();
+ ConnectionCloseBody responseBody =
+ methodRegistry.createConnectionCloseBody(
+ AMQConstant.REPLY_SUCCESS.getCode(),
+ new AMQShortString("The connection was closed using the broker's management interface."),
+ 0,0);
+
+ // This seems ugly but because we use closeConnection in both normal
+ // broker operation and as part of the management interface it cannot
+ // be avoided. The Current Actor will be null when this method is
+ // called via the QMF management interface. As such we need to set one.
+ boolean removeActor = false;
+ if (CurrentActor.get() == null)
+ {
+ removeActor = true;
+ CurrentActor.set(new ManagementActor(_actor.getRootMessageLogger()));
+ }
+
+ try
+ {
+ writeFrame(responseBody.generateFrame(0));
+
+ try
+ {
+
+ closeSession();
+ }
+ catch (AMQException ex)
+ {
+ throw new RuntimeException(ex);
+ }
+ }
+ finally
+ {
+ if (removeActor)
+ {
+ CurrentActor.remove();
+ }
+ }
+ }
+
+ public void mgmtCloseChannel(int channelId)
+ {
+ MethodRegistry methodRegistry = getMethodRegistry();
+ ChannelCloseBody responseBody =
+ methodRegistry.createChannelCloseBody(
+ AMQConstant.REPLY_SUCCESS.getCode(),
+ new AMQShortString("The channel was closed using the broker's management interface."),
+ 0,0);
+
+ // This seems ugly but because we use AMQChannel.close() in both normal
+ // broker operation and as part of the management interface it cannot
+ // be avoided. The Current Actor will be null when this method is
+ // called via the QMF management interface. As such we need to set one.
+ boolean removeActor = false;
+ if (CurrentActor.get() == null)
+ {
+ removeActor = true;
+ CurrentActor.set(new ManagementActor(_actor.getRootMessageLogger()));
+ }
+
+ try
+ {
+ writeFrame(responseBody.generateFrame(channelId));
+
+ try
+ {
+ closeChannel(channelId);
+ }
+ catch (AMQException ex)
+ {
+ throw new RuntimeException(ex);
+ }
+ }
+ finally
+ {
+ if (removeActor)
+ {
+ CurrentActor.remove();
+ }
+ }
+ }
+
+ public String getClientID()
+ {
+ return getContextKey().toString();
+ }
+
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+ {
+ closeChannel((Integer)session.getID());
+
+ MethodRegistry methodRegistry = getMethodRegistry();
+ ChannelCloseBody responseBody =
+ methodRegistry.createChannelCloseBody(
+ cause.getCode(),
+ new AMQShortString(message),
+ 0,0);
+
+ writeFrame(responseBody.generateFrame((Integer)session.getID()));
+ }
+
+ public void close(AMQConstant cause, String message) throws AMQException
+ {
+ closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
+ getProtocolOutputConverter().getProtocolMajorVersion(),
+ getProtocolOutputConverter().getProtocolMinorVersion(),
+ (Throwable) null), true);
+ }
+
+ public List<AMQSessionModel> getSessionModels()
+ {
+ List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
+ for (AMQChannel channel : getChannels())
+ {
+ sessions.add((AMQSessionModel) channel);
+ }
+ return sessions;
+ }
+
+ public LogSubject getLogSubject()
+ {
+ return _logSubject;
+ }
+
+ public void registerMessageDelivered(long messageSize)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesDelivered.registerEvent(1L);
+ _dataDelivered.registerEvent(messageSize);
+ }
+ _virtualHost.registerMessageDelivered(messageSize);
+ }
+
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesReceived.registerEvent(1L, timestamp);
+ _dataReceived.registerEvent(messageSize, timestamp);
+ }
+ _virtualHost.registerMessageReceived(messageSize, timestamp);
+ }
+
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messagesReceived;
+ }
+
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceived;
+ }
+
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messagesDelivered;
+ }
+
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDelivered;
+ }
+
+ public void resetStatistics()
+ {
+ _messagesDelivered.reset();
+ _dataDelivered.reset();
+ _messagesReceived.reset();
+ _dataReceived.reset();
+ }
+
+ public void initialiseStatistics()
+ {
+ setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+ _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled());
+
+ _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID());
+ _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
+ _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
+ _dataReceived = new StatisticsCounter("data-received-" + getSessionID());
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _statisticsEnabled = enabled;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
new file mode 100644
index 0000000000..0e4444725e
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
@@ -0,0 +1,50 @@
+package org.apache.qpid.server.protocol;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriver;
+
+public class AMQProtocolEngineFactory implements ProtocolEngineFactory
+{
+ private VirtualHostRegistry _vhosts;
+
+ public AMQProtocolEngineFactory()
+ {
+ this(1);
+ }
+
+ public AMQProtocolEngineFactory(Integer port)
+ {
+ _vhosts = ApplicationRegistry.getInstance(port).getVirtualHostRegistry();
+ }
+
+
+ public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
+ {
+ return new AMQProtocolEngine(_vhosts, networkDriver);
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
new file mode 100644
index 0000000000..c64ed4ad5a
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -0,0 +1,235 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import javax.security.sasl.SaslServer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.security.Principal;
+import java.util.List;
+
+
+public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder, AMQConnectionModel
+{
+ long getSessionID();
+
+ LogActor getLogActor();
+
+ void setMaxFrameSize(long frameMax);
+
+ long getMaxFrameSize();
+
+ boolean isClosing();
+
+ public static final class ProtocolSessionIdentifier
+ {
+ private final Object _sessionIdentifier;
+ private final Object _sessionInstance;
+
+ ProtocolSessionIdentifier(AMQProtocolSession session)
+ {
+ _sessionIdentifier = session.getClientIdentifier();
+ _sessionInstance = session.getClientProperties() == null ? null : session.getClientProperties().getObject(ClientProperties.instance.toAMQShortString());
+ }
+
+ public Object getSessionIdentifier()
+ {
+ return _sessionIdentifier;
+ }
+
+ public Object getSessionInstance()
+ {
+ return _sessionInstance;
+ }
+ }
+
+ public static interface Task
+ {
+ public void doTask(AMQProtocolSession session) throws AMQException;
+ }
+
+ /**
+ * Called when a protocol data block is received
+ *
+ * @param message the data block that has been received
+ *
+ * @throws Exception if processing the datablock fails
+ */
+ void dataBlockReceived(AMQDataBlock message) throws Exception;
+
+ /**
+ * Get the context key associated with this session. Context key is described in the AMQ protocol specification (RFC
+ * 6).
+ *
+ * @return the context key
+ */
+ AMQShortString getContextKey();
+
+ /**
+ * Set the context key associated with this session. Context key is described in the AMQ protocol specification (RFC
+ * 6).
+ *
+ * @param contextKey the context key
+ */
+ void setContextKey(AMQShortString contextKey);
+
+ /**
+ * Get the channel for this session associated with the specified id. A channel id is unique per connection (i.e.
+ * per session).
+ *
+ * @param channelId the channel id which must be valid
+ *
+ * @return null if no channel exists, the channel otherwise
+ */
+ AMQChannel getChannel(int channelId);
+
+ /**
+ * Associate a channel with this session.
+ *
+ * @param channel the channel to associate with this session. It is an error to associate the same channel with more
+ * than one session but this is not validated.
+ */
+ void addChannel(AMQChannel channel) throws AMQException;
+
+ /**
+ * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
+ * subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
+ *
+ * @param channelId id of the channel to close
+ *
+ * @throws org.apache.qpid.AMQException if an error occurs closing the channel
+ * @throws IllegalArgumentException if the channel id is not valid
+ */
+ void closeChannel(int channelId) throws AMQException;
+
+ /**
+ * Markes the specific channel as closed. This will release the lock for that channel id so a new channel can be
+ * created on that id.
+ *
+ * @param channelId id of the channel to close
+ */
+ void closeChannelOk(int channelId);
+
+ /**
+ * Check to see if this chanel is closing
+ *
+ * @param channelId id to check
+ * @return boolean with state of channel awaiting closure
+ */
+ boolean channelAwaitingClosure(int channelId);
+
+ /**
+ * Remove a channel from the session but do not close it.
+ *
+ * @param channelId
+ */
+ void removeChannel(int channelId);
+
+ /**
+ * Initialise heartbeats on the session.
+ *
+ * @param delay delay in seconds (not ms)
+ */
+ void initHeartbeats(int delay);
+
+ /** This must be called when the session is _closed in order to free up any resources managed by the session. */
+ void closeSession() throws AMQException;
+
+ /** This must be called to close the session in order to free up any resources managed by the session. */
+ void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException;
+
+
+ /** @return a key that uniquely identifies this session */
+ Object getKey();
+
+ /**
+ * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may
+ * be bound to multiple addresses this could vary depending on the acceptor this session was created from.
+ *
+ * @return a String FQDN
+ */
+ String getLocalFQDN();
+
+ /** @return the sasl server that can perform authentication for this session. */
+ SaslServer getSaslServer();
+
+ /**
+ * Set the sasl server that is to perform authentication for this session.
+ *
+ * @param saslServer
+ */
+ void setSaslServer(SaslServer saslServer);
+
+
+ FieldTable getClientProperties();
+
+ void setClientProperties(FieldTable clientProperties);
+
+ Object getClientIdentifier();
+
+ VirtualHost getVirtualHost();
+
+ void setVirtualHost(VirtualHost virtualHost) throws AMQException;
+
+ void addSessionCloseTask(Task task);
+
+ void removeSessionCloseTask(Task task);
+
+ public ProtocolOutputConverter getProtocolOutputConverter();
+
+ void setAuthorizedID(Principal authorizedID);
+
+ public java.net.SocketAddress getRemoteAddress();
+
+ public MethodRegistry getMethodRegistry();
+
+ public MethodDispatcher getMethodDispatcher();
+
+ public ProtocolSessionIdentifier getSessionIdentifier();
+
+ String getClientVersion();
+
+ long getLastIoTime();
+
+ long getWrittenBytes();
+
+ Long getMaximumNumberOfChannels();
+
+ void setMaximumNumberOfChannels(Long value);
+
+ void commitTransactions(AMQChannel channel) throws AMQException;
+
+ void rollbackTransactions(AMQChannel channel) throws AMQException;
+
+ List<AMQChannel> getChannels();
+
+ void mgmtCloseChannel(int channelId);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
new file mode 100644
index 0000000000..fcac78fafa
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -0,0 +1,417 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import java.util.Date;
+import java.util.List;
+
+import javax.management.JMException;
+import javax.management.MBeanException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.NotCompliantMBeanException;
+import javax.management.Notification;
+import javax.management.ObjectName;
+import javax.management.monitor.MonitorNotification;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
+
+/**
+ * This MBean class implements the management interface. In order to make more attributes, operations and notifications
+ * available over JMX simply augment the ManagedConnection interface and add the appropriate implementation here.
+ */
+@MBeanDescription("Management Bean for an AMQ Broker Connection")
+public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection
+{
+ private AMQProtocolSession _protocolSession = null;
+ private String _name = null;
+
+ // openmbean data types for representing the channel attributes
+
+ private static final OpenType[] _channelAttributeTypes =
+ { SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER, SimpleType.BOOLEAN };
+ private static CompositeType _channelType = null; // represents the data type for channel data
+ private static TabularType _channelsType = null; // Data type for list of channels type
+ private static final AMQShortString BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION =
+ new AMQShortString("Broker Management Console has closed the connection.");
+
+ @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
+ public AMQProtocolSessionMBean(AMQProtocolSession amqProtocolSession) throws NotCompliantMBeanException, OpenDataException
+ {
+ super(ManagedConnection.class, ManagedConnection.TYPE);
+ _protocolSession = amqProtocolSession;
+ String remote = getRemoteAddress();
+ _name = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
+ init();
+ }
+
+ static
+ {
+ try
+ {
+ init();
+ }
+ catch (JMException ex)
+ {
+ // This is not expected to ever occur.
+ throw new RuntimeException("Got JMException in static initializer.", ex);
+ }
+ }
+
+ /**
+ * initialises the openmbean data types
+ */
+ private static void init() throws OpenDataException
+ {
+ _channelType =
+ new CompositeType("Channel", "Channel Details", COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]),
+ COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]), _channelAttributeTypes);
+ _channelsType = new TabularType("Channels", "Channels", _channelType, TABULAR_UNIQUE_INDEX.toArray(new String[TABULAR_UNIQUE_INDEX.size()]));
+ }
+
+ public String getClientId()
+ {
+ return String.valueOf(_protocolSession.getContextKey());
+ }
+
+ public String getAuthorizedId()
+ {
+ return (_protocolSession.getPrincipal() != null ) ? _protocolSession.getPrincipal().getName() : null;
+ }
+
+ public String getVersion()
+ {
+ return (_protocolSession.getClientVersion() == null) ? null : _protocolSession.getClientVersion().toString();
+ }
+
+ public Date getLastIoTime()
+ {
+ return new Date(_protocolSession.getLastIoTime());
+ }
+
+ public String getRemoteAddress()
+ {
+ return _protocolSession.getRemoteAddress().toString();
+ }
+
+ public ManagedObject getParentObject()
+ {
+ return _protocolSession.getVirtualHost().getManagedObject();
+ }
+
+ public Long getWrittenBytes()
+ {
+ return _protocolSession.getWrittenBytes();
+ }
+
+ public Long getReadBytes()
+ {
+ return _protocolSession.getWrittenBytes();
+ }
+
+ public Long getMaximumNumberOfChannels()
+ {
+ return _protocolSession.getMaximumNumberOfChannels();
+ }
+
+ public void setMaximumNumberOfChannels(Long value)
+ {
+ _protocolSession.setMaximumNumberOfChannels(value);
+ }
+
+ public String getObjectInstanceName()
+ {
+ return ObjectName.quote(_name);
+ }
+
+ /**
+ * commits transactions for a transactional channel
+ *
+ * @param channelId
+ * @throws JMException if channel with given id doesn't exist or if commit fails
+ */
+ public void commitTransactions(int channelId) throws JMException
+ {
+ CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
+ try
+ {
+ AMQChannel channel = _protocolSession.getChannel(channelId);
+ if (channel == null)
+ {
+ throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
+ }
+
+ _protocolSession.commitTransactions(channel);
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+ }
+
+ /**
+ * rollsback the transactions for a transactional channel
+ *
+ * @param channelId
+ * @throws JMException if channel with given id doesn't exist or if rollback fails
+ */
+ public void rollbackTransactions(int channelId) throws JMException
+ {
+ CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
+ try
+ {
+ AMQChannel channel = _protocolSession.getChannel(channelId);
+ if (channel == null)
+ {
+ throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
+ }
+
+ _protocolSession.rollbackTransactions(channel);
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+ }
+
+ /**
+ * Creates the list of channels in tabular form from the _channelMap.
+ *
+ * @return list of channels in tabular form.
+ * @throws OpenDataException
+ */
+ public TabularData channels() throws OpenDataException
+ {
+ TabularDataSupport channelsList = new TabularDataSupport(_channelsType);
+ List<AMQChannel> list = _protocolSession.getChannels();
+
+ for (AMQChannel channel : list)
+ {
+ Object[] itemValues =
+ {
+ channel.getChannelId(), channel.isTransactional(),
+ (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getNameShortString().asString() : null,
+ channel.getUnacknowledgedMessageMap().size(), channel.getBlocking()
+ };
+
+ CompositeData channelData = new CompositeDataSupport(_channelType,
+ COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]), itemValues);
+ channelsList.put(channelData);
+ }
+
+ return channelsList;
+ }
+
+ /**
+ * closes the connection. The administrator can use this management operation to close connection to free up
+ * resources.
+ * @throws JMException
+ */
+ public void closeConnection() throws JMException
+ {
+
+ MethodRegistry methodRegistry = _protocolSession.getMethodRegistry();
+ ConnectionCloseBody responseBody =
+ methodRegistry.createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
+ // replyCode
+ BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION,
+ // replyText,
+ 0,
+ 0);
+
+ // This seems ugly but because we use closeConnection in both normal
+ // broker operation and as part of the management interface it cannot
+ // be avoided. The Current Actor will be null when this method is
+ // called via the Management interface. This is because we allow the
+ // Local API connection with JConsole. If we did not allow that option
+ // then the CurrentActor could be set in our JMX Proxy object.
+ // As it is we need to set the CurrentActor on all MBean methods
+ // Ideally we would not have a single method that can be called from
+ // two contexts.
+ boolean removeActor = false;
+ if (CurrentActor.get() == null)
+ {
+ removeActor = true;
+ CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
+ }
+
+ try
+ {
+ _protocolSession.writeFrame(responseBody.generateFrame(0));
+
+ try
+ {
+
+ _protocolSession.closeSession();
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+ finally
+ {
+ if (removeActor)
+ {
+ CurrentActor.remove();
+ }
+ }
+ }
+
+ @Override
+ public MBeanNotificationInfo[] getNotificationInfo()
+ {
+ String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED };
+ String name = MonitorNotification.class.getName();
+ String description = "Channel count has reached threshold value";
+ MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
+
+ return new MBeanNotificationInfo[] { info1 };
+ }
+
+ public void notifyClients(String notificationMsg)
+ {
+ Notification n =
+ new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
+ System.currentTimeMillis(), notificationMsg);
+ _broadcaster.sendNotification(n);
+ }
+
+ public void resetStatistics() throws Exception
+ {
+ _protocolSession.resetStatistics();
+ }
+
+ public double getPeakMessageDeliveryRate()
+ {
+ return _protocolSession.getMessageDeliveryStatistics().getPeak();
+ }
+
+ public double getPeakDataDeliveryRate()
+ {
+ return _protocolSession.getDataDeliveryStatistics().getPeak();
+ }
+
+ public double getMessageDeliveryRate()
+ {
+ return _protocolSession.getMessageDeliveryStatistics().getRate();
+ }
+
+ public double getDataDeliveryRate()
+ {
+ return _protocolSession.getDataDeliveryStatistics().getRate();
+ }
+
+ public long getTotalMessagesDelivered()
+ {
+ return _protocolSession.getMessageDeliveryStatistics().getTotal();
+ }
+
+ public long getTotalDataDelivered()
+ {
+ return _protocolSession.getDataDeliveryStatistics().getTotal();
+ }
+
+ public double getPeakMessageReceiptRate()
+ {
+ return _protocolSession.getMessageReceiptStatistics().getPeak();
+ }
+
+ public double getPeakDataReceiptRate()
+ {
+ return _protocolSession.getDataReceiptStatistics().getPeak();
+ }
+
+ public double getMessageReceiptRate()
+ {
+ return _protocolSession.getMessageReceiptStatistics().getRate();
+ }
+
+ public double getDataReceiptRate()
+ {
+ return _protocolSession.getDataReceiptStatistics().getRate();
+ }
+
+ public long getTotalMessagesReceived()
+ {
+ return _protocolSession.getMessageReceiptStatistics().getTotal();
+ }
+
+ public long getTotalDataReceived()
+ {
+ return _protocolSession.getDataReceiptStatistics().getTotal();
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _protocolSession.isStatisticsEnabled();
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _protocolSession.setStatisticsEnabled(enabled);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
new file mode 100644
index 0000000000..bc63403a86
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.LogSubject;
+
+public interface AMQSessionModel
+{
+ public Object getID();
+
+ public AMQConnectionModel getConnectionModel();
+
+ public String getClientID();
+
+ public void close() throws AMQException;
+
+ public LogSubject getLogSubject();
+
+ /**
+ * This method is called from the housekeeping thread to check the status of
+ * transactions on this session and react appropriately.
+ *
+ * If a transaction is open for too long or idle for too long then a warning
+ * is logged or the connection is closed, depending on the configuration. An open
+ * transaction is one that has recent activity. The transaction age is counted
+ * from the time the transaction was started. An idle transaction is one that
+ * has had no activity, such as publishing or acknowledgeing messages.
+ *
+ * @param openWarn time in milliseconds before alerting on open transaction
+ * @param openClose time in milliseconds before closing connection with open transaction
+ * @param idleWarn time in milliseconds before alerting on idle transaction
+ * @param idleClose time in milliseconds before closing connection with idle transaction
+ */
+ public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException;
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
new file mode 100755
index 0000000000..eb957ee33c
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -0,0 +1,425 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.protocol;
+
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.transport.ServerConnection;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.NetworkDriver;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+public class MultiVersionProtocolEngine implements ProtocolEngine
+{
+ private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
+
+
+
+ private NetworkDriver _networkDriver;
+ private Set<VERSION> _supported;
+ private String _fqdn;
+ private IApplicationRegistry _appRegistry;
+
+ private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine();
+
+ public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
+ String fqdn,
+ Set<VERSION> supported, NetworkDriver networkDriver)
+ {
+ _appRegistry = appRegistry;
+ _fqdn = fqdn;
+ _supported = supported;
+ _networkDriver = networkDriver;
+ }
+
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _delegate.setNetworkDriver(driver);
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _delegate.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _delegate.getLocalAddress();
+ }
+
+ public long getWrittenBytes()
+ {
+ return _delegate.getWrittenBytes();
+ }
+
+ public long getReadBytes()
+ {
+ return _delegate.getReadBytes();
+ }
+
+ public void closed()
+ {
+ _delegate.closed();
+ }
+
+ public void writerIdle()
+ {
+ _delegate.writerIdle();
+ }
+
+ public void readerIdle()
+ {
+ _delegate.readerIdle();
+ }
+
+ public void received(ByteBuffer msg)
+ {
+ _delegate.received(msg);
+ }
+
+ public void exception(Throwable t)
+ {
+ _delegate.exception(t);
+ }
+
+ private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
+
+ private static final byte[] AMQP_0_8_HEADER =
+ new byte[] { (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 1,
+ (byte) 1,
+ (byte) 8,
+ (byte) 0
+ };
+
+ private static final byte[] AMQP_0_9_HEADER =
+ new byte[] { (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 1,
+ (byte) 1,
+ (byte) 0,
+ (byte) 9
+ };
+
+private static final byte[] AMQP_0_9_1_HEADER =
+ new byte[] { (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 0,
+ (byte) 0,
+ (byte) 9,
+ (byte) 1
+ };
+
+
+ private static final byte[] AMQP_0_10_HEADER =
+ new byte[] { (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 1,
+ (byte) 1,
+ (byte) 0,
+ (byte) 10
+ };
+
+ private static interface DelegateCreator
+ {
+ VERSION getVersion();
+ byte[] getHeaderIdentifier();
+ ProtocolEngine getProtocolEngine();
+ }
+
+ private DelegateCreator creator_0_8 = new DelegateCreator()
+ {
+
+ public VERSION getVersion()
+ {
+ return VERSION.v0_8;
+ }
+
+ public byte[] getHeaderIdentifier()
+ {
+ return AMQP_0_8_HEADER;
+ }
+
+ public ProtocolEngine getProtocolEngine()
+ {
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
+ }
+ };
+
+ private DelegateCreator creator_0_9 = new DelegateCreator()
+ {
+
+ public VERSION getVersion()
+ {
+ return VERSION.v0_9;
+ }
+
+
+ public byte[] getHeaderIdentifier()
+ {
+ return AMQP_0_9_HEADER;
+ }
+
+ public ProtocolEngine getProtocolEngine()
+ {
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
+ }
+ };
+
+ private DelegateCreator creator_0_9_1 = new DelegateCreator()
+ {
+
+ public VERSION getVersion()
+ {
+ return VERSION.v0_9_1;
+ }
+
+
+ public byte[] getHeaderIdentifier()
+ {
+ return AMQP_0_9_1_HEADER;
+ }
+
+ public ProtocolEngine getProtocolEngine()
+ {
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
+ }
+ };
+
+
+ private DelegateCreator creator_0_10 = new DelegateCreator()
+ {
+
+ public VERSION getVersion()
+ {
+ return VERSION.v0_10;
+ }
+
+
+ public byte[] getHeaderIdentifier()
+ {
+ return AMQP_0_10_HEADER;
+ }
+
+ public ProtocolEngine getProtocolEngine()
+ {
+ final ConnectionDelegate connDelegate =
+ new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn);
+
+ ServerConnection conn = new ServerConnection();
+ conn.setConnectionDelegate(connDelegate);
+
+ return new ProtocolEngine_0_10( conn, _networkDriver, _appRegistry);
+ }
+ };
+
+ private final DelegateCreator[] _creators =
+ new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 };
+
+
+ private class ClosedDelegateProtocolEngine implements ProtocolEngine
+ {
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
+ }
+
+ public long getWrittenBytes()
+ {
+ return 0;
+ }
+
+ public long getReadBytes()
+ {
+ return 0;
+ }
+
+ public void received(ByteBuffer msg)
+ {
+ _logger.error("Error processing incoming data, could not negotiate a common protocol");
+ }
+
+ public void exception(Throwable t)
+ {
+ _logger.error("Error establishing session", t);
+ }
+
+ public void closed()
+ {
+
+ }
+
+ public void writerIdle()
+ {
+
+ }
+
+ public void readerIdle()
+ {
+
+ }
+ }
+
+ private class SelfDelegateProtocolEngine implements ProtocolEngine
+ {
+
+ private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
+
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
+ }
+
+ public long getWrittenBytes()
+ {
+ return 0;
+ }
+
+ public long getReadBytes()
+ {
+ return 0;
+ }
+
+ public void received(ByteBuffer msg)
+ {
+ ByteBuffer msgheader = msg.duplicate();
+ if(_header.remaining() > msgheader.limit())
+ {
+ msg.position(msg.limit());
+ }
+ else
+ {
+ msgheader.limit(_header.remaining());
+ msg.position(_header.remaining());
+ }
+
+ _header.put(msgheader);
+
+ if(!_header.hasRemaining())
+ {
+ _header.flip();
+ byte[] headerBytes = new byte[MINIMUM_REQUIRED_HEADER_BYTES];
+ _header.get(headerBytes);
+
+
+ ProtocolEngine newDelegate = null;
+ byte[] newestSupported = null;
+
+ for(int i = 0; newDelegate == null && i < _creators.length; i++)
+ {
+
+ if(_supported.contains(_creators[i].getVersion()))
+ {
+ newestSupported = _creators[i].getHeaderIdentifier();
+ byte[] compareBytes = _creators[i].getHeaderIdentifier();
+ boolean equal = true;
+ for(int j = 0; equal && j<compareBytes.length; j++)
+ {
+ equal = headerBytes[j] == compareBytes[j];
+ }
+ if(equal)
+ {
+ newDelegate = _creators[i].getProtocolEngine();
+ }
+ }
+ }
+
+ // If no delegate is found then send back the most recent support protocol version id
+ if(newDelegate == null)
+ {
+ _networkDriver.send(ByteBuffer.wrap(newestSupported));
+
+ _delegate = new ClosedDelegateProtocolEngine();
+ }
+ else
+ {
+ newDelegate.setNetworkDriver(_networkDriver);
+
+ _delegate = newDelegate;
+
+ _header.flip();
+ _delegate.received(_header);
+ if(msg.hasRemaining())
+ {
+ _delegate.received(msg);
+ }
+ }
+
+ }
+
+ }
+
+ public void exception(Throwable t)
+ {
+ _logger.error("Error establishing session", t);
+ }
+
+ public void closed()
+ {
+
+ }
+
+ public void writerIdle()
+ {
+
+ }
+
+ public void readerIdle()
+ {
+
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
new file mode 100755
index 0000000000..75358c42d9
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
@@ -0,0 +1,75 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+
+import java.util.Set;
+import java.util.Arrays;
+import java.util.HashSet;
+
+public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
+{
+ ;
+
+
+ public enum VERSION { v0_8, v0_9, v0_9_1, v0_10 };
+
+ private static final Set<VERSION> ALL_VERSIONS = new HashSet<VERSION>(Arrays.asList(VERSION.values()));
+
+ private final IApplicationRegistry _appRegistry;
+ private final String _fqdn;
+ private final Set<VERSION> _supported;
+
+
+ public MultiVersionProtocolEngineFactory()
+ {
+ this(1, "localhost", ALL_VERSIONS);
+ }
+
+ public MultiVersionProtocolEngineFactory(String fqdn, Set<VERSION> versions)
+ {
+ this(1, fqdn, versions);
+ }
+
+
+ public MultiVersionProtocolEngineFactory(String fqdn)
+ {
+ this(1, fqdn, ALL_VERSIONS);
+ }
+
+ public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set<VERSION> supportedVersions)
+ {
+ _appRegistry = ApplicationRegistry.getInstance(instance);
+ _fqdn = fqdn;
+ _supported = supportedVersions;
+ }
+
+
+ public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
+ {
+ return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, networkDriver);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
new file mode 100755
index 0000000000..30d506a89b
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
@@ -0,0 +1,201 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.server.configuration.*;
+import org.apache.qpid.server.transport.ServerConnection;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+
+import java.net.SocketAddress;
+import java.util.UUID;
+
+public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine, ConnectionConfig
+{
+ public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
+
+ private NetworkDriver _networkDriver;
+ private long _readBytes;
+ private long _writtenBytes;
+ private ServerConnection _connection;
+ private final UUID _id;
+ private final IApplicationRegistry _appRegistry;
+ private long _createTime = System.currentTimeMillis();
+
+ public ProtocolEngine_0_10(ServerConnection conn,
+ NetworkDriver networkDriver,
+ final IApplicationRegistry appRegistry)
+ {
+ super(new Assembler(conn));
+ _connection = conn;
+ _connection.setConnectionConfig(this);
+ _networkDriver = networkDriver;
+ _id = appRegistry.getConfigStore().createId();
+ _appRegistry = appRegistry;
+
+ // FIXME Two log messages to maintain compatinbility with earlier protocol versions
+ _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
+ _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
+ }
+
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ Disassembler dis = new Disassembler(driver, MAX_FRAME_SIZE);
+ _connection.setSender(dis);
+ _connection.onOpen(new Runnable()
+ {
+ public void run()
+ {
+ getConfigStore().addConfiguredObject(ProtocolEngine_0_10.this);
+ }
+ });
+
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
+ }
+
+ public long getReadBytes()
+ {
+ return _readBytes;
+ }
+
+ public long getWrittenBytes()
+ {
+ return _writtenBytes;
+ }
+
+ public void writerIdle()
+ {
+ //Todo
+ }
+
+ public void readerIdle()
+ {
+ //Todo
+ }
+
+ public VirtualHostConfig getVirtualHost()
+ {
+ return _connection.getVirtualHost();
+ }
+
+ public String getAddress()
+ {
+ return getRemoteAddress().toString();
+ }
+
+ public Boolean isIncoming()
+ {
+ return true;
+ }
+
+ public Boolean isSystemConnection()
+ {
+ return false;
+ }
+
+ public Boolean isFederationLink()
+ {
+ return false;
+ }
+
+ public String getAuthId()
+ {
+ return _connection.getAuthorizationID();
+ }
+
+ public String getRemoteProcessName()
+ {
+ return null;
+ }
+
+ public Integer getRemotePID()
+ {
+ return null;
+ }
+
+ public Integer getRemoteParentPID()
+ {
+ return null;
+ }
+
+ public ConfigStore getConfigStore()
+ {
+ return _appRegistry.getConfigStore();
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public ConnectionConfigType getConfigType()
+ {
+ return ConnectionConfigType.getInstance();
+ }
+
+ public ConfiguredObject getParent()
+ {
+ return getVirtualHost();
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ @Override
+ public void closed()
+ {
+ super.closed();
+ getConfigStore().removeConfiguredObject(this);
+ }
+
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+
+ public Boolean isShadow()
+ {
+ return false;
+ }
+
+ public void mgmtClose()
+ {
+ _connection.mgmtClose();
+ }
+}