summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java477
1 files changed, 0 insertions, 477 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
deleted file mode 100644
index 7976760696..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ /dev/null
@@ -1,477 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.protocol;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.JMSException;
-import javax.security.sasl.SaslClient;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.ConnectionTuneParameters;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.UnprocessedMessage_0_8;
-import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
-
-/**
- * Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
- * session is still available but clients should not use it to obtain session attributes.
- */
-public class AMQProtocolSession implements AMQVersionAwareProtocolSession
-{
- protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2;
-
- protected static final Logger _logger = LoggerFactory.getLogger(AMQProtocolSession.class);
-
- public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived";
-
- //Usable channels are numbered 1 to <ChannelMax>
- public static final int MAX_CHANNEL_MAX = 0xFFFF;
- public static final int MIN_USABLE_CHANNEL_NUM = 1;
-
- protected static final String CONNECTION_TUNE_PARAMETERS = "ConnectionTuneParameters";
-
- protected static final String AMQ_CONNECTION = "AMQConnection";
-
- protected static final String SASL_CLIENT = "SASLClient";
-
- /**
- * The handler from which this session was created and which is used to handle protocol events. We send failover
- * events to the handler.
- */
- protected final AMQProtocolHandler _protocolHandler;
-
- /** Maps from the channel id to the AMQSession that it represents. */
- protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>();
-
- protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
-
- /**
- * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
- * first) with the subsequent content header and content bodies.
- */
- private final ConcurrentMap<Integer, UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer, UnprocessedMessage>();
- private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
-
- /** Counter to ensure unique queue names */
- protected int _queueId = 1;
- protected final Object _queueIdLock = new Object();
-
- private ProtocolVersion _protocolVersion;
-// private VersionSpecificRegistry _registry =
-// MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
-
- private MethodRegistry _methodRegistry =
- MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion());
-
- private MethodDispatcher _methodDispatcher;
-
- protected final AMQConnection _connection;
-
- private ConnectionTuneParameters _connectionTuneParameters;
-
- private SaslClient _saslClient;
-
- private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
-
- public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
- {
- _protocolHandler = protocolHandler;
- _protocolVersion = connection.getProtocolVersion();
- _logger.info("Using ProtocolVersion for Session:" + _protocolVersion);
- _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
- this);
- _connection = connection;
- }
-
- public void init()
- {
- // start the process of setting up the connection. This is the first place that
- // data is written to the server.
- _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion()));
- }
-
- public String getClientID()
- {
- try
- {
- return getAMQConnection().getClientID();
- }
- catch (JMSException e)
- {
- // we never throw a JMSException here
- return null;
- }
- }
-
- public void setClientID(String clientID) throws JMSException
- {
- getAMQConnection().setClientID(clientID);
- }
-
- public AMQStateManager getStateManager()
- {
- return _protocolHandler.getStateManager();
- }
-
- public String getVirtualHost()
- {
- return getAMQConnection().getVirtualHost();
- }
-
- public String getUsername()
- {
- return getAMQConnection().getUsername();
- }
-
- public String getPassword()
- {
- return getAMQConnection().getPassword();
- }
-
- public SaslClient getSaslClient()
- {
- return _saslClient;
- }
-
- /**
- * Store the SASL client currently being used for the authentication handshake
- *
- * @param client if non-null, stores this in the session. if null clears any existing client being stored
- */
- public void setSaslClient(SaslClient client)
- {
- _saslClient = client;
- }
-
- public ConnectionTuneParameters getConnectionTuneParameters()
- {
- return _connectionTuneParameters;
- }
-
- public void setConnectionTuneParameters(ConnectionTuneParameters params)
- {
- _connectionTuneParameters = params;
- AMQConnection con = getAMQConnection();
-
- con.setMaximumChannelCount(params.getChannelMax());
- con.setMaximumFrameSize(params.getFrameMax());
- _protocolHandler.initHeartbeats((int) params.getHeartbeat());
- }
-
- /**
- * Callback invoked from the BasicDeliverMethodHandler when a message has been received. This is invoked on the MINA
- * dispatcher thread.
- *
- * @param message
- *
- * @throws AMQException if this was not expected
- */
- public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException
- {
- if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
- {
- _channelId2UnprocessedMsgArray[channelId] = message;
- }
- else
- {
- _channelId2UnprocessedMsgMap.put(channelId, message);
- }
- }
-
- public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
- {
- final UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
- : _channelId2UnprocessedMsgMap.get(channelId));
-
- if (msg == null)
- {
- throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first on session:" + this, null);
- }
-
- if (msg.getContentHeader() != null)
- {
- throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames on session:" + this, null);
- }
-
- msg.setContentHeader(contentHeader);
- if (contentHeader.bodySize == 0)
- {
- deliverMessageToAMQSession(channelId, msg);
- }
- }
-
- public void contentBodyReceived(final int channelId, ContentBody contentBody) throws AMQException
- {
- UnprocessedMessage_0_8 msg;
- final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0;
- if (fastAccess)
- {
- msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgArray[channelId];
- }
- else
- {
- msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgMap.get(channelId);
- }
-
- if (msg == null)
- {
- throw new AMQException(null, "Error: received content body without having received a JMSDeliver frame first", null);
- }
-
- if (msg.getContentHeader() == null)
- {
- if (fastAccess)
- {
- _channelId2UnprocessedMsgArray[channelId] = null;
- }
- else
- {
- _channelId2UnprocessedMsgMap.remove(channelId);
- }
- throw new AMQException(null, "Error: received content body without having received a ContentHeader frame first", null);
- }
-
- msg.receiveBody(contentBody);
-
- if (msg.isAllBodyDataReceived())
- {
- deliverMessageToAMQSession(channelId, msg);
- }
- }
-
- public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
- {
-
- }
-
- /**
- * Deliver a message to the appropriate session, removing the unprocessed message from our map
- *
- * @param channelId the channel id the message should be delivered to
- * @param msg the message
- */
- private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg)
- {
- AMQSession session = getSession(channelId);
- session.messageReceived(msg);
- if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
- {
- _channelId2UnprocessedMsgArray[channelId] = null;
- }
- else
- {
- _channelId2UnprocessedMsgMap.remove(channelId);
- }
- }
-
- protected AMQSession getSession(int channelId)
- {
- return _connection.getSession(channelId);
- }
-
- /**
- * Convenience method that writes a frame to the protocol session. Equivalent to calling
- * getProtocolSession().write().
- *
- * @param frame the frame to write
- */
- public void writeFrame(AMQDataBlock frame)
- {
- _protocolHandler.writeFrame(frame);
- }
-
- public void writeFrame(AMQDataBlock frame, boolean wait)
- {
- _protocolHandler.writeFrame(frame, wait);
- }
-
- /**
- * Starts the process of closing a session
- *
- * @param session the AMQSession being closed
- */
- public void closeSession(AMQSession session)
- {
- _logger.debug("closeSession called on protocol session for session " + session.getChannelId());
- final int channelId = session.getChannelId();
- if (channelId <= 0)
- {
- throw new IllegalArgumentException("Attempt to close a channel with id < 0");
- }
- // we need to know when a channel is closing so that we can respond
- // with a channel.close frame when we receive any other type of frame
- // on that channel
- _closingChannels.putIfAbsent(channelId, session);
- }
-
- /**
- * Called from the ChannelClose handler when a channel close frame is received. This method decides whether this is
- * a response or an initiation. The latter case causes the AMQSession to be closed and an exception to be thrown if
- * appropriate.
- *
- * @param channelId the id of the channel (session)
- *
- * @return true if the client must respond to the server, i.e. if the server initiated the channel close, false if
- * the channel close is just the server responding to the client's earlier request to close the channel.
- */
- public boolean channelClosed(int channelId, AMQConstant code, String text) throws AMQException
- {
-
- // if this is not a response to an earlier request to close the channel
- if (_closingChannels.remove(channelId) == null)
- {
- final AMQSession session = getSession(channelId);
- try
- {
- session.closed(new AMQException(code, text, null));
- }
- catch (JMSException e)
- {
- throw new AMQException(null, "JMSException received while closing session", e);
- }
-
- return true;
- }
- else
- {
- return false;
- }
- }
-
- public AMQConnection getAMQConnection()
- {
- return _connection;
- }
-
- public void closeProtocolSession() throws AMQException
- {
- _protocolHandler.closeConnection(0);
- }
-
- public void failover(String host, int port)
- {
- _protocolHandler.failover(host, port);
- }
-
- protected AMQShortString generateQueueName()
- {
- int id;
- synchronized (_queueIdLock)
- {
- id = _queueId++;
- }
- // convert '.', '/', ':' and ';' to single '_', for spec compliance and readability
- String localAddress = _protocolHandler.getLocalAddress().toString().replaceAll("[./:;]", "_");
- String queueName = "tmp_" + localAddress + "_" + id;
- return new AMQShortString(queueName.replaceAll("_+", "_"));
- }
-
- public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
- {
- final AMQSession session = getSession(channelId);
-
- session.confirmConsumerCancelled(consumerTag.toIntValue());
- }
-
- public void setProtocolVersion(final ProtocolVersion pv)
- {
- _logger.info("Setting ProtocolVersion to :" + pv);
- _protocolVersion = pv;
- _methodRegistry = MethodRegistry.getMethodRegistry(pv);
- _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this);
- }
-
- public byte getProtocolMinorVersion()
- {
- return _protocolVersion.getMinorVersion();
- }
-
- public byte getProtocolMajorVersion()
- {
- return _protocolVersion.getMajorVersion();
- }
-
- public ProtocolVersion getProtocolVersion()
- {
- return _protocolVersion;
- }
-
- public MethodRegistry getMethodRegistry()
- {
- return _methodRegistry;
- }
-
- public MethodDispatcher getMethodDispatcher()
- {
- return _methodDispatcher;
- }
-
- public void setTicket(int ticket, int channelId)
- {
- final AMQSession session = getSession(channelId);
- session.setTicket(ticket);
- }
-
- public void setMethodDispatcher(MethodDispatcher methodDispatcher)
- {
- _methodDispatcher = methodDispatcher;
- }
-
- public void setFlowControl(final int channelId, final boolean active)
- {
- final AMQSession session = getSession(channelId);
- session.setFlowControl(active);
- }
-
- public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
- {
- _protocolHandler.methodBodyReceived(channel, amqMethodBody);
- }
-
- public void notifyError(Exception error)
- {
- _protocolHandler.propagateExceptionToAllWaiters(error);
- }
-
- public void setSender(Sender<java.nio.ByteBuffer> sender)
- {
- // No-op, interface munging
- }
-
-
- @Override
- public String toString()
- {
- return "AMQProtocolSession[" + _connection + ']';
- }
-}