diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java | 204 |
1 files changed, 170 insertions, 34 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index a2df2f3cf2..af21eb7ed0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -1,26 +1,52 @@ package org.apache.qpid.client; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.XASession; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQProtocolException; -import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.ClosedListener; -import org.apache.qpid.ErrorCode; -import org.apache.qpid.QpidException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionClose; +import org.apache.qpid.transport.ConnectionException; +import org.apache.qpid.transport.ConnectionListener; import org.apache.qpid.transport.ProtocolVersionException; +import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ClosedListener +public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener { /** * This class logger. @@ -35,12 +61,15 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed /** * The QpidConeection instance that is mapped with thie JMS connection. */ - org.apache.qpid.nclient.Connection _qpidConnection; + org.apache.qpid.transport.Connection _qpidConnection; + private ConnectionException exception = null; //--- constructor public AMQConnectionDelegate_0_10(AMQConnection conn) { _conn = conn; + _qpidConnection = new Connection(); + _qpidConnection.setConnectionListener(this); } /** @@ -50,7 +79,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed throws JMSException { _conn.checkNotClosed(); - int channelId = _conn._idFactory.incrementAndGet(); + int channelId = _conn.getNextChannelID(); AMQSession session; try { @@ -71,12 +100,24 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed } /** + * Create an XASession with default prefetch values of: + * High = MaxPrefetch + * Low = MaxPrefetch / 2 + * @return XASession + * @throws JMSException + */ + public XASession createXASession() throws JMSException + { + return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2); + } + + /** * create an XA Session and start it if required. */ public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException { _conn.checkNotClosed(); - int channelId = _conn._idFactory.incrementAndGet(); + int channelId = _conn.getNextChannelID(); XASessionImpl session; try { @@ -104,70 +145,165 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed */ public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException { - _qpidConnection = Client.createConnection(); try { if (_logger.isDebugEnabled()) { - _logger.debug("creating connection with broker " + " host: " + brokerDetail - .getHost() + " port: " + brokerDetail.getPort() + " virtualhost: " + _conn - .getVirtualHost() + "user name: " + _conn.getUsername() + "password: " + _conn.getPassword()); + _logger.debug("connecting to host: " + brokerDetail.getHost() + + " port: " + brokerDetail.getPort() + + " vhost: " + _conn.getVirtualHost() + + " username: " + _conn.getUsername() + + " password: " + _conn.getPassword()); + } + + if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null) + { + this.setIdleTimeout(Long.parseLong(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT))); } + else + { + // use the default value set for all connections + this.setIdleTimeout(Long.getLong(ClientProperties.IDLE_TIMEOUT_PROP_NAME,0)); + } + + String saslMechs = brokerDetail.getProperty("sasl_mechs")!= null? + brokerDetail.getProperty("sasl_mechs"): + System.getProperty("qpid.sasl_mechs","PLAIN"); + _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(), - _conn.getUsername(), _conn.getPassword()); - _qpidConnection.setClosedListener(this); + _conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL(),saslMechs); _conn._connected = true; + _conn._failoverPolicy.attainedConnection(); } catch(ProtocolVersionException pe) { return new ProtocolVersion(pe.getMajor(), pe.getMinor()); } - catch (QpidException e) - { + catch (ConnectionException e) + { throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e); } return null; } - /** - * Not supported at this level. - */ + public void failoverPrep() + { + List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values()); + for (AMQSession s : sessions) + { + s.failoverPrep(); + } + } + public void resubscribeSessions() throws JMSException, AMQException, FailoverException { - //NOT implemented as railover is handled at a lower level - throw new FailoverException("failing to reconnect during failover, operation not supported."); + List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values()); + _logger.info(String.format("Resubscribing sessions = %s sessions.size=%s", sessions, sessions.size())); + for (AMQSession s : sessions) + { + ((AMQSession_0_10) s)._qpidConnection = _qpidConnection; + s.resubscribe(); + } } - public void closeConneciton(long timeout) throws JMSException, AMQException + public void closeConnection(long timeout) throws JMSException, AMQException { try { _qpidConnection.close(); } - catch (QpidException e) + catch (TransportException e) + { + throw new AMQException(e.getMessage(), e); + } + } + + public void opened(Connection conn) {} + + public void exception(Connection conn, ConnectionException exc) + { + if (exception != null) { - throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot close connection", e); + _logger.error("previous exception", exception); } + exception = exc; } - public void onClosed(ErrorCode errorCode, String reason, Throwable t) + public void closed(Connection conn) { - if (_logger.isDebugEnabled()) + ConnectionException exc = exception; + exception = null; + + if (exc == null) + { + return; + } + + ConnectionClose close = exc.getClose(); + if (close == null) + { + try + { + if (_conn.firePreFailover(false) && _conn.attemptReconnection()) + { + _conn.failoverPrep(); + _qpidConnection.resume(); + _conn.fireFailoverComplete(); + return; + } + } + catch (Exception e) + { + _logger.error("error during failover", e); + } + } + + ExceptionListener listener = _conn._exceptionListener; + if (listener == null) { - _logger.debug("Received a connection close from the broker: Error code : " + errorCode.getCode(), t); + _logger.error("connection exception: " + conn, exc); } - if (_conn._exceptionListener != null) + else { - JMSException ex = new JMSException(reason,String.valueOf(errorCode.getCode())); - if (t != null) + String code = null; + if (close != null) { - ex.initCause(t); + code = close.getReplyCode().toString(); } - _conn._exceptionListener.onException(ex); + JMSException ex = new JMSException(exc.getMessage(), code); + ex.initCause(exc); + listener.onException(ex); + } + } + + public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E + { + try + { + return operation.execute(); } + catch (FailoverException e) + { + throw new RuntimeException(e); + } + } + + public void setIdleTimeout(long l) + { + _qpidConnection.setIdleTimeout(l); + } + + public int getMaxChannelID() + { + return Integer.MAX_VALUE; + } + + public ProtocolVersion getProtocolVersion() + { + return ProtocolVersion.v0_10; } } |