summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties2
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties2
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/pubsub.properties2
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/requestResponse.properties2
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/transacted.properties2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java48
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/url/URLParser.java (renamed from java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_8.java)4
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java6
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/Client.java46
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/tests.properties4
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQProtocolException.java38
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java9
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ErrorCode.java1
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ProtocolException.java36
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyClient.java4
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java8
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java24
-rw-r--r--java/test-provider.properties2
21 files changed, 222 insertions, 98 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties
index cc465e9251..4b86126cf6 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties
+++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties
@@ -20,7 +20,7 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF
# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
-connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672
+connectionfactory.qpidConnectionfactory = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
# Register an AMQP destination in JNDI
# destination.[jniName] = [BindingURL]
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties
index 0f1dd43aa9..4b98477a5f 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties
+++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties
@@ -21,7 +21,7 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF
# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
-connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672
+connectionfactory.qpidConnectionfactory = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
# Register an AMQP destination in JNDI
# destination.[jniName] = [BindingURL]
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/pubsub.properties b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/pubsub.properties
index dc9061866a..675ac7fc0f 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/pubsub.properties
+++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/pubsub.properties
@@ -21,7 +21,7 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF
# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
-connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672
+connectionfactory.qpidConnectionfactory = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/requestResponse.properties b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/requestResponse.properties
index e732ce560d..8d6706eeb8 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/requestResponse.properties
+++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/requestResponse.properties
@@ -20,7 +20,7 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF
# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
-connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672
+connectionfactory.qpidConnectionfactory = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/transacted.properties b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/transacted.properties
index 394d5f9036..601c5a24e2 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/transacted.properties
+++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/transacted.properties
@@ -20,7 +20,7 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF
# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
-connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672
+connectionfactory.qpidConnectionfactory = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index fc11794cba..adbe03e986 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -50,10 +50,7 @@ import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
-import org.apache.qpid.AMQConnectionFailureException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQUndeliveredException;
-import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.*;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -64,8 +61,8 @@ import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.url.QpidURL;
import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpidity.transport.TransportConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -234,30 +231,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
*/
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
- /* This JVM arg is only used for test code
- Unless u pass a url it is difficult to determine which version to use
- Most of the test code use an AMQConnection constructor that doesn't use
- the url. So you need this switch to say which code path to test.
-
- Another complication is that when a constructor is called with out a url
- they would construct a 0-8 url and pass into the construtor that takes a url.
-
- In such an instance u need the jvm argument to force an 0-10 connection
- Once the 0-10 code base stabilises, 0-10 will be the default.
- */
-
- if (Boolean.getBoolean("SwitchCon"))
- {
- connectionURL.setURLVersion((Boolean.getBoolean("0-10")? ConnectionURL.URL_0_10:ConnectionURL.URL_0_8));
- }
-
- if (connectionURL.getURLVersion() == ConnectionURL.URL_0_10)
+ _failoverPolicy = new FailoverPolicy(connectionURL);
+ if (_failoverPolicy.getCurrentBrokerDetails().getTransport().equals(BrokerDetails.VM))
{
- _delegate = new AMQConnectionDelegate_0_10(this);
+ _delegate = new AMQConnectionDelegate_0_8(this);
}
else
{
- _delegate = new AMQConnectionDelegate_0_8(this);
+ // We always assume that the broker supports the lates AMQ protocol verions
+ // thie is currently 0.10
+ // TODO: use this code once we have switch to 0.10
+ // getDelegate();
+ _delegate = new AMQConnectionDelegate_0_10(this);
}
if (_logger.isInfoEnabled())
@@ -299,7 +284,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
- _failoverPolicy = new FailoverPolicy(connectionURL);
_protocolHandler = new AMQProtocolHandler(this);
// We are not currently connected
@@ -316,6 +300,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
lastException = null;
_connected = true;
}
+ catch (AMQProtocolException pe)
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info(pe.getMessage());
+ _logger.info("Trying broker supported protocol version: " +
+ TransportConstants.getVersionMajor() + "." +
+ TransportConstants.getVersionMinor());
+ }
+ // we need to check whether we have a delegate for the supported protocol
+ getDelegate();
+ }
catch (Exception e)
{
lastException = e;
@@ -383,6 +379,26 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_connectionMetaData = new QpidConnectionMetaData(this);
}
+ private void getDelegate() throws AMQProtocolException
+ {
+ try
+ {
+ Class c = Class.forName("org.apache.qpid.client.AMQConnectionDelegate_" +
+ TransportConstants.getVersionMajor() + "_" +
+ TransportConstants.getVersionMinor());
+ Class partypes[] = new Class[1];
+ partypes[0] = AMQConnection.class;
+ _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
+ }
+ catch (Exception e)
+ {
+ throw new AMQProtocolException(AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR,
+ "Protocol: " + TransportConstants.getVersionMajor() + "."
+ + TransportConstants.getVersionMinor() + " is rquired by the broker but is not " +
+ "currently supported by this client library implementation", e);
+ }
+ }
+
protected AMQConnection(String username, String password, String clientName, String virtualHost)
{
_clientName = clientName;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index bf1ed49492..bde60c433f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -6,6 +6,7 @@ import javax.jms.JMSException;
import javax.jms.XASession;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQProtocolException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.jms.BrokerDetails;
@@ -14,6 +15,7 @@ import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.ClosedListener;
import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
+import org.apache.qpidity.ProtocolException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,6 +115,10 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed
_conn.getUsername(), _conn.getPassword());
_qpidConnection.setClosedListener(this);
}
+ catch(ProtocolException pe)
+ {
+ throw new AMQProtocolException(null, pe.getMessage(), pe);
+ }
catch (QpidException e)
{
throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e);
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
index dfc87e21b1..3fcec67fe1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
@@ -37,6 +37,7 @@ import javax.naming.spi.ObjectFactory;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpidity.transport.TransportConstants;
public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory,
@@ -429,9 +430,10 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF
*/
public XAConnection createXAConnection() throws JMSException
{
- if (_connectionDetails.getURLVersion() == ConnectionURL.URL_0_8)
+ if (TransportConstants.getVersionMajor() == 0 &&
+ TransportConstants.getVersionMinor() == 8)
{
- throw new UnsupportedOperationException("This version does not support XA operations");
+ throw new UnsupportedOperationException("This protocol version does not support XA operations");
}
else
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
index ae9a5ff802..770fab7a81 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
@@ -24,10 +24,8 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.net.MalformedURLException;
-import org.apache.qpid.client.url.URLParser_0_8;
-import org.apache.qpid.client.url.URLParser_0_10;
+import org.apache.qpid.client.url.URLParser;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
@@ -53,7 +51,6 @@ public class AMQConnectionURL implements ConnectionURL
private AMQShortString _defaultTopicExchangeName;
private AMQShortString _temporaryTopicExchangeName;
private AMQShortString _temporaryQueueExchangeName;
- private byte _urlVersion;
public AMQConnectionURL(String fullURL) throws URLSyntaxException
{
@@ -62,48 +59,7 @@ public class AMQConnectionURL implements ConnectionURL
_options = new HashMap<String, String>();
_brokers = new LinkedList<BrokerDetails>();
_failoverOptions = new HashMap<String, String>();
-
- if (!Boolean.getBoolean("SwitchCon"))
- {
- // We need to decided the version based on URL
- if (fullURL.startsWith("qpid"))
- {
- //URLParser
- URLParser_0_10 parser = null;
- try
- {
- parser = new URLParser_0_10(fullURL);
- }
- catch (MalformedURLException e)
- {
- throw new URLSyntaxException(fullURL,e.getMessage(),0,0);
- }
- setBrokerDetails(parser.getAllBrokerDetails());
- // use the first instance username and password
- // This is temporary as the URL must be changed for olding this information as part of the full URL
- BrokerDetails firstBroker = getBrokerDetails(0);
- setUsername(firstBroker.getProperty(BrokerDetails.USERNAME));
- setPassword(firstBroker.getProperty(BrokerDetails.PASSWORD));
- setClientName(firstBroker.getProperty(BrokerDetails.CLIENT_ID));
- setVirtualHost(firstBroker.getProperty(BrokerDetails.VIRTUAL_HOST));
- _urlVersion = URL_0_10;
- }
- else
- {
- new URLParser_0_8(this);
- _urlVersion = URL_0_8;
- }
- }
- }
-
- public byte getURLVersion()
- {
- return _urlVersion;
- }
-
- public void setURLVersion(byte version)
- {
- _urlVersion = version;
+ new URLParser(this);
}
public String getURL()
diff --git a/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_8.java b/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java
index 5cde4d196a..b975713ad7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java
@@ -11,11 +11,11 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.url.URLHelper;
import org.apache.qpid.url.URLSyntaxException;
-public class URLParser_0_8
+public class URLParser
{
private AMQConnectionURL _url;
- public URLParser_0_8(AMQConnectionURL url)throws URLSyntaxException
+ public URLParser(AMQConnectionURL url)throws URLSyntaxException
{
_url = url;
parseURL(_url.getURL());
diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
index 7cae7f8a9f..8ce302564b 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
@@ -43,11 +43,7 @@ public interface ConnectionURL
public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange";
public static final byte URL_0_8 = 1;
public static final byte URL_0_10 = 2;
-
- byte getURLVersion();
-
- void setURLVersion(byte version);
-
+
String getURL();
String getFailoverMethod();
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
index 3944e2c3f3..8b787615e4 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
@@ -11,6 +11,7 @@ import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.url.QpidURL;
import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
+import org.apache.qpidity.ProtocolException;
import org.apache.qpidity.nclient.impl.ClientSession;
import org.apache.qpidity.nclient.impl.ClientSessionDelegate;
import org.apache.qpidity.transport.Channel;
@@ -49,14 +50,14 @@ public class Client implements org.apache.qpidity.nclient.Connection
public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException
{
- Condition negotiationComplete = _lock.newCondition();
+ final Condition negotiationComplete = _lock.newCondition();
closeOk = _lock.newCondition();
_lock.lock();
ConnectionDelegate connectionDelegate = new ConnectionDelegate()
{
private boolean receivedClose = false;
-
+ private String _unsupportedProtocol;
public SessionDelegate getSessionDelegate()
{
return new ClientSessionDelegate();
@@ -115,6 +116,32 @@ public class Client implements org.apache.qpidity.nclient.Connection
this.receivedClose = true;
}
+
+ @Override public void init(Channel ch, ProtocolHeader hdr)
+ {
+ // TODO: once the merge is done we'll need to update this code
+ // for handling 0.8 protocol version type i.e. major=8 and minor=0 :(
+ if (hdr.getMajor() != TransportConstants.getVersionMajor()
+ || hdr.getMinor() != TransportConstants.getVersionMinor())
+ {
+ _unsupportedProtocol = TransportConstants.getVersionMajor() + "." +
+ TransportConstants.getVersionMinor();
+ TransportConstants.setVersionMajor( hdr.getMajor() );
+ TransportConstants.setVersionMinor( hdr.getMinor() );
+ _lock.lock();
+ negotiationComplete.signalAll();
+ _lock.unlock();
+ }
+ else
+ {
+ ch.connectionStart(hdr.getMajor(), hdr.getMinor(), null, "PLAIN", "utf8");
+ }
+ }
+
+ @Override public String getUnsupportedProtocol()
+ {
+ return _unsupportedProtocol;
+ }
};
connectionDelegate.setCondition(_lock,negotiationComplete);
@@ -123,8 +150,7 @@ public class Client implements org.apache.qpidity.nclient.Connection
connectionDelegate.setVirtualHost(virtualHost);
if (System.getProperty("transport","mina").equalsIgnoreCase("nio"))
- {
- System.out.println("Using NIO");
+ {
if( _logger.isDebugEnabled())
{
_logger.debug("using NIO");
@@ -142,13 +168,21 @@ public class Client implements org.apache.qpidity.nclient.Connection
}
// XXX: hardcoded version numbers
- _conn.send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.CONNECTION_VERSION_MAJOR, TransportConstants.CONNECTION_VERSION_MINOR)));
+ _conn.send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.getVersionMajor(),
+ TransportConstants.getVersionMinor())));
try
{
negotiationComplete.await();
+ if( connectionDelegate.getUnsupportedProtocol() != null )
+ {
+ _conn.close();
+ throw new ProtocolException("Unsupported protocol version: " + connectionDelegate.getUnsupportedProtocol()
+ , ErrorCode.UNSUPPORTED_PROTOCOL, null);
+
+ }
}
- catch (Exception e)
+ catch (InterruptedException e)
{
//
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/tests.properties b/java/client/src/test/java/org/apache/qpid/test/unit/tests.properties
index 893958949f..32ed16a392 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/tests.properties
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/tests.properties
@@ -23,7 +23,9 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF
# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
-connectionfactory.local = qpid:password=guest;username=guest;client_id=clientid;virtualhost=test@tcp:127.0.0.1:5672
+connectionfactory.local = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
+#qpid:password=guest;username=guest;client_id=clientid;virtualhost=test@tcp:127.0.0.1:5672
+
# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
diff --git a/java/common/src/main/java/org/apache/qpid/AMQProtocolException.java b/java/common/src/main/java/org/apache/qpid/AMQProtocolException.java
new file mode 100644
index 0000000000..bbc569839a
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/AMQProtocolException.java
@@ -0,0 +1,38 @@
+package org.apache.qpid;
+
+import org.apache.qpid.protocol.AMQConstant;
+
+/* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+public class AMQProtocolException extends AMQException
+{
+ /**
+ * Constructor for a Protocol Exception
+ * <p> This is the only provided constructor and the parameters have to be
+ * set to null when they are unknown.
+ *
+ * @param msg A description of the reason of this exception .
+ * @param errorCode A string specifyin the error code of this exception.
+ * @param cause The linked Execption.
+ */
+ public AMQProtocolException(AMQConstant errorCode, String msg, Throwable cause)
+ {
+ super(errorCode, msg, cause);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
index 375df2a45d..8dee790a9e 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
@@ -153,6 +153,15 @@ public final class AMQConstant
public static final AMQConstant FRAME_MIN_SIZE = new AMQConstant(4096, "frame min size", true);
+ /**
+ * The server does not support the protocol version
+ */
+ public static final AMQConstant UNSUPPORTED_BROKER_PROTOCOL_ERROR = new AMQConstant(542, "broker unsupported protocol", true);
+ /**
+ * The client imp does not support the protocol version
+ */
+ public static final AMQConstant UNSUPPORTED_CLIENT_PROTOCOL_ERROR = new AMQConstant(543, "client unsupported protocol", true);
+
/** The AMQP status code. */
private int _code;
diff --git a/java/common/src/main/java/org/apache/qpidity/ErrorCode.java b/java/common/src/main/java/org/apache/qpidity/ErrorCode.java
index 4ff6939139..4b18c46d16 100644
--- a/java/common/src/main/java/org/apache/qpidity/ErrorCode.java
+++ b/java/common/src/main/java/org/apache/qpidity/ErrorCode.java
@@ -6,6 +6,7 @@ public enum ErrorCode
UNDEFINED(1,"undefined",true),
MESSAGE_REJECTED(2,"message_rejected",true),
CONNECTION_ERROR(3,"connection was closed",true),
+ UNSUPPORTED_PROTOCOL(4, "protocol version is unsupported", true),
//This might change in the spec, the error class is not applicable
NO_ERROR(200,"reply-success",true),
diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolException.java b/java/common/src/main/java/org/apache/qpidity/ProtocolException.java
new file mode 100644
index 0000000000..596143a1b9
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/ProtocolException.java
@@ -0,0 +1,36 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpidity;
+
+public class ProtocolException extends QpidException
+{
+ /**
+ * Constructor for a Ptotocol Exception.
+ * <p> This is the only provided constructor and the parameters have to be set to null when
+ * they are unknown.
+ * @param message A description of the reason of this exception.
+ * @param errorCode A string specifyin the error code of this exception.
+ * @param cause The linked Execption.
+ *
+ */
+ public ProtocolException(String message, ErrorCode errorCode, Throwable cause)
+ {
+ super(message, errorCode, cause);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpidity/ToyClient.java
index e455be0873..10b68bbb20 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyClient.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyClient.java
@@ -75,7 +75,9 @@ class ToyClient extends SessionDelegate
}
public void closed() {}
});
- conn.send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.CONNECTION_VERSION_MAJOR, TransportConstants.CONNECTION_VERSION_MINOR)));
+ conn.send(new ConnectionEvent(0, new ProtocolHeader(1,
+ TransportConstants.getVersionMajor(),
+ TransportConstants.getVersionMinor())));
Channel ch = conn.getChannel(0);
Session ssn = new Session();
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
index 21c7b8c16b..4815f1025f 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
@@ -83,7 +83,8 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
if (hdr.getMajor() != 0 && hdr.getMinor() != 10)
{
// XXX
- ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.CONNECTION_VERSION_MAJOR, TransportConstants.CONNECTION_VERSION_MINOR)));
+ ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.getVersionMajor(),
+ TransportConstants.getVersionMinor())));
ch.getConnection().close();
}
else
@@ -282,4 +283,9 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
{
_virtualHost = host;
}
+
+ public String getUnsupportedProtocol()
+ {
+ return null;
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java b/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java
index 47f7f17578..54429a1a4f 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java
@@ -2,6 +2,26 @@ package org.apache.qpidity.transport;
public class TransportConstants
{
- public static final byte CONNECTION_VERSION_MAJOR = 99;
- public static final byte CONNECTION_VERSION_MINOR = 0;
+ private static byte _protocol_version_minor = 0;
+ private static byte _protocol_version_major = 99;
+
+ public static void setVersionMajor(byte value)
+ {
+ _protocol_version_major = value;
+ }
+
+ public static void setVersionMinor(byte value)
+ {
+ _protocol_version_minor = value;
+ }
+
+ public static byte getVersionMajor()
+ {
+ return _protocol_version_major;
+ }
+
+ public static byte getVersionMinor()
+ {
+ return _protocol_version_minor;
+ }
}
diff --git a/java/test-provider.properties b/java/test-provider.properties
index 8dcba7230f..38cc146ae6 100644
--- a/java/test-provider.properties
+++ b/java/test-provider.properties
@@ -1,4 +1,4 @@
-connectionfactory.local = qpid:password=guest;username=guest;client_id=clientid;virtualhost=test@tcp:127.0.0.1:5672
+connectionfactory.local = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
queue.MyQueue = example.MyQueue
queue.xaQueue = xaQueue