diff options
21 files changed, 222 insertions, 98 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties index cc465e9251..4b86126cf6 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties @@ -20,7 +20,7 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF # register some connection factories # connectionfactory.[jndiname] = [ConnectionURL] -connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672 +connectionfactory.qpidConnectionfactory = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672' # Register an AMQP destination in JNDI # destination.[jniName] = [BindingURL] diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties index 0f1dd43aa9..4b98477a5f 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties @@ -21,7 +21,7 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF # register some connection factories # connectionfactory.[jndiname] = [ConnectionURL] -connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672 +connectionfactory.qpidConnectionfactory = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672' # Register an AMQP destination in JNDI # destination.[jniName] = [BindingURL] diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/pubsub.properties b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/pubsub.properties index dc9061866a..675ac7fc0f 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/pubsub.properties +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/pubsub.properties @@ -21,7 +21,7 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF # register some connection factories # connectionfactory.[jndiname] = [ConnectionURL] -connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672 +connectionfactory.qpidConnectionfactory = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672' # register some topics in JNDI using the form # topic.[jndiName] = [physicalName] diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/requestResponse.properties b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/requestResponse.properties index e732ce560d..8d6706eeb8 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/requestResponse.properties +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/requestResponse.properties @@ -20,7 +20,7 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF # register some connection factories # connectionfactory.[jndiname] = [ConnectionURL] -connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672 +connectionfactory.qpidConnectionfactory = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672' # register some queues in JNDI using the form # queue.[jndiName] = [physicalName] diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/transacted.properties b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/transacted.properties index 394d5f9036..601c5a24e2 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/transacted.properties +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/transacted.properties @@ -20,7 +20,7 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF # register some connection factories # connectionfactory.[jndiname] = [ConnectionURL] -connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672 +connectionfactory.qpidConnectionfactory = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672' # register some queues in JNDI using the form # queue.[jndiName] = [physicalName] diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index fc11794cba..adbe03e986 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -50,10 +50,7 @@ import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; -import org.apache.qpid.AMQConnectionFailureException; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQUndeliveredException; -import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.*; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.exchange.ExchangeDefaults; @@ -64,8 +61,8 @@ import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.url.QpidURL; import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpidity.transport.TransportConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -234,30 +231,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { - /* This JVM arg is only used for test code - Unless u pass a url it is difficult to determine which version to use - Most of the test code use an AMQConnection constructor that doesn't use - the url. So you need this switch to say which code path to test. - - Another complication is that when a constructor is called with out a url - they would construct a 0-8 url and pass into the construtor that takes a url. - - In such an instance u need the jvm argument to force an 0-10 connection - Once the 0-10 code base stabilises, 0-10 will be the default. - */ - - if (Boolean.getBoolean("SwitchCon")) - { - connectionURL.setURLVersion((Boolean.getBoolean("0-10")? ConnectionURL.URL_0_10:ConnectionURL.URL_0_8)); - } - - if (connectionURL.getURLVersion() == ConnectionURL.URL_0_10) + _failoverPolicy = new FailoverPolicy(connectionURL); + if (_failoverPolicy.getCurrentBrokerDetails().getTransport().equals(BrokerDetails.VM)) { - _delegate = new AMQConnectionDelegate_0_10(this); + _delegate = new AMQConnectionDelegate_0_8(this); } else { - _delegate = new AMQConnectionDelegate_0_8(this); + // We always assume that the broker supports the lates AMQ protocol verions + // thie is currently 0.10 + // TODO: use this code once we have switch to 0.10 + // getDelegate(); + _delegate = new AMQConnectionDelegate_0_10(this); } if (_logger.isInfoEnabled()) @@ -299,7 +284,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } - _failoverPolicy = new FailoverPolicy(connectionURL); _protocolHandler = new AMQProtocolHandler(this); // We are not currently connected @@ -316,6 +300,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect lastException = null; _connected = true; } + catch (AMQProtocolException pe) + { + if (_logger.isInfoEnabled()) + { + _logger.info(pe.getMessage()); + _logger.info("Trying broker supported protocol version: " + + TransportConstants.getVersionMajor() + "." + + TransportConstants.getVersionMinor()); + } + // we need to check whether we have a delegate for the supported protocol + getDelegate(); + } catch (Exception e) { lastException = e; @@ -383,6 +379,26 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _connectionMetaData = new QpidConnectionMetaData(this); } + private void getDelegate() throws AMQProtocolException + { + try + { + Class c = Class.forName("org.apache.qpid.client.AMQConnectionDelegate_" + + TransportConstants.getVersionMajor() + "_" + + TransportConstants.getVersionMinor()); + Class partypes[] = new Class[1]; + partypes[0] = AMQConnection.class; + _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this); + } + catch (Exception e) + { + throw new AMQProtocolException(AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR, + "Protocol: " + TransportConstants.getVersionMajor() + "." + + TransportConstants.getVersionMinor() + " is rquired by the broker but is not " + + "currently supported by this client library implementation", e); + } + } + protected AMQConnection(String username, String password, String clientName, String virtualHost) { _clientName = clientName; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index bf1ed49492..bde60c433f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -6,6 +6,7 @@ import javax.jms.JMSException; import javax.jms.XASession; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQProtocolException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.jms.BrokerDetails; @@ -14,6 +15,7 @@ import org.apache.qpidity.nclient.Client; import org.apache.qpidity.nclient.ClosedListener; import org.apache.qpidity.ErrorCode; import org.apache.qpidity.QpidException; +import org.apache.qpidity.ProtocolException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,6 +115,10 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed _conn.getUsername(), _conn.getPassword()); _qpidConnection.setClosedListener(this); } + catch(ProtocolException pe) + { + throw new AMQProtocolException(null, pe.getMessage(), pe); + } catch (QpidException e) { throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java index dfc87e21b1..3fcec67fe1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java @@ -37,6 +37,7 @@ import javax.naming.spi.ObjectFactory; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpidity.transport.TransportConstants; public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, @@ -429,9 +430,10 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF */ public XAConnection createXAConnection() throws JMSException { - if (_connectionDetails.getURLVersion() == ConnectionURL.URL_0_8) + if (TransportConstants.getVersionMajor() == 0 && + TransportConstants.getVersionMinor() == 8) { - throw new UnsupportedOperationException("This version does not support XA operations"); + throw new UnsupportedOperationException("This protocol version does not support XA operations"); } else { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java index ae9a5ff802..770fab7a81 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java @@ -24,10 +24,8 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.net.MalformedURLException; -import org.apache.qpid.client.url.URLParser_0_8; -import org.apache.qpid.client.url.URLParser_0_10; +import org.apache.qpid.client.url.URLParser; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionURL; @@ -53,7 +51,6 @@ public class AMQConnectionURL implements ConnectionURL private AMQShortString _defaultTopicExchangeName; private AMQShortString _temporaryTopicExchangeName; private AMQShortString _temporaryQueueExchangeName; - private byte _urlVersion; public AMQConnectionURL(String fullURL) throws URLSyntaxException { @@ -62,48 +59,7 @@ public class AMQConnectionURL implements ConnectionURL _options = new HashMap<String, String>(); _brokers = new LinkedList<BrokerDetails>(); _failoverOptions = new HashMap<String, String>(); - - if (!Boolean.getBoolean("SwitchCon")) - { - // We need to decided the version based on URL - if (fullURL.startsWith("qpid")) - { - //URLParser - URLParser_0_10 parser = null; - try - { - parser = new URLParser_0_10(fullURL); - } - catch (MalformedURLException e) - { - throw new URLSyntaxException(fullURL,e.getMessage(),0,0); - } - setBrokerDetails(parser.getAllBrokerDetails()); - // use the first instance username and password - // This is temporary as the URL must be changed for olding this information as part of the full URL - BrokerDetails firstBroker = getBrokerDetails(0); - setUsername(firstBroker.getProperty(BrokerDetails.USERNAME)); - setPassword(firstBroker.getProperty(BrokerDetails.PASSWORD)); - setClientName(firstBroker.getProperty(BrokerDetails.CLIENT_ID)); - setVirtualHost(firstBroker.getProperty(BrokerDetails.VIRTUAL_HOST)); - _urlVersion = URL_0_10; - } - else - { - new URLParser_0_8(this); - _urlVersion = URL_0_8; - } - } - } - - public byte getURLVersion() - { - return _urlVersion; - } - - public void setURLVersion(byte version) - { - _urlVersion = version; + new URLParser(this); } public String getURL() diff --git a/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_8.java b/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java index 5cde4d196a..b975713ad7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java @@ -11,11 +11,11 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.URLHelper; import org.apache.qpid.url.URLSyntaxException; -public class URLParser_0_8 +public class URLParser { private AMQConnectionURL _url; - public URLParser_0_8(AMQConnectionURL url)throws URLSyntaxException + public URLParser(AMQConnectionURL url)throws URLSyntaxException { _url = url; parseURL(_url.getURL()); diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index 7cae7f8a9f..8ce302564b 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -43,11 +43,7 @@ public interface ConnectionURL public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange"; public static final byte URL_0_8 = 1; public static final byte URL_0_10 = 2; - - byte getURLVersion(); - - void setURLVersion(byte version); - + String getURL(); String getFailoverMethod(); diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java index 3944e2c3f3..8b787615e4 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java @@ -11,6 +11,7 @@ import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.url.QpidURL; import org.apache.qpidity.ErrorCode; import org.apache.qpidity.QpidException; +import org.apache.qpidity.ProtocolException; import org.apache.qpidity.nclient.impl.ClientSession; import org.apache.qpidity.nclient.impl.ClientSessionDelegate; import org.apache.qpidity.transport.Channel; @@ -49,14 +50,14 @@ public class Client implements org.apache.qpidity.nclient.Connection public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException { - Condition negotiationComplete = _lock.newCondition(); + final Condition negotiationComplete = _lock.newCondition(); closeOk = _lock.newCondition(); _lock.lock(); ConnectionDelegate connectionDelegate = new ConnectionDelegate() { private boolean receivedClose = false; - + private String _unsupportedProtocol; public SessionDelegate getSessionDelegate() { return new ClientSessionDelegate(); @@ -115,6 +116,32 @@ public class Client implements org.apache.qpidity.nclient.Connection this.receivedClose = true; } + + @Override public void init(Channel ch, ProtocolHeader hdr) + { + // TODO: once the merge is done we'll need to update this code + // for handling 0.8 protocol version type i.e. major=8 and minor=0 :( + if (hdr.getMajor() != TransportConstants.getVersionMajor() + || hdr.getMinor() != TransportConstants.getVersionMinor()) + { + _unsupportedProtocol = TransportConstants.getVersionMajor() + "." + + TransportConstants.getVersionMinor(); + TransportConstants.setVersionMajor( hdr.getMajor() ); + TransportConstants.setVersionMinor( hdr.getMinor() ); + _lock.lock(); + negotiationComplete.signalAll(); + _lock.unlock(); + } + else + { + ch.connectionStart(hdr.getMajor(), hdr.getMinor(), null, "PLAIN", "utf8"); + } + } + + @Override public String getUnsupportedProtocol() + { + return _unsupportedProtocol; + } }; connectionDelegate.setCondition(_lock,negotiationComplete); @@ -123,8 +150,7 @@ public class Client implements org.apache.qpidity.nclient.Connection connectionDelegate.setVirtualHost(virtualHost); if (System.getProperty("transport","mina").equalsIgnoreCase("nio")) - { - System.out.println("Using NIO"); + { if( _logger.isDebugEnabled()) { _logger.debug("using NIO"); @@ -142,13 +168,21 @@ public class Client implements org.apache.qpidity.nclient.Connection } // XXX: hardcoded version numbers - _conn.send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.CONNECTION_VERSION_MAJOR, TransportConstants.CONNECTION_VERSION_MINOR))); + _conn.send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.getVersionMajor(), + TransportConstants.getVersionMinor()))); try { negotiationComplete.await(); + if( connectionDelegate.getUnsupportedProtocol() != null ) + { + _conn.close(); + throw new ProtocolException("Unsupported protocol version: " + connectionDelegate.getUnsupportedProtocol() + , ErrorCode.UNSUPPORTED_PROTOCOL, null); + + } } - catch (Exception e) + catch (InterruptedException e) { // } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/tests.properties b/java/client/src/test/java/org/apache/qpid/test/unit/tests.properties index 893958949f..32ed16a392 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/tests.properties +++ b/java/client/src/test/java/org/apache/qpid/test/unit/tests.properties @@ -23,7 +23,9 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF # register some connection factories # connectionfactory.[jndiname] = [ConnectionURL] -connectionfactory.local = qpid:password=guest;username=guest;client_id=clientid;virtualhost=test@tcp:127.0.0.1:5672 +connectionfactory.local = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672' +#qpid:password=guest;username=guest;client_id=clientid;virtualhost=test@tcp:127.0.0.1:5672 + # register some queues in JNDI using the form # queue.[jndiName] = [physicalName] diff --git a/java/common/src/main/java/org/apache/qpid/AMQProtocolException.java b/java/common/src/main/java/org/apache/qpid/AMQProtocolException.java new file mode 100644 index 0000000000..bbc569839a --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/AMQProtocolException.java @@ -0,0 +1,38 @@ +package org.apache.qpid; + +import org.apache.qpid.protocol.AMQConstant; + +/* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +public class AMQProtocolException extends AMQException +{ + /** + * Constructor for a Protocol Exception + * <p> This is the only provided constructor and the parameters have to be + * set to null when they are unknown. + * + * @param msg A description of the reason of this exception . + * @param errorCode A string specifyin the error code of this exception. + * @param cause The linked Execption. + */ + public AMQProtocolException(AMQConstant errorCode, String msg, Throwable cause) + { + super(errorCode, msg, cause); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java index 375df2a45d..8dee790a9e 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java @@ -153,6 +153,15 @@ public final class AMQConstant public static final AMQConstant FRAME_MIN_SIZE = new AMQConstant(4096, "frame min size", true); + /** + * The server does not support the protocol version + */ + public static final AMQConstant UNSUPPORTED_BROKER_PROTOCOL_ERROR = new AMQConstant(542, "broker unsupported protocol", true); + /** + * The client imp does not support the protocol version + */ + public static final AMQConstant UNSUPPORTED_CLIENT_PROTOCOL_ERROR = new AMQConstant(543, "client unsupported protocol", true); + /** The AMQP status code. */ private int _code; diff --git a/java/common/src/main/java/org/apache/qpidity/ErrorCode.java b/java/common/src/main/java/org/apache/qpidity/ErrorCode.java index 4ff6939139..4b18c46d16 100644 --- a/java/common/src/main/java/org/apache/qpidity/ErrorCode.java +++ b/java/common/src/main/java/org/apache/qpidity/ErrorCode.java @@ -6,6 +6,7 @@ public enum ErrorCode UNDEFINED(1,"undefined",true), MESSAGE_REJECTED(2,"message_rejected",true), CONNECTION_ERROR(3,"connection was closed",true), + UNSUPPORTED_PROTOCOL(4, "protocol version is unsupported", true), //This might change in the spec, the error class is not applicable NO_ERROR(200,"reply-success",true), diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolException.java b/java/common/src/main/java/org/apache/qpidity/ProtocolException.java new file mode 100644 index 0000000000..596143a1b9 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/ProtocolException.java @@ -0,0 +1,36 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpidity; + +public class ProtocolException extends QpidException +{ + /** + * Constructor for a Ptotocol Exception. + * <p> This is the only provided constructor and the parameters have to be set to null when + * they are unknown. + * @param message A description of the reason of this exception. + * @param errorCode A string specifyin the error code of this exception. + * @param cause The linked Execption. + * + */ + public ProtocolException(String message, ErrorCode errorCode, Throwable cause) + { + super(message, errorCode, cause); + } +} diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpidity/ToyClient.java index e455be0873..10b68bbb20 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyClient.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyClient.java @@ -75,7 +75,9 @@ class ToyClient extends SessionDelegate } public void closed() {} }); - conn.send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.CONNECTION_VERSION_MAJOR, TransportConstants.CONNECTION_VERSION_MINOR))); + conn.send(new ConnectionEvent(0, new ProtocolHeader(1, + TransportConstants.getVersionMajor(), + TransportConstants.getVersionMinor()))); Channel ch = conn.getChannel(0); Session ssn = new Session(); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java index 21c7b8c16b..4815f1025f 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java @@ -83,7 +83,8 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> if (hdr.getMajor() != 0 && hdr.getMinor() != 10) { // XXX - ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.CONNECTION_VERSION_MAJOR, TransportConstants.CONNECTION_VERSION_MINOR))); + ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.getVersionMajor(), + TransportConstants.getVersionMinor()))); ch.getConnection().close(); } else @@ -282,4 +283,9 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> { _virtualHost = host; } + + public String getUnsupportedProtocol() + { + return null; + } } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java b/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java index 47f7f17578..54429a1a4f 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java @@ -2,6 +2,26 @@ package org.apache.qpidity.transport; public class TransportConstants { - public static final byte CONNECTION_VERSION_MAJOR = 99; - public static final byte CONNECTION_VERSION_MINOR = 0; + private static byte _protocol_version_minor = 0; + private static byte _protocol_version_major = 99; + + public static void setVersionMajor(byte value) + { + _protocol_version_major = value; + } + + public static void setVersionMinor(byte value) + { + _protocol_version_minor = value; + } + + public static byte getVersionMajor() + { + return _protocol_version_major; + } + + public static byte getVersionMinor() + { + return _protocol_version_minor; + } } diff --git a/java/test-provider.properties b/java/test-provider.properties index 8dcba7230f..38cc146ae6 100644 --- a/java/test-provider.properties +++ b/java/test-provider.properties @@ -1,4 +1,4 @@ -connectionfactory.local = qpid:password=guest;username=guest;client_id=clientid;virtualhost=test@tcp:127.0.0.1:5672 +connectionfactory.local = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672' queue.MyQueue = example.MyQueue queue.xaQueue = xaQueue |