summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-07-02 10:13:06 +0000
committerAidan Skinner <aidan@apache.org>2008-07-02 10:13:06 +0000
commit0f8664c9ed48cf9ff903dfa103564c9147af45f0 (patch)
tree98a51a1179cb04064aa70ec1925bbdfea2a6069e
parentf305d3d0eaed6b4b9390b86df3fc79013c905a7d (diff)
downloadqpid-python-0f8664c9ed48cf9ff903dfa103564c9147af45f0.tar.gz
QPID-960 make protocol negotiation work from 0-10 down to 0-9 and then 8-0
still needs love to do with railover, see QPID-959 AMQConnection.java: use 8_0 delegate for in-vm tests AMQConnectionDelegate_0_9.java: add subclass for class.forname'ing rename AMQConnectionDelegate_0_8.java to AMQConnectionDelegate_8_0.java to match protocol version properly git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@673347 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java234
-rwxr-xr-xjava/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java32
3 files changed, 33 insertions, 235 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index cf6ac54e55..59bf103089 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -368,7 +368,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
BrokerDetails brokerDetails = _failoverPolicy.getNextBrokerDetails();
if (brokerDetails.getTransport().equals(BrokerDetails.VM))
{
- _delegate = new AMQConnectionDelegate_0_8(this);
+ _delegate = new AMQConnectionDelegate_8_0(this);
}
else
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java
deleted file mode 100644
index 79f81c4a2d..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.Iterator;
-import java.util.Set;
-
-import javax.jms.JMSException;
-import javax.jms.XASession;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.failover.FailoverProtectedOperation;
-import org.apache.qpid.client.failover.FailoverRetrySupport;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicQosOkBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
-import org.apache.qpid.framing.TxSelectBody;
-import org.apache.qpid.framing.TxSelectOkBody;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.ChannelLimitReachedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQConnectionDelegate_0_8 implements AMQConnectionDelegate
-{
- private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_8.class);
- private AMQConnection _conn;
-
-
- public void closeConneciton(long timeout) throws JMSException, AMQException
- {
- _conn.getProtocolHandler().closeConnection(timeout);
-
- }
-
- public AMQConnectionDelegate_0_8(AMQConnection conn)
- {
- _conn = conn;
- }
-
- protected boolean checkException(Throwable thrown)
- {
- Throwable cause = thrown.getCause();
-
- if (cause == null)
- {
- cause = thrown;
- }
-
- return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
- }
-
- public void makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException
- {
- final Set<AMQState> openOrClosedStates =
- EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
-
- TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
- // this blocks until the connection has been set up or when an error
- // has prevented the connection being set up
-
- AMQState state = _conn._protocolHandler.attainState(openOrClosedStates);
- if(state == AMQState.CONNECTION_OPEN)
- {
- _conn._failoverPolicy.attainedConnection();
-
- // Again this should be changed to a suitable notify
- _conn._connected = true;
- }
- }
-
- public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
- throws JMSException
- {
- return createSession(transacted, acknowledgeMode, prefetch, prefetch);
- }
-
- public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException
- {
- throw new UnsupportedOperationException("0_8 version does not provide XA support");
- }
-
- public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
- final int prefetchHigh, final int prefetchLow) throws JMSException
- {
- _conn.checkNotClosed();
-
- if (_conn.channelLimitReached())
- {
- throw new ChannelLimitReachedException(_conn._maximumChannelCount);
- }
-
- return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
- new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>()
- {
- public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
- {
- int channelId = _conn._idFactory.incrementAndGet();
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Write channel open frame for channel id " + channelId);
- }
-
- // We must create the session and register it before actually sending the frame to the server to
- // open it, so that there is no window where we could receive data on the channel and not be set
- // up to handle it appropriately.
- AMQSession session =
- new AMQSession_0_8(_conn, channelId, transacted, acknowledgeMode, prefetchHigh,
- prefetchLow);
- // _protocolHandler.addSessionByChannel(channelId, session);
- _conn.registerSession(channelId, session);
-
- boolean success = false;
- try
- {
- createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
- success = true;
- }
- catch (AMQException e)
- {
- JMSException jmse = new JMSException("Error creating session: " + e);
- jmse.setLinkedException(e);
- throw jmse;
- }
- finally
- {
- if (!success)
- {
- _conn.deregisterSession(channelId);
- }
- }
-
- if (_conn._started)
- {
- try
- {
- session.start();
- }
- catch (AMQException e)
- {
- throw new JMSAMQException(e);
- }
- }
-
- return session;
- }
- }, _conn).execute();
- }
-
- private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
- throws AMQException, FailoverException
- {
- ChannelOpenBody channelOpenBody = _conn.getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
- // TODO: Be aware of possible changes to parameter order as versions change.
- _conn._protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
-
- // todo send low water mark when protocol allows.
- // todo Be aware of possible changes to parameter order as versions change.
- BasicQosBody basicQosBody = _conn.getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false);
- _conn._protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class);
-
- if (transacted)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Issuing TxSelect for " + channelId);
- }
- TxSelectBody body = _conn.getProtocolHandler().getMethodRegistry().createTxSelectBody();
-
- // TODO: Be aware of possible changes to parameter order as versions change.
- _conn._protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
- }
- }
-
- /**
- * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
- * The caller must hold the failover mutex before calling this method.
- */
- public void resubscribeSessions() throws JMSException, AMQException, FailoverException
- {
- ArrayList sessions = new ArrayList(_conn.getSessions().values());
- _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
- for (Iterator it = sessions.iterator(); it.hasNext();)
- {
- AMQSession s = (AMQSession) it.next();
- // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
- reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
- s.resubscribe();
- }
- }
-
- private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
- throws AMQException, FailoverException
- {
- try
- {
- createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
- }
- catch (AMQException e)
- {
- _conn.deregisterSession(channelId);
- throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e);
- }
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java
new file mode 100755
index 0000000000..d95e2e3dff
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+
+public class AMQConnectionDelegate_0_9 extends AMQConnectionDelegate_8_0
+{
+
+ public AMQConnectionDelegate_0_9(AMQConnection conn)
+ {
+ super(conn);
+ }
+
+}