summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
commit66765100f4257159622cefe57bed50125a5ad017 (patch)
treea88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
parent1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff)
parent88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff)
downloadqpid-python-66765100f4257159622cefe57bed50125a5ad017.tar.gz
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java467
1 files changed, 467 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
new file mode 100644
index 0000000000..5b7d272506
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -0,0 +1,467 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.protocol;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.security.sasl.SaslClient;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.UnprocessedMessage_0_8;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
+
+/**
+ * Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
+ * session is still available but clients should not use it to obtain session attributes.
+ */
+public class AMQProtocolSession implements AMQVersionAwareProtocolSession
+{
+ protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2;
+
+ protected static final Logger _logger = LoggerFactory.getLogger(AMQProtocolSession.class);
+
+ public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived";
+
+ //Usable channels are numbered 1 to <ChannelMax>
+ public static final int MAX_CHANNEL_MAX = 0xFFFF;
+ public static final int MIN_USABLE_CHANNEL_NUM = 1;
+
+ protected static final String CONNECTION_TUNE_PARAMETERS = "ConnectionTuneParameters";
+
+ protected static final String AMQ_CONNECTION = "AMQConnection";
+
+ protected static final String SASL_CLIENT = "SASLClient";
+
+ /**
+ * The handler from which this session was created and which is used to handle protocol events. We send failover
+ * events to the handler.
+ */
+ protected final AMQProtocolHandler _protocolHandler;
+
+ /** Maps from the channel id to the AMQSession that it represents. */
+ protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>();
+
+ protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
+
+ /**
+ * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
+ * first) with the subsequent content header and content bodies.
+ */
+ private final ConcurrentMap<Integer, UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer, UnprocessedMessage>();
+ private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
+
+ /** Counter to ensure unique queue names */
+ protected int _queueId = 1;
+ protected final Object _queueIdLock = new Object();
+
+ private ProtocolVersion _protocolVersion;
+// private VersionSpecificRegistry _registry =
+// MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
+
+ private MethodRegistry _methodRegistry =
+ MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion());
+
+ private MethodDispatcher _methodDispatcher;
+
+ protected final AMQConnection _connection;
+
+ private ConnectionTuneParameters _connectionTuneParameters;
+
+ private SaslClient _saslClient;
+
+ private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+
+ public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
+ {
+ _protocolHandler = protocolHandler;
+ _protocolVersion = connection.getProtocolVersion();
+ _logger.info("Using ProtocolVersion for Session:" + _protocolVersion);
+ _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
+ this);
+ _connection = connection;
+ }
+
+ public void init()
+ {
+ // start the process of setting up the connection. This is the first place that
+ // data is written to the server.
+ _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion()));
+ }
+
+ public String getClientID()
+ {
+ try
+ {
+ return getAMQConnection().getClientID();
+ }
+ catch (JMSException e)
+ {
+ // we never throw a JMSException here
+ return null;
+ }
+ }
+
+ public void setClientID(String clientID) throws JMSException
+ {
+ getAMQConnection().setClientID(clientID);
+ }
+
+ public AMQStateManager getStateManager()
+ {
+ return _protocolHandler.getStateManager();
+ }
+
+ public String getVirtualHost()
+ {
+ return getAMQConnection().getVirtualHost();
+ }
+
+ public SaslClient getSaslClient()
+ {
+ return _saslClient;
+ }
+
+ /**
+ * Store the SASL client currently being used for the authentication handshake
+ *
+ * @param client if non-null, stores this in the session. if null clears any existing client being stored
+ */
+ public void setSaslClient(SaslClient client)
+ {
+ _saslClient = client;
+ }
+
+ public ConnectionTuneParameters getConnectionTuneParameters()
+ {
+ return _connectionTuneParameters;
+ }
+
+ public void setConnectionTuneParameters(ConnectionTuneParameters params)
+ {
+ _connectionTuneParameters = params;
+ AMQConnection con = getAMQConnection();
+
+ con.setMaximumChannelCount(params.getChannelMax());
+ con.setMaximumFrameSize(params.getFrameMax());
+ _protocolHandler.initHeartbeats((int) params.getHeartbeat());
+ }
+
+ /**
+ * Callback invoked from the BasicDeliverMethodHandler when a message has been received. This is invoked on the MINA
+ * dispatcher thread.
+ *
+ * @param message
+ *
+ * @throws AMQException if this was not expected
+ */
+ public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException
+ {
+ if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ _channelId2UnprocessedMsgArray[channelId] = message;
+ }
+ else
+ {
+ _channelId2UnprocessedMsgMap.put(channelId, message);
+ }
+ }
+
+ public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
+ {
+ final UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
+ : _channelId2UnprocessedMsgMap.get(channelId));
+
+ if (msg == null)
+ {
+ throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first on session:" + this, null);
+ }
+
+ if (msg.getContentHeader() != null)
+ {
+ throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames on session:" + this, null);
+ }
+
+ msg.setContentHeader(contentHeader);
+ if (contentHeader.bodySize == 0)
+ {
+ deliverMessageToAMQSession(channelId, msg);
+ }
+ }
+
+ public void contentBodyReceived(final int channelId, ContentBody contentBody) throws AMQException
+ {
+ UnprocessedMessage_0_8 msg;
+ final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0;
+ if (fastAccess)
+ {
+ msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgArray[channelId];
+ }
+ else
+ {
+ msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgMap.get(channelId);
+ }
+
+ if (msg == null)
+ {
+ throw new AMQException(null, "Error: received content body without having received a JMSDeliver frame first", null);
+ }
+
+ if (msg.getContentHeader() == null)
+ {
+ if (fastAccess)
+ {
+ _channelId2UnprocessedMsgArray[channelId] = null;
+ }
+ else
+ {
+ _channelId2UnprocessedMsgMap.remove(channelId);
+ }
+ throw new AMQException(null, "Error: received content body without having received a ContentHeader frame first", null);
+ }
+
+ msg.receiveBody(contentBody);
+
+ if (msg.isAllBodyDataReceived())
+ {
+ deliverMessageToAMQSession(channelId, msg);
+ }
+ }
+
+ public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
+ {
+
+ }
+
+ /**
+ * Deliver a message to the appropriate session, removing the unprocessed message from our map
+ *
+ * @param channelId the channel id the message should be delivered to
+ * @param msg the message
+ */
+ private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg)
+ {
+ AMQSession session = getSession(channelId);
+ session.messageReceived(msg);
+ if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ _channelId2UnprocessedMsgArray[channelId] = null;
+ }
+ else
+ {
+ _channelId2UnprocessedMsgMap.remove(channelId);
+ }
+ }
+
+ protected AMQSession getSession(int channelId)
+ {
+ return _connection.getSession(channelId);
+ }
+
+ /**
+ * Convenience method that writes a frame to the protocol session. Equivalent to calling
+ * getProtocolSession().write().
+ *
+ * @param frame the frame to write
+ */
+ public void writeFrame(AMQDataBlock frame)
+ {
+ _protocolHandler.writeFrame(frame);
+ }
+
+ public void writeFrame(AMQDataBlock frame, boolean wait)
+ {
+ _protocolHandler.writeFrame(frame, wait);
+ }
+
+ /**
+ * Starts the process of closing a session
+ *
+ * @param session the AMQSession being closed
+ */
+ public void closeSession(AMQSession session)
+ {
+ _logger.debug("closeSession called on protocol session for session " + session.getChannelId());
+ final int channelId = session.getChannelId();
+ if (channelId <= 0)
+ {
+ throw new IllegalArgumentException("Attempt to close a channel with id < 0");
+ }
+ // we need to know when a channel is closing so that we can respond
+ // with a channel.close frame when we receive any other type of frame
+ // on that channel
+ _closingChannels.putIfAbsent(channelId, session);
+ }
+
+ /**
+ * Called from the ChannelClose handler when a channel close frame is received. This method decides whether this is
+ * a response or an initiation. The latter case causes the AMQSession to be closed and an exception to be thrown if
+ * appropriate.
+ *
+ * @param channelId the id of the channel (session)
+ *
+ * @return true if the client must respond to the server, i.e. if the server initiated the channel close, false if
+ * the channel close is just the server responding to the client's earlier request to close the channel.
+ */
+ public boolean channelClosed(int channelId, AMQConstant code, String text) throws AMQException
+ {
+
+ // if this is not a response to an earlier request to close the channel
+ if (_closingChannels.remove(channelId) == null)
+ {
+ final AMQSession session = getSession(channelId);
+ try
+ {
+ session.closed(new AMQException(code, text, null));
+ }
+ catch (JMSException e)
+ {
+ throw new AMQException(null, "JMSException received while closing session", e);
+ }
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public AMQConnection getAMQConnection()
+ {
+ return _connection;
+ }
+
+ public void closeProtocolSession() throws AMQException
+ {
+ _protocolHandler.closeConnection(0);
+ }
+
+ public void failover(String host, int port)
+ {
+ _protocolHandler.failover(host, port);
+ }
+
+ protected AMQShortString generateQueueName()
+ {
+ int id;
+ synchronized (_queueIdLock)
+ {
+ id = _queueId++;
+ }
+ // convert '.', '/', ':' and ';' to single '_', for spec compliance and readability
+ String localAddress = _protocolHandler.getLocalAddress().toString().replaceAll("[./:;]", "_");
+ String queueName = "tmp_" + localAddress + "_" + id;
+ return new AMQShortString(queueName.replaceAll("_+", "_"));
+ }
+
+ public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
+ {
+ final AMQSession session = getSession(channelId);
+
+ session.confirmConsumerCancelled(consumerTag.toIntValue());
+ }
+
+ public void setProtocolVersion(final ProtocolVersion pv)
+ {
+ _logger.info("Setting ProtocolVersion to :" + pv);
+ _protocolVersion = pv;
+ _methodRegistry = MethodRegistry.getMethodRegistry(pv);
+ _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this);
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return _protocolVersion.getMinorVersion();
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return _protocolVersion.getMajorVersion();
+ }
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return _protocolVersion;
+ }
+
+ public MethodRegistry getMethodRegistry()
+ {
+ return _methodRegistry;
+ }
+
+ public MethodDispatcher getMethodDispatcher()
+ {
+ return _methodDispatcher;
+ }
+
+ public void setTicket(int ticket, int channelId)
+ {
+ final AMQSession session = getSession(channelId);
+ session.setTicket(ticket);
+ }
+
+ public void setMethodDispatcher(MethodDispatcher methodDispatcher)
+ {
+ _methodDispatcher = methodDispatcher;
+ }
+
+ public void setFlowControl(final int channelId, final boolean active)
+ {
+ final AMQSession session = getSession(channelId);
+ session.setFlowControl(active);
+ }
+
+ public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
+ {
+ _protocolHandler.methodBodyReceived(channel, amqMethodBody);
+ }
+
+ public void notifyError(Exception error)
+ {
+ _protocolHandler.propagateExceptionToAllWaiters(error);
+ }
+
+ public void setSender(Sender<java.nio.ByteBuffer> sender)
+ {
+ // No-op, interface munging
+ }
+
+
+ @Override
+ public String toString()
+ {
+ return "AMQProtocolSession[" + _connection + ']';
+ }
+}