diff options
4 files changed, 426 insertions, 231 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index c001e97ec7..3b330c9725 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -20,34 +20,17 @@ */ package org.apache.qpid.client; -import org.apache.qpid.AMQConnectionFailureException; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQUndeliveredException; -import org.apache.qpid.AMQUnresolvedAddressException; -import org.apache.qpid.client.failover.FailoverException; -import org.apache.qpid.client.failover.FailoverProtectedOperation; -import org.apache.qpid.client.failover.FailoverRetrySupport; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicQosBody; -import org.apache.qpid.framing.BasicQosOkBody; -import org.apache.qpid.framing.ChannelOpenBody; -import org.apache.qpid.framing.ChannelOpenOkBody; -import org.apache.qpid.framing.TxSelectBody; -import org.apache.qpid.framing.TxSelectOkBody; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.jms.ChannelLimitReachedException; -import org.apache.qpid.jms.Connection; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.jms.FailoverPolicy; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.url.URLSyntaxException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.ConnectException; +import java.nio.channels.UnresolvedAddressException; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.ConnectionConsumer; import javax.jms.ConnectionMetaData; @@ -66,103 +49,111 @@ import javax.naming.NamingException; import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; -import java.io.IOException; -import java.net.ConnectException; -import java.nio.channels.UnresolvedAddressException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.qpid.AMQConnectionFailureException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQUndeliveredException; +import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.Connection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.jms.FailoverPolicy; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpidity.url.QpidURL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); - private AtomicInteger _idFactory = new AtomicInteger(0); + protected AtomicInteger _idFactory = new AtomicInteger(0); /** * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be * held by any child objects of this connection such as the session, producers and consumers. */ - private final Object _failoverMutex = new Object(); + protected final Object _failoverMutex = new Object(); /** * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session * and we must prevent the client from opening too many. Zero means unlimited. */ - private long _maximumChannelCount; + protected long _maximumChannelCount; /** The maximum size of frame supported by the server */ - private long _maximumFrameSize; + protected long _maximumFrameSize; /** * The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped * the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate * handler. */ - private AMQProtocolHandler _protocolHandler; + protected AMQProtocolHandler _protocolHandler; /** Maps from session id (Integer) to AMQSession instance */ - private final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>(); + protected final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>(); - private String _clientName; + protected String _clientName; /** The user name to use for authentication */ - private String _username; + protected String _username; /** The password to use for authentication */ - private String _password; + protected String _password; /** The virtual path to connect to on the AMQ server */ - private String _virtualHost; + protected String _virtualHost; - private ExceptionListener _exceptionListener; + protected ExceptionListener _exceptionListener; - private ConnectionListener _connectionListener; + protected ConnectionListener _connectionListener; - private ConnectionURL _connectionURL; + protected ConnectionURL _connectionURL; /** * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message * publication. */ - private boolean _started; + protected boolean _started; /** Policy dictating how to failover */ - private FailoverPolicy _failoverPolicy; + protected FailoverPolicy _failoverPolicy; /* * _Connected should be refactored with a suitable wait object. */ - private boolean _connected; + protected boolean _connected; /* * The last error code that occured on the connection. Used to return the correct exception to the client */ - private AMQException _lastAMQException = null; + protected AMQException _lastAMQException = null; /* * The connection meta data */ - private QpidConnectionMetaData _connectionMetaData; + protected QpidConnectionMetaData _connectionMetaData; /** Configuration info for SSL */ - private SSLConfiguration _sslConfiguration; + protected SSLConfiguration _sslConfiguration; - private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; - private AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; - private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; - private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + protected AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; + protected AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + protected AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; + protected AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ - private final ExecutorService _taskPool = Executors.newCachedThreadPool(); - private static final long DEFAULT_TIMEOUT = 1000 * 30; + protected final ExecutorService _taskPool = Executors.newCachedThreadPool(); + protected static final long DEFAULT_TIMEOUT = 1000 * 30; + + private AMQConnectionDelegate _delegate; /** * @param broker brokerdetails @@ -237,12 +228,27 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect this(new AMQConnectionURL(connection), sslConfig); } + // 0-10 stuff + public AMQConnection(QpidURL connectionURL) throws AMQException + { + + } + /** * @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon. */ public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { + if (Boolean.getBoolean("0-10")) + { + _delegate = new AMQConnectionDelegate_0_10(this); + } + else + { + _delegate = new AMQConnectionDelegate_0_8(this); + } + if (_logger.isInfoEnabled()) { _logger.info("Connection:" + connectionURL); @@ -366,18 +372,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _connectionMetaData = new QpidConnectionMetaData(this); } - protected boolean checkException(Throwable thrown) - { - Throwable cause = thrown.getCause(); - - if (cause == null) - { - cause = thrown; - } - - return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException)); - } - protected AMQConnection(String username, String password, String clientName, String virtualHost) { _clientName = clientName; @@ -396,26 +390,20 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _virtualHost = virtualHost; } - private void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException + protected boolean checkException(Throwable thrown) { - try - { - TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail); - // this blocks until the connection has been set up or when an error - // has prevented the connection being set up - _protocolHandler.attainState(AMQState.CONNECTION_OPEN); - _failoverPolicy.attainedConnection(); - - // Again this should be changed to a suitable notify - _connected = true; - } - catch (AMQException e) + Throwable cause = thrown.getCause(); + + if (cause == null) { - _lastAMQException = e; - throw e; + cause = thrown; } + + return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException)); } + + public boolean attemptReconnection(String host, int port) { BrokerDetails bd = new AMQBrokerDetails(host, port, _sslConfiguration); @@ -475,6 +463,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return false; } + public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException + { + _delegate.makeBrokerConnection(brokerDetail); + } + /** * Get the details of the currently active broker * @@ -512,114 +505,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow) throws JMSException { - checkNotClosed(); - if (channelLimitReached()) - { - throw new ChannelLimitReachedException(_maximumChannelCount); - } - - return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>( - new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>() - { - public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException - { - int channelId = _idFactory.incrementAndGet(); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Write channel open frame for channel id " + channelId); - } - - // We must create the session and register it before actually sending the frame to the server to - // open it, so that there is no window where we could receive data on the channel and not be set - // up to handle it appropriately. - AMQSession session = - new AMQSession_0_8(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, - prefetchLow); - // _protocolHandler.addSessionByChannel(channelId, session); - registerSession(channelId, session); - - boolean success = false; - try - { - createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); - success = true; - } - catch (AMQException e) - { - JMSException jmse = new JMSException("Error creating session: " + e); - jmse.setLinkedException(e); - throw jmse; - } - finally - { - if (!success) - { - deregisterSession(channelId); - } - } - - if (_started) - { - try - { - session.start(); - } - catch (AMQException e) - { - throw new JMSAMQException(e); - } - } - - return session; - } - }, this).execute(); - } - - private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) - throws AMQException, FailoverException - { - - // TODO: Be aware of possible changes to parameter order as versions change. - - _protocolHandler.syncWrite(ChannelOpenBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), null), // outOfBand - ChannelOpenOkBody.class); - - // todo send low water mark when protocol allows. - // todo Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(BasicQosBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), false, // global - prefetchHigh, // prefetchCount - 0), // prefetchSize - BasicQosOkBody.class); - - if (transacted) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Issuing TxSelect for " + channelId); - } - - // TODO: Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion()), TxSelectOkBody.class); - } - } - - private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) - throws AMQException, FailoverException - { - try - { - createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); - } - catch (AMQException e) - { - deregisterSession(channelId); - throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e); - } + return _delegate.createSession(transacted, acknowledgeMode, prefetchHigh, prefetchLow); } public void setFailoverPolicy(FailoverPolicy policy) @@ -664,7 +551,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return new AMQTopicSessionAdaptor(createSession(transacted, acknowledgeMode)); } - private boolean channelLimitReached() + public boolean channelLimitReached() { return (_maximumChannelCount != 0) && (_sessions.size() == _maximumChannelCount); } @@ -805,18 +692,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - private long adjustTimeout(long timeout, long startTime) - { - long now = System.currentTimeMillis(); - timeout -= now - startTime; - if (timeout < 0) - { - timeout = 0; - } - - return timeout; - } - /** * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to * mark objects "visible" in userland as closed after failover or other significant event that impacts the @@ -880,6 +755,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } + + private long adjustTimeout(long timeout, long startTime) + { + long now = System.currentTimeMillis(); + timeout -= now - startTime; + if (timeout < 0) + { + timeout = 0; + } + + return timeout; + } + public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { @@ -1055,6 +943,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _failoverMutex; } + public void resubscribeSessions() throws JMSException, AMQException, FailoverException + { + _delegate.resubscribeSessions(); + } + /** * If failover is taking place this will block until it has completed. If failover is not taking place it will * return immediately. @@ -1068,7 +961,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception - * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will + * to a JMS exception liste + { + ArrayList sessions = new ArrayList(_sessions.values()); + _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey? + for (Iterator it = sessions.iterator(); it.hasNext();) + { + AMQSession s = (AMQSession) it.next(); + // _protocolHandler.addSessionByChannel(s.getChannelId(), s); + reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted()); + s.resubscribe(); + } + }ner, if configured, and propagates the exception to sessions, which in turn will * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them. * * @param cause the exception @@ -1160,23 +1064,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _sessions.remove(channelId); } - /** - * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling. - * The caller must hold the failover mutex before calling this method. - */ - public void resubscribeSessions() throws JMSException, AMQException, FailoverException - { - ArrayList sessions = new ArrayList(_sessions.values()); - _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey? - for (Iterator it = sessions.iterator(); it.hasNext();) - { - AMQSession s = (AMQSession) it.next(); - // _protocolHandler.addSessionByChannel(s.getChannelId(), s); - reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted()); - s.resubscribe(); - } - } - public String toString() { StringBuffer buf = new StringBuffer("AMQConnection:\n"); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java new file mode 100644 index 0000000000..e4c9a259ab --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import java.io.IOException; + +import javax.jms.JMSException; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.jms.BrokerDetails; + +public interface AMQConnectionDelegate +{ + public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException; + + public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, + final int prefetchHigh, final int prefetchLow) throws JMSException; + + public void resubscribeSessions() throws JMSException, AMQException, FailoverException; +} diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java new file mode 100644 index 0000000000..8213182907 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -0,0 +1,43 @@ +package org.apache.qpid.client; + +import java.io.IOException; + +import javax.jms.JMSException; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate +{ + + private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_10.class); + private AMQConnection _conn; + + public AMQConnectionDelegate_0_10(AMQConnection conn) + { + _conn = conn; + } + + public Session createSession(boolean transacted, int acknowledgeMode, int prefetchHigh, int prefetchLow) throws JMSException + { + // TODO Auto-generated method stub + return null; + } + + public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException + { + // TODO Auto-generated method stub + + } + + public void resubscribeSessions() throws JMSException, AMQException, FailoverException + { + // TODO Auto-generated method stub + + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java new file mode 100644 index 0000000000..69dfed2dd9 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java @@ -0,0 +1,226 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import java.io.IOException; +import java.net.ConnectException; +import java.nio.channels.UnresolvedAddressException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Iterator; + +import javax.jms.JMSException; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverProtectedOperation; +import org.apache.qpid.client.failover.FailoverRetrySupport; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.framing.BasicQosBody; +import org.apache.qpid.framing.BasicQosOkBody; +import org.apache.qpid.framing.ChannelOpenBody; +import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.TxSelectBody; +import org.apache.qpid.framing.TxSelectOkBody; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.ChannelLimitReachedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQConnectionDelegate_0_8 implements AMQConnectionDelegate +{ + private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_8.class); + private AMQConnection _conn; + + public AMQConnectionDelegate_0_8(AMQConnection conn) + { + _conn = conn; + } + + protected boolean checkException(Throwable thrown) + { + Throwable cause = thrown.getCause(); + + if (cause == null) + { + cause = thrown; + } + + return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException)); + } + + public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException + { + try + { + TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail); + // this blocks until the connection has been set up or when an error + // has prevented the connection being set up + _conn._protocolHandler.attainState(AMQState.CONNECTION_OPEN); + _conn._failoverPolicy.attainedConnection(); + + // Again this should be changed to a suitable notify + _conn._connected = true; + } + catch (AMQException e) + { + _conn._lastAMQException = e; + throw e; + } + } + + public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch) + throws JMSException + { + return createSession(transacted, acknowledgeMode, prefetch, prefetch); + } + + public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, + final int prefetchHigh, final int prefetchLow) throws JMSException + { + _conn.checkNotClosed(); + + if (_conn.channelLimitReached()) + { + throw new ChannelLimitReachedException(_conn._maximumChannelCount); + } + + return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>( + new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>() + { + public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException + { + int channelId = _conn._idFactory.incrementAndGet(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Write channel open frame for channel id " + channelId); + } + + // We must create the session and register it before actually sending the frame to the server to + // open it, so that there is no window where we could receive data on the channel and not be set + // up to handle it appropriately. + AMQSession session = + new AMQSession_0_8(_conn, channelId, transacted, acknowledgeMode, prefetchHigh, + prefetchLow); + // _protocolHandler.addSessionByChannel(channelId, session); + _conn.registerSession(channelId, session); + + boolean success = false; + try + { + createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); + success = true; + } + catch (AMQException e) + { + JMSException jmse = new JMSException("Error creating session: " + e); + jmse.setLinkedException(e); + throw jmse; + } + finally + { + if (!success) + { + _conn.deregisterSession(channelId); + } + } + + if (_conn._started) + { + try + { + session.start(); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } + + return session; + } + }, _conn).execute(); + } + + private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) + throws AMQException, FailoverException + { + + // TODO: Be aware of possible changes to parameter order as versions change. + + _conn._protocolHandler.syncWrite(ChannelOpenBody.createAMQFrame(channelId, _conn._protocolHandler.getProtocolMajorVersion(), + _conn._protocolHandler.getProtocolMinorVersion(), null), // outOfBand + ChannelOpenOkBody.class); + + // todo send low water mark when protocol allows. + // todo Be aware of possible changes to parameter order as versions change. + _conn._protocolHandler.syncWrite(BasicQosBody.createAMQFrame(channelId, _conn._protocolHandler.getProtocolMajorVersion(), + _conn._protocolHandler.getProtocolMinorVersion(), false, // global + prefetchHigh, // prefetchCount + 0), // prefetchSize + BasicQosOkBody.class); + + if (transacted) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Issuing TxSelect for " + channelId); + } + + // TODO: Be aware of possible changes to parameter order as versions change. + _conn._protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, _conn._protocolHandler.getProtocolMajorVersion(), + _conn._protocolHandler.getProtocolMinorVersion()), TxSelectOkBody.class); + } + } + + /** + * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling. + * The caller must hold the failover mutex before calling this method. + */ + public void resubscribeSessions() throws JMSException, AMQException, FailoverException + { + ArrayList sessions = new ArrayList(_conn._sessions.values()); + _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey? + for (Iterator it = sessions.iterator(); it.hasNext();) + { + AMQSession s = (AMQSession) it.next(); + // _protocolHandler.addSessionByChannel(s.getChannelId(), s); + reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted()); + s.resubscribe(); + } + } + + private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) + throws AMQException, FailoverException + { + try + { + createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); + } + catch (AMQException e) + { + _conn.deregisterSession(channelId); + throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e); + } + } +} |