summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java204
1 files changed, 170 insertions, 34 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index a2df2f3cf2..af21eb7ed0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -1,26 +1,52 @@
package org.apache.qpid.client;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.XASession;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQProtocolException;
-import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.ClosedListener;
-import org.apache.qpid.ErrorCode;
-import org.apache.qpid.QpidException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionClose;
+import org.apache.qpid.transport.ConnectionException;
+import org.apache.qpid.transport.ConnectionListener;
import org.apache.qpid.transport.ProtocolVersionException;
+import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ClosedListener
+public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener
{
/**
* This class logger.
@@ -35,12 +61,15 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed
/**
* The QpidConeection instance that is mapped with thie JMS connection.
*/
- org.apache.qpid.nclient.Connection _qpidConnection;
+ org.apache.qpid.transport.Connection _qpidConnection;
+ private ConnectionException exception = null;
//--- constructor
public AMQConnectionDelegate_0_10(AMQConnection conn)
{
_conn = conn;
+ _qpidConnection = new Connection();
+ _qpidConnection.setConnectionListener(this);
}
/**
@@ -50,7 +79,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed
throws JMSException
{
_conn.checkNotClosed();
- int channelId = _conn._idFactory.incrementAndGet();
+ int channelId = _conn.getNextChannelID();
AMQSession session;
try
{
@@ -71,12 +100,24 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed
}
/**
+ * Create an XASession with default prefetch values of:
+ * High = MaxPrefetch
+ * Low = MaxPrefetch / 2
+ * @return XASession
+ * @throws JMSException
+ */
+ public XASession createXASession() throws JMSException
+ {
+ return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2);
+ }
+
+ /**
* create an XA Session and start it if required.
*/
public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException
{
_conn.checkNotClosed();
- int channelId = _conn._idFactory.incrementAndGet();
+ int channelId = _conn.getNextChannelID();
XASessionImpl session;
try
{
@@ -104,70 +145,165 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed
*/
public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
{
- _qpidConnection = Client.createConnection();
try
{
if (_logger.isDebugEnabled())
{
- _logger.debug("creating connection with broker " + " host: " + brokerDetail
- .getHost() + " port: " + brokerDetail.getPort() + " virtualhost: " + _conn
- .getVirtualHost() + "user name: " + _conn.getUsername() + "password: " + _conn.getPassword());
+ _logger.debug("connecting to host: " + brokerDetail.getHost() +
+ " port: " + brokerDetail.getPort() +
+ " vhost: " + _conn.getVirtualHost() +
+ " username: " + _conn.getUsername() +
+ " password: " + _conn.getPassword());
+ }
+
+ if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null)
+ {
+ this.setIdleTimeout(Long.parseLong(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT)));
}
+ else
+ {
+ // use the default value set for all connections
+ this.setIdleTimeout(Long.getLong(ClientProperties.IDLE_TIMEOUT_PROP_NAME,0));
+ }
+
+ String saslMechs = brokerDetail.getProperty("sasl_mechs")!= null?
+ brokerDetail.getProperty("sasl_mechs"):
+ System.getProperty("qpid.sasl_mechs","PLAIN");
+
_qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
- _conn.getUsername(), _conn.getPassword());
- _qpidConnection.setClosedListener(this);
+ _conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL(),saslMechs);
_conn._connected = true;
+ _conn._failoverPolicy.attainedConnection();
}
catch(ProtocolVersionException pe)
{
return new ProtocolVersion(pe.getMajor(), pe.getMinor());
}
- catch (QpidException e)
- {
+ catch (ConnectionException e)
+ {
throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e);
}
return null;
}
- /**
- * Not supported at this level.
- */
+ public void failoverPrep()
+ {
+ List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
+ for (AMQSession s : sessions)
+ {
+ s.failoverPrep();
+ }
+ }
+
public void resubscribeSessions() throws JMSException, AMQException, FailoverException
{
- //NOT implemented as railover is handled at a lower level
- throw new FailoverException("failing to reconnect during failover, operation not supported.");
+ List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
+ _logger.info(String.format("Resubscribing sessions = %s sessions.size=%s", sessions, sessions.size()));
+ for (AMQSession s : sessions)
+ {
+ ((AMQSession_0_10) s)._qpidConnection = _qpidConnection;
+ s.resubscribe();
+ }
}
- public void closeConneciton(long timeout) throws JMSException, AMQException
+ public void closeConnection(long timeout) throws JMSException, AMQException
{
try
{
_qpidConnection.close();
}
- catch (QpidException e)
+ catch (TransportException e)
+ {
+ throw new AMQException(e.getMessage(), e);
+ }
+ }
+
+ public void opened(Connection conn) {}
+
+ public void exception(Connection conn, ConnectionException exc)
+ {
+ if (exception != null)
{
- throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot close connection", e);
+ _logger.error("previous exception", exception);
}
+ exception = exc;
}
- public void onClosed(ErrorCode errorCode, String reason, Throwable t)
+ public void closed(Connection conn)
{
- if (_logger.isDebugEnabled())
+ ConnectionException exc = exception;
+ exception = null;
+
+ if (exc == null)
+ {
+ return;
+ }
+
+ ConnectionClose close = exc.getClose();
+ if (close == null)
+ {
+ try
+ {
+ if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+ {
+ _conn.failoverPrep();
+ _qpidConnection.resume();
+ _conn.fireFailoverComplete();
+ return;
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("error during failover", e);
+ }
+ }
+
+ ExceptionListener listener = _conn._exceptionListener;
+ if (listener == null)
{
- _logger.debug("Received a connection close from the broker: Error code : " + errorCode.getCode(), t);
+ _logger.error("connection exception: " + conn, exc);
}
- if (_conn._exceptionListener != null)
+ else
{
- JMSException ex = new JMSException(reason,String.valueOf(errorCode.getCode()));
- if (t != null)
+ String code = null;
+ if (close != null)
{
- ex.initCause(t);
+ code = close.getReplyCode().toString();
}
- _conn._exceptionListener.onException(ex);
+ JMSException ex = new JMSException(exc.getMessage(), code);
+ ex.initCause(exc);
+ listener.onException(ex);
+ }
+ }
+
+ public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
+ {
+ try
+ {
+ return operation.execute();
}
+ catch (FailoverException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void setIdleTimeout(long l)
+ {
+ _qpidConnection.setIdleTimeout(l);
+ }
+
+ public int getMaxChannelID()
+ {
+ return Integer.MAX_VALUE;
+ }
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return ProtocolVersion.v0_10;
}
}