diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-10-14 22:29:03 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-10-14 22:29:03 +0000 |
commit | 390b6b48b9808a1862b48b3e324451b95f465ed4 (patch) | |
tree | 8190d53d1c3d3022690be7a0b39dd848331a74e0 | |
parent | c77eff7738c4bd8424fd2205788a3f34ace13df5 (diff) | |
download | qpid-python-address-refactor.tar.gz |
QPID-3401 Checking the proposed changes into a branch to preserve history & continue working until such time it's accepted into trunk.address-refactor
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/address-refactor@1183532 13f79535-47bb-0310-9956-ffa450edef68
51 files changed, 2577 insertions, 1893 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/Drain.java b/java/client/example/src/main/java/org/apache/qpid/example/Drain.java index b43031ad23..e2db7c37b1 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/Drain.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/Drain.java @@ -26,9 +26,7 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; -import org.apache.qpid.client.AMQAnyDestination; -import org.apache.qpid.client.AMQConnection; -import org.slf4j.Logger; +import org.apache.qpid.client.AddressBasedDestination; public class Drain extends OptionParser { @@ -66,7 +64,7 @@ public class Drain extends OptionParser Connection con = createConnection(); con.start(); Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE); - Destination dest = new AMQAnyDestination(address); + Destination dest = new AddressBasedDestination(address); MessageConsumer consumer = ssn.createConsumer(dest); Message msg; diff --git a/java/client/example/src/main/java/org/apache/qpid/example/MapReceiver.java b/java/client/example/src/main/java/org/apache/qpid/example/MapReceiver.java index 89db04f8d3..ae71a121de 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/MapReceiver.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/MapReceiver.java @@ -27,8 +27,8 @@ import javax.jms.MapMessage; import javax.jms.MessageConsumer; import javax.jms.Session; -import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AddressBasedDestination; public class MapReceiver { @@ -41,7 +41,7 @@ public class MapReceiver { connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}"); + Destination queue = new AddressBasedDestination("message_queue; {create: always}"); MessageConsumer consumer = session.createConsumer(queue); MapMessage m = (MapMessage)consumer.receive(); diff --git a/java/client/example/src/main/java/org/apache/qpid/example/MapSender.java b/java/client/example/src/main/java/org/apache/qpid/example/MapSender.java index 0ce9383add..fa85c00a51 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/MapSender.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/MapSender.java @@ -33,8 +33,8 @@ import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Session; -import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AddressBasedDestination; public class MapSender { @@ -45,7 +45,7 @@ public class MapSender { new AMQConnection("amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'"); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}"); + Destination queue = new AddressBasedDestination("ADDR:message_queue; {create: always}"); MessageProducer producer = session.createProducer(queue); MapMessage m = session.createMapMessage(); diff --git a/java/client/example/src/main/java/org/apache/qpid/example/Spout.java b/java/client/example/src/main/java/org/apache/qpid/example/Spout.java index 5da319a658..6e7b7c85ba 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/Spout.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/Spout.java @@ -27,7 +27,7 @@ import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; -import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AddressBasedDestination; public class Spout extends OptionParser { @@ -87,7 +87,7 @@ public class Spout extends OptionParser Connection con = createConnection(); con.start(); Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE); - Destination dest = new AMQAnyDestination(address); + Destination dest = new AddressBasedDestination(address); MessageProducer producer = ssn.createProducer(dest); int count = Integer.parseInt(getOp(COUNT)); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java deleted file mode 100644 index 999b22299c..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client; - -import java.net.URISyntaxException; - -import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.Topic; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.messaging.Address; -import org.apache.qpid.url.BindingURL; - -/** - * In order to support JMS 1.0 the Qpid implementation maps the - * direct exchange to JMS Queue and topic exchange to JMS Topic. - * - * The JMS 1.1 spec provides a javax.Destination as an abstraction - * to represent any type of destination. - * The abstract class AMQDestination has most of the functionality - * to support any destination defined in AMQP 0-10 spec. - */ -public class AMQAnyDestination extends AMQDestination implements Queue, Topic -{ - public AMQAnyDestination(BindingURL binding) - { - super(binding); - } - - public AMQAnyDestination(String str) throws URISyntaxException - { - super(str); - } - - public AMQAnyDestination(Address addr) throws Exception - { - super(addr); - } - - public AMQAnyDestination(AMQShortString exchangeName,AMQShortString exchangeClass, - AMQShortString routingKey,boolean isExclusive, - boolean isAutoDelete, AMQShortString queueName, - boolean isDurable, AMQShortString[] bindingKeys) - { - super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable, bindingKeys); - } - - @Override - public boolean isNameRequired() - { - return getAMQQueueName() == null; - } - - public String getTopicName() throws JMSException - { - if (getRoutingKey() != null) - { - return getRoutingKey().asString(); - } - else if (getSubject() != null) - { - return getSubject(); - } - else - { - return null; - } - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index acd46da11a..64b3623029 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -20,22 +20,16 @@ */ package org.apache.qpid.client; -import java.net.URISyntaxException; -import java.util.Map; - import javax.jms.Destination; +import javax.jms.JMSException; import javax.naming.NamingException; import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; -import org.apache.qpid.client.messaging.address.AddressHelper; -import org.apache.qpid.client.messaging.address.Link; -import org.apache.qpid.client.messaging.address.Node; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.messaging.Address; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLHelper; @@ -57,10 +51,8 @@ public abstract class AMQDestination implements Destination, Referenceable protected boolean _isAutoDelete; - private boolean _browseOnly; + protected boolean _browseOnly; - private boolean _isAddressResolved; - private AMQShortString _queueName; private AMQShortString _routingKey; @@ -99,39 +91,12 @@ public abstract class AMQDestination implements Destination, Referenceable " should be one of {BURL|ADDR}"); } } - } - - public enum AddressOption { - ALWAYS, NEVER, SENDER, RECEIVER; - - public static AddressOption getOption(String str) - { - if ("always".equals(str)) return ALWAYS; - else if ("never".equals(str)) return NEVER; - else if ("sender".equals(str)) return SENDER; - else if ("receiver".equals(str)) return RECEIVER; - else throw new IllegalArgumentException(str + " is not an allowed value"); - } - } + } protected final static DestSyntax defaultDestSyntax; - protected DestSyntax _destSyntax = DestSyntax.ADDR; - - protected AddressHelper _addrHelper; - protected Address _address; - protected int _addressType = AMQDestination.UNKNOWN_TYPE; - protected String _name; - protected String _subject; - protected AddressOption _create = AddressOption.NEVER; - protected AddressOption _assert = AddressOption.NEVER; - protected AddressOption _delete = AddressOption.NEVER; - - protected Node _targetNode; - protected Node _sourceNode; - protected Link _targetLink; - protected Link _link; - + protected DestSyntax _destSyntax = DestSyntax.BURL; + // ----- / Fields required to support new address syntax ------- static @@ -148,14 +113,6 @@ public abstract class AMQDestination implements Destination, Referenceable return defaultDestSyntax; } - protected AMQDestination(Address address) throws Exception - { - this._address = address; - getInfoFromAddress(); - _destSyntax = DestSyntax.ADDR; - _logger.debug("Based on " + address + " the selected destination syntax is " + _destSyntax); - } - public static DestSyntax getDestType(String str) { if (str.startsWith("BURL:") || @@ -181,30 +138,8 @@ public abstract class AMQDestination implements Destination, Referenceable } } - protected AMQDestination(String str) throws URISyntaxException - { - _destSyntax = getDestType(str); - str = stripSyntaxPrefix(str); - if (_destSyntax == DestSyntax.BURL) - { - getInfoFromBindingURL(new AMQBindingURL(str)); - } - else - { - this._address = createAddressFromString(str); - try - { - getInfoFromAddress(); - } - catch(Exception e) - { - URISyntaxException ex = new URISyntaxException(str,"Error parsing address"); - ex.initCause(e); - throw ex; - } - } - _logger.debug("Based on " + str + " the selected destination syntax is " + _destSyntax); - } + // Used by the AddressBasedDestination impl + protected AMQDestination() {} //retained for legacy support protected AMQDestination(BindingURL binding) @@ -400,15 +335,7 @@ public abstract class AMQDestination implements Destination, Referenceable public String toString() { - if (_destSyntax == DestSyntax.BURL) - { - return toURL(); - } - else - { - return _address.toString(); - } - + return toURL(); } public boolean isCheckedForQueueBinding() @@ -559,7 +486,7 @@ public abstract class AMQDestination implements Destination, Referenceable null); // factory location } - public static Destination createDestination(BindingURL binding) + public static Destination createDestination(BindingURL binding) throws JMSException { AMQShortString type = binding.getExchangeClass(); @@ -575,9 +502,13 @@ public abstract class AMQDestination implements Destination, Referenceable { return new AMQHeadersExchange(binding); } + else if (type.equals(ExchangeDefaults.FANOUT_EXCHANGE_CLASS)) + { + return new AMQQueue(binding); + } else { - return new AMQAnyDestination(binding); + throw new JMSException("Unsupported exchange type"); } } @@ -591,249 +522,36 @@ public abstract class AMQDestination implements Destination, Referenceable } else { - Address address = createAddressFromString(str); - return new AMQAnyDestination(address); + return new AddressBasedDestination(str); } } - // ----- new address syntax ----------- - - public static class Binding + public boolean isBrowseOnly() { - String exchange; - String bindingKey; - String queue; - Map<String,Object> args; - - public Binding(String exchange, - String queue, - String bindingKey, - Map<String,Object> args) - { - this.exchange = exchange; - this.queue = queue; - this.bindingKey = bindingKey; - this.args = args; - } - - public String getExchange() + return _browseOnly; + } + + public long getConsumerCapacity(AMQSession ssn) throws Exception + { + if (ssn.prefetch()) { - return exchange; + return ssn.getAMQConnection().getMaxPrefetch(); } - - public String getQueue() + else { - return queue; + return 0; } - - public String getBindingKey() + } + + public long getProducerCapacity(AMQSession ssn) throws Exception + { + if (ssn.prefetch()) { - return bindingKey; + return ssn.getAMQConnection().getMaxPrefetch(); } - - public Map<String, Object> getArgs() + else { - return args; + return 0; } } - - public Address getAddress() { - return _address; - } - - protected void setAddress(Address addr) { - _address = addr; - } - - public int getAddressType(){ - return _addressType; - } - - public void setAddressType(int addressType){ - _addressType = addressType; - } - - public String getAddressName() { - return _name; - } - - public void setAddressName(String name){ - _name = name; - } - - public String getSubject() { - return _subject; - } - - public void setSubject(String subject) { - _subject = subject; - } - - public AddressOption getCreate() { - return _create; - } - - public void setCreate(AddressOption option) { - _create = option; - } - - public AddressOption getAssert() { - return _assert; - } - - public void setAssert(AddressOption option) { - _assert = option; - } - - public AddressOption getDelete() { - return _delete; - } - - public void setDelete(AddressOption option) { - _delete = option; - } - - public Node getTargetNode() - { - return _targetNode; - } - - public void setTargetNode(Node node) - { - _targetNode = node; - } - - public Node getSourceNode() - { - return _sourceNode; - } - - public void setSourceNode(Node node) - { - _sourceNode = node; - } - - public Link getLink() - { - return _link; - } - - public void setLink(Link link) - { - _link = link; - } - - public void setExchangeName(AMQShortString name) - { - this._exchangeName = name; - } - - public void setExchangeClass(AMQShortString type) - { - this._exchangeClass = type; - } - - public void setRoutingKey(AMQShortString rk) - { - this._routingKey = rk; - } - - public boolean isAddressResolved() - { - return _isAddressResolved; - } - - public void setAddressResolved(boolean addressResolved) - { - _isAddressResolved = addressResolved; - } - - private static Address createAddressFromString(String str) - { - return Address.parse(str); - } - - private void getInfoFromAddress() throws Exception - { - _name = _address.getName(); - _subject = _address.getSubject(); - - _addrHelper = new AddressHelper(_address); - - _create = _addrHelper.getCreate() != null ? - AddressOption.getOption(_addrHelper.getCreate()):AddressOption.NEVER; - - _assert = _addrHelper.getAssert() != null ? - AddressOption.getOption(_addrHelper.getAssert()):AddressOption.NEVER; - - _delete = _addrHelper.getDelete() != null ? - AddressOption.getOption(_addrHelper.getDelete()):AddressOption.NEVER; - - _browseOnly = _addrHelper.isBrowseOnly(); - - _addressType = _addrHelper.getTargetNodeType(); - _targetNode = _addrHelper.getTargetNode(_addressType); - _sourceNode = _addrHelper.getSourceNode(_addressType); - _link = _addrHelper.getLink(); - } - - // This method is needed if we didn't know the node type at the beginning. - // Therefore we have to query the broker to figure out the type. - // Once the type is known we look for the necessary properties. - public void rebuildTargetAndSourceNodes(int addressType) - { - _targetNode = _addrHelper.getTargetNode(addressType); - _sourceNode = _addrHelper.getSourceNode(addressType); - } - - // ----- / new address syntax ----------- - - public boolean isBrowseOnly() - { - return _browseOnly; - } - - private void setBrowseOnly(boolean b) - { - _browseOnly = b; - } - - public AMQDestination copyDestination() - { - AMQDestination dest = - new AMQAnyDestination(_exchangeName, - _exchangeClass, - _routingKey, - _isExclusive, - _isAutoDelete, - _queueName, - _isDurable, - _bindingKeys - ); - - dest.setDestSyntax(_destSyntax); - dest.setAddress(_address); - dest.setAddressName(_name); - dest.setSubject(_subject); - dest.setCreate(_create); - dest.setAssert(_assert); - dest.setDelete(_create); - dest.setBrowseOnly(_browseOnly); - dest.setAddressType(_addressType); - dest.setTargetNode(_targetNode); - dest.setSourceNode(_sourceNode); - dest.setLink(_link); - dest.setAddressResolved(_isAddressResolved); - return dest; - } - - protected void setAutoDelete(boolean b) - { - _isAutoDelete = b; - } - - protected void setDurable(boolean b) - { - _isDurable = b; - } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java index 5bd1bd629a..a1a5a3c7a6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java @@ -30,12 +30,6 @@ import org.apache.qpid.url.BindingURL; public class AMQQueue extends AMQDestination implements Queue { - - public AMQQueue(String address) throws URISyntaxException - { - super(address); - } - /** * Create a reference to a non temporary queue using a BindingURL object. * Note this does not actually imply the queue exists. @@ -149,6 +143,12 @@ public class AMQQueue extends AMQDestination implements Queue super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, exclusive, autoDelete, queueName, durable, bindingKeys); } + + public AMQQueue(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable,AMQShortString[] bindingKeys) + { + super(exchangeName, exchangeClass, routingKey, exclusive, + autoDelete, queueName, durable, bindingKeys); + } public AMQShortString getRoutingKey() { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index d34290e007..c058ceb624 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -98,6 +98,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.url.AMQBindingURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -976,7 +977,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null, + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, isTopic(destination), null, null, isBrowseOnlyDestination(destination), false); } @@ -984,7 +985,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, isTopic(destination), messageSelector, null, isBrowseOnlyDestination(destination), false); } @@ -993,7 +994,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic), + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, isTopic(destination), messageSelector, null, isBrowseOnlyDestination(destination), false); } @@ -1034,30 +1035,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkNotClosed(); Topic origTopic = checkValidTopic(topic, true); - AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); - if (dest.getDestSyntax() == DestSyntax.ADDR && - !dest.isAddressResolved()) + // The check valid Topic will throw an exception if topic is not an instanceof + // AMQTopic or AddressBasedTopc. + Topic dest; + if (topic instanceof AMQTopic) { - try - { - handleAddressBasedDestination(dest,false,true); - if (dest.getAddressType() != AMQDestination.TOPIC_TYPE) - { - throw new JMSException("Durable subscribers can only be created for Topics"); - } - dest.getSourceNode().setDurable(true); - } - catch(AMQException e) - { - JMSException ex = new JMSException("Error when verifying destination"); - ex.initCause(e); - ex.setLinkedException(e); - throw ex; - } - catch(TransportException e) - { - throw toJMSException("Error when verifying destination", e); - } + dest = AMQTopic.createDurableTopic(origTopic, name, _connection); + } + else + { + dest = AddressBasedTopic.createDurableTopic(origTopic, name, _connection, this); } String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector; @@ -1071,7 +1058,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (subscriber == null) { // After the address is resolved routing key will not be null. - AMQShortString topicName = dest.getRoutingKey(); + AMQShortString topicName = new AMQShortString(dest.getTopicName()); if (_strictAMQP) { @@ -1085,7 +1072,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic + "' for creation durableSubscriber. Requesting queue deletion regardless."); } - deleteQueue(dest.getAMQQueueName()); + if (topic instanceof AMQTopic) + { + deleteQueue(((AMQTopic)dest).getAMQQueueName()); + } + else + { + ((AddressBasedTopic)topic).deleteSubscription(this); + } } else { @@ -1099,13 +1093,34 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec // says we must trash the subscription. - boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()); + AMQShortString exchangeName; + AMQShortString queueName; + + if (topic instanceof AMQTopic) + { + exchangeName = ((AMQTopic)dest).getExchangeName(); + queueName = ((AMQTopic)dest).getAMQQueueName(); + + } + else + { + exchangeName = new AMQShortString(((AddressBasedTopic)topic).getAddress().getName()); + queueName = new AMQShortString(((AddressBasedTopic)topic).getAddress().getSubject()); + } + boolean isQueueBound = isQueueBound(exchangeName, queueName); boolean isQueueBoundForTopicAndSelector = - isQueueBound(dest.getExchangeName().asString(), dest.getAMQQueueName().asString(), topicName.asString(), args); + isQueueBound(exchangeName.asString(), queueName.asString(), topicName.asString(), args); if (isQueueBound && !isQueueBoundForTopicAndSelector) { - deleteQueue(dest.getAMQQueueName()); + if (topic instanceof AMQTopic) + { + deleteQueue(((AMQTopic)dest).getAMQQueueName()); + } + else + { + ((AddressBasedTopic)topic).deleteSubscription(this); + } } } } @@ -1217,36 +1232,34 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public Queue createQueue(String queueName) throws JMSException { checkNotClosed(); - try + DestSyntax syntax = AMQDestination.getDestType(queueName); + queueName = AMQDestination.stripSyntaxPrefix(queueName); + if (syntax == AMQDestination.DestSyntax.BURL) { - if (queueName.indexOf('/') == -1 && queueName.indexOf(';') == -1) + if (queueName.indexOf('/') == -1) { - DestSyntax syntax = AMQDestination.getDestType(queueName); - if (syntax == AMQDestination.DestSyntax.BURL) - { - // For testing we may want to use the prefix - return new AMQQueue(getDefaultQueueExchangeName(), - new AMQShortString(AMQDestination.stripSyntaxPrefix(queueName))); - } - else - { - AMQQueue queue = new AMQQueue(queueName); - return queue; - - } + return new AMQQueue(getDefaultQueueExchangeName(), + new AMQShortString(queueName)); } else { - return new AMQQueue(queueName); + try + { + return new AMQQueue(new AMQBindingURL(queueName)); + } + catch (URISyntaxException urlse) + { + _logger.error("", urlse); + JMSException jmse = new JMSException(urlse.getReason()); + jmse.setLinkedException(urlse); + jmse.initCause(urlse); + throw jmse; + } } } - catch (URISyntaxException urlse) + else { - _logger.error("", urlse); - JMSException jmse = new JMSException(urlse.getReason()); - jmse.setLinkedException(urlse); - jmse.initCause(urlse); - throw jmse; + return new AddressBasedQueue(queueName); } } @@ -1511,36 +1524,39 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public Topic createTopic(String topicName) throws JMSException { checkNotClosed(); - try + DestSyntax syntax = AMQDestination.getDestType(topicName); + topicName = AMQDestination.stripSyntaxPrefix(topicName); + if (syntax == AMQDestination.DestSyntax.BURL) { - if (topicName.indexOf('/') == -1 && topicName.indexOf(';') == -1) + if (topicName.indexOf('/') == -1) + { + return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName)); + } + else { - DestSyntax syntax = AMQDestination.getDestType(topicName); - // for testing we may want to use the prefix to indicate our choice. - topicName = AMQDestination.stripSyntaxPrefix(topicName); - if (syntax == AMQDestination.DestSyntax.BURL) + try { - return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName)); + return new AMQTopic(new AMQBindingURL(topicName)); } - else + catch (URISyntaxException urlse) { - return new AMQTopic("ADDR:" + getDefaultTopicExchangeName() + "/" + topicName); + _logger.error("", urlse); + JMSException jmse = new JMSException(urlse.getReason()); + jmse.setLinkedException(urlse); + jmse.initCause(urlse); + throw jmse; } } - else - { - return new AMQTopic(topicName); - } - } - catch (URISyntaxException urlse) + else { - _logger.error("", urlse); - JMSException jmse = new JMSException(urlse.getReason()); - jmse.setLinkedException(urlse); - jmse.initCause(urlse); - throw jmse; - } + if (topicName.indexOf('/') == -1 && topicName.indexOf(';') == -1) + { + topicName = getDefaultTopicExchangeName() + "/" + topicName; + } + + return new AddressBasedTopic(topicName); + } } public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException @@ -2463,7 +2479,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic ("Cannot create a durable subscription with a temporary topic: " + topic); } - if (!(topic instanceof AMQDestination && topic instanceof javax.jms.Topic)) + if (!(topic instanceof AMQTopic || topic instanceof AddressBasedTopic)) { throw new javax.jms.InvalidDestinationException( "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " @@ -2872,11 +2888,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic AMQProtocolHandler protocolHandler = getProtocolHandler(); - if (amqd.getDestSyntax() == DestSyntax.ADDR) - { - handleAddressBasedDestination(amqd,true,nowait); - } - else + if (amqd.getDestSyntax() == DestSyntax.BURL) { if (DECLARE_EXCHANGES) { @@ -2932,10 +2944,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic throw new AMQException(null, "Fail-over exception interrupted basic consume.", e); } } - - public abstract void handleAddressBasedDestination(AMQDestination dest, - boolean isConsumer, - boolean noWait) throws AMQException; private void registerProducer(long producerId, MessageProducer producer) { @@ -3565,4 +3573,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _logger.debug("Rollback mark is set to " + _rollbackMark.get()); } } + + private boolean isTopic(Destination dest) + { + if (dest instanceof AMQDestination) + { + return ((AMQDestination)dest).isTopic(); + } + else + { + return dest instanceof Topic; + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 3812e612aa..40c9113a2d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -23,11 +23,8 @@ import static org.apache.qpid.transport.Option.SYNC; import static org.apache.qpid.transport.Option.UNRELIABLE; import java.lang.ref.WeakReference; -import java.util.ArrayList; -import java.util.Collections; import java.util.Date; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Timer; import java.util.TimerTask; @@ -37,8 +34,6 @@ import javax.jms.Destination; import javax.jms.JMSException; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination.AddressOption; -import org.apache.qpid.client.AMQDestination.Binding; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; @@ -47,18 +42,15 @@ import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.FieldTableSupport; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage_0_10; -import org.apache.qpid.client.messaging.address.Link; -import org.apache.qpid.client.messaging.address.Link.Reliability; -import org.apache.qpid.client.messaging.address.Node.ExchangeNode; -import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.messaging.address.AddressResolver; +import org.apache.qpid.messaging.address.amqp_0_10.SubscriptionSettings_0_10; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.ExchangeBoundResult; import org.apache.qpid.transport.ExchangeQueryResult; -import org.apache.qpid.transport.ExecutionErrorCode; import org.apache.qpid.transport.ExecutionException; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; @@ -143,7 +135,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * USed to store the range of in tx messages */ private RangeSet _txRangeSet = new RangeSet(); - private int _txSize = 0; + private int _txSize = 0; + private AddressResolver addressResolver; //--- constructors /** @@ -345,31 +338,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } else { - List<Binding> bindings = new ArrayList<Binding>(); - bindings.addAll(destination.getSourceNode().getBindings()); - bindings.addAll(destination.getTargetNode().getBindings()); - - String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ? - destination.getAddressName(): "amq.topic"; - - for (Binding binding: bindings) - { - String queue = binding.getQueue() == null? - queueName.asString(): binding.getQueue(); - - String exchange = binding.getExchange() == null ? - defaultExchange : - binding.getExchange(); - - _logger.debug("Binding queue : " + queue + - " exchange: " + exchange + - " using binding key " + binding.getBindingKey() + - " with args " + printMap(binding.getArgs())); - getQpidSession().exchangeBind(queue, - exchange, - binding.getBindingKey(), - binding.getArgs()); - } + // do nothing atm + // when creating a producer or consumer the create/assert method should be invokved + // for consumers create and delete subscriptions should be called in the constructor and close() + // when closing them the delete method should be invoked } if (!nowait) @@ -573,45 +545,51 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { boolean preAcquire; - long capacity = getCapacity(consumer.getDestination()); + long capacity; + try + { + capacity = consumer.getDestination().getConsumerCapacity(this); + } + catch (Exception e1) + { + AMQException ex = new AMQException(AMQConstant.INTERNAL_ERROR, "Error retrieving capacity",e1); + throw ex; + } try { - boolean isTopic; + boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE; Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments()); if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL) { - isTopic = consumer.getDestination() instanceof AMQTopic || + boolean isTopic = consumer.getDestination() instanceof AMQTopic || consumer.getDestination().getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS) ; preAcquire = isTopic || (!consumer.isNoConsume() && (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals(""))); + + getQpidSession().messageSubscribe(queueName.toString(), String.valueOf(tag), + acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, + preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, + null, 0, arguments, + consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } else { - isTopic = consumer.getDestination().getAddressType() == AMQDestination.TOPIC_TYPE; - + AddressBasedDestination dest = (AddressBasedDestination)consumer.getDestination(); preAcquire = !consumer.isNoConsume() && - (isTopic || consumer.getMessageSelector() == null || + (dest.isTopic() || consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")); - arguments.putAll( - (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs()); + SubscriptionSettings_0_10 settings = new SubscriptionSettings_0_10(); + settings.setAcceptMode(acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT); + settings.setAccquireMode(preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED); + settings.setArgs(arguments); + settings.setMessageSelector(messageSelector); + settings.setSubscriptionTag(String.valueOf(tag)); + dest.createSubscription(this,settings); } - - boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE; - - if (consumer.getDestination().getLink() != null) - { - acceptModeNone = consumer.getDestination().getLink().getReliability() == Link.Reliability.UNRELIABLE; - } - - getQpidSession().messageSubscribe - (queueName.toString(), String.valueOf(tag), - acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, - preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments, - consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } catch (JMSException e) { @@ -646,21 +624,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - private long getCapacity(AMQDestination destination) - { - long capacity = 0; - if (destination.getDestSyntax() == DestSyntax.ADDR && - destination.getLink().getConsumerCapacity() > 0) - { - capacity = destination.getLink().getConsumerCapacity(); - } - else if (prefetch()) - { - capacity = getAMQConnection().getMaxPrefetch(); - } - return capacity; - } - /** * Create an 0_10 message producer */ @@ -775,12 +738,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } else { - QueueNode node = (QueueNode)amqd.getSourceNode(); - getQpidSession().queueDeclare(queueName.toString(), node.getAlternateExchange() , - node.getDeclareArgs(), - node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, - node.isDurable() ? Option.DURABLE : Option.NONE, - node.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + // do nothing } // passive --> false @@ -825,7 +783,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic //only set if msg list is null try { - long capacity = getCapacity(consumer.getDestination()); + long capacity = consumer.getDestination().getConsumerCapacity(this); if (capacity == 0) { @@ -1056,317 +1014,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { return AMQMessageDelegateFactory.FACTORY_0_10; } - - public boolean isExchangeExist(AMQDestination dest,ExchangeNode node,boolean assertNode) - { - boolean match = true; - ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get(); - match = !result.getNotFound(); - - if (match) - { - if (assertNode) - { - match = (result.getDurable() == node.isDurable()) && - (node.getExchangeType() != null && - node.getExchangeType().equals(result.getType())) && - (matchProps(result.getArguments(),node.getDeclareArgs())); - } - else if (node.getExchangeType() != null) - { - // even if assert is false, better to verify this - match = node.getExchangeType().equals(result.getType()); - if (!match) - { - _logger.debug("Exchange type doesn't match. Expected : " + node.getExchangeType() + - " actual " + result.getType()); - } - } - else - { - _logger.debug("Setting Exchange type " + result.getType()); - node.setExchangeType(result.getType()); - dest.setExchangeClass(new AMQShortString(result.getType())); - } - } - - return match; - } - - public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException - { - boolean match = true; - try - { - QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get(); - match = dest.getAddressName().equals(result.getQueue()); - - if (match && assertNode) - { - match = (result.getDurable() == node.isDurable()) && - (result.getAutoDelete() == node.isAutoDelete()) && - (result.getExclusive() == node.isExclusive()) && - (matchProps(result.getArguments(),node.getDeclareArgs())); - } - else if (match) - { - // should I use the queried details to update the local data structure. - } - } - catch(SessionException e) - { - if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED) - { - match = false; - } - else - { - throw new AMQException(AMQConstant.getConstant(e.getException().getErrorCode().getValue()), - "Error querying queue",e); - } - } - - return match; - } - - private boolean matchProps(Map<String,Object> target,Map<String,Object> source) - { - boolean match = true; - for (String key: source.keySet()) - { - match = target.containsKey(key) && - target.get(key).equals(source.get(key)); - - if (!match) - { - StringBuffer buf = new StringBuffer(); - buf.append("Property given in address did not match with the args sent by the broker."); - buf.append(" Expected { ").append(key).append(" : ").append(source.get(key)).append(" }, "); - buf.append(" Actual { ").append(key).append(" : ").append(target.get(key)).append(" }"); - _logger.debug(buf.toString()); - return match; - } - } - - return match; - } - - /** - * 1. Try to resolve the address type (queue or exchange) - * 2. if type == queue, - * 2.1 verify queue exists or create if create == true - * 2.2 If not throw exception - * - * 3. if type == exchange, - * 3.1 verify exchange exists or create if create == true - * 3.2 if not throw exception - * 3.3 if exchange exists (or created) create subscription queue. - */ - - @SuppressWarnings("deprecation") - public void handleAddressBasedDestination(AMQDestination dest, - boolean isConsumer, - boolean noWait) throws AMQException - { - if (dest.isAddressResolved()) - { - if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType()) - { - createSubscriptionQueue(dest); - } - } - else - { - boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) || - (isConsumer && dest.getAssert() == AddressOption.RECEIVER) || - (!isConsumer && dest.getAssert() == AddressOption.SENDER); - - boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) || - (isConsumer && dest.getCreate() == AddressOption.RECEIVER) || - (!isConsumer && dest.getCreate() == AddressOption.SENDER); - - - - int type = resolveAddressType(dest); - - if (type == AMQDestination.QUEUE_TYPE && - dest.getLink().getReliability() == Reliability.UNSPECIFIED) - { - dest.getLink().setReliability(Reliability.AT_LEAST_ONCE); - } - else if (type == AMQDestination.TOPIC_TYPE && - dest.getLink().getReliability() == Reliability.UNSPECIFIED) - { - dest.getLink().setReliability(Reliability.UNRELIABLE); - } - else if (type == AMQDestination.TOPIC_TYPE && - dest.getLink().getReliability() == Reliability.AT_LEAST_ONCE) - { - throw new AMQException("AT-LEAST-ONCE is not yet supported for Topics"); - } - - switch (type) - { - case AMQDestination.QUEUE_TYPE: - { - if (isQueueExist(dest,(QueueNode)dest.getSourceNode(),assertNode)) - { - setLegacyFiledsForQueueType(dest); - break; - } - else if(createNode) - { - setLegacyFiledsForQueueType(dest); - send0_10QueueDeclare(dest,null,false,noWait); - sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), - null,dest.getExchangeName(),dest, false); - break; - } - } - - case AMQDestination.TOPIC_TYPE: - { - if (isExchangeExist(dest,(ExchangeNode)dest.getTargetNode(),assertNode)) - { - setLegacyFiledsForTopicType(dest); - verifySubject(dest); - if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) - { - createSubscriptionQueue(dest); - } - break; - } - else if(createNode) - { - setLegacyFiledsForTopicType(dest); - verifySubject(dest); - sendExchangeDeclare(dest.getAddressName(), - dest.getExchangeClass().asString(), - dest.getTargetNode().getAlternateExchange(), - dest.getTargetNode().getDeclareArgs(), - false); - if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) - { - createSubscriptionQueue(dest); - } - break; - } - } - - default: - throw new AMQException( - "The name '" + dest.getAddressName() + - "' supplied in the address doesn't resolve to an exchange or a queue"); - } - dest.setAddressResolved(true); - } - } - - public int resolveAddressType(AMQDestination dest) throws AMQException - { - int type = dest.getAddressType(); - String name = dest.getAddressName(); - if (type != AMQDestination.UNKNOWN_TYPE) - { - return type; - } - else - { - ExchangeBoundResult result = getQpidSession().exchangeBound(name,name,null,null).get(); - if (result.getQueueNotFound() && result.getExchangeNotFound()) { - //neither a queue nor an exchange exists with that name; treat it as a queue - type = AMQDestination.QUEUE_TYPE; - } else if (result.getExchangeNotFound()) { - //name refers to a queue - type = AMQDestination.QUEUE_TYPE; - } else if (result.getQueueNotFound()) { - //name refers to an exchange - type = AMQDestination.TOPIC_TYPE; - } else { - //both a queue and exchange exist for that name - throw new AMQException("Ambiguous address, please specify queue or topic as node type"); - } - dest.setAddressType(type); - dest.rebuildTargetAndSourceNodes(type); - return type; - } - } - - private void verifySubject(AMQDestination dest) throws AMQException - { - if (dest.getSubject() == null || dest.getSubject().trim().equals("")) - { - - if ("topic".equals(dest.getExchangeClass().toString())) - { - dest.setRoutingKey(new AMQShortString("#")); - dest.setSubject(dest.getRoutingKey().toString()); - } - else - { - dest.setRoutingKey(new AMQShortString("")); - dest.setSubject(""); - } - } - } - - private void createSubscriptionQueue(AMQDestination dest) throws AMQException - { - QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null - - if (dest.getQueueName() == null) - { - if (dest.getLink() != null && dest.getLink().getName() != null) - { - dest.setQueueName(new AMQShortString(dest.getLink().getName())); - } - } - node.setExclusive(true); - node.setAutoDelete(!node.isDurable()); - send0_10QueueDeclare(dest,null,false,true); - node.addBinding(new Binding(dest.getAddressName(), - dest.getQueueName(),// should have one by now - dest.getSubject(), - Collections.<String,Object>emptyMap())); - sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), - null,dest.getExchangeName(),dest, false); - } - - public void setLegacyFiledsForQueueType(AMQDestination dest) - { - // legacy support - dest.setQueueName(new AMQShortString(dest.getAddressName())); - dest.setExchangeName(new AMQShortString("")); - dest.setExchangeClass(new AMQShortString("")); - dest.setRoutingKey(dest.getAMQQueueName()); - } - - public void setLegacyFiledsForTopicType(AMQDestination dest) - { - // legacy support - dest.setExchangeName(new AMQShortString(dest.getAddressName())); - ExchangeNode node = (ExchangeNode)dest.getTargetNode(); - dest.setExchangeClass(node.getExchangeType() == null? - ExchangeDefaults.TOPIC_EXCHANGE_CLASS: - new AMQShortString(node.getExchangeType())); - dest.setRoutingKey(new AMQShortString(dest.getSubject())); - } - - /** This should be moved to a suitable utility class */ - private String printMap(Map<String,Object> map) - { - StringBuilder sb = new StringBuilder(); - sb.append("<"); - if (map != null) - { - for(String key : map.keySet()) - { - sb.append(key).append(" = ").append(map.get(key)).append(" "); - } - } - sb.append(">"); - return sb.toString(); - } protected void acknowledgeImpl() { @@ -1389,4 +1036,16 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic _highestDeliveryTag.set(-1); super.resubscribe(); } + + public boolean isQueueExist(String queue) + { + QueueQueryResult result = _qpidSession.queueQuery(queue, Option.NONE).get(); + return queue.equals(result.getQueue()); + } + + public boolean isExchangeExist(String exchange) + { + ExchangeQueryResult result = _qpidSession.exchangeQuery(exchange, Option.NONE).get(); + return !result.getNotFound(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 369c8a6e9d..5c599392c2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -590,14 +590,6 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B { declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false); } - - public void handleAddressBasedDestination(AMQDestination dest, - boolean isConsumer, - boolean noWait) throws AMQException - { - throw new UnsupportedOperationException("The new addressing based sytanx is " - + "not supported for AMQP 0-8/0-9 versions"); - } protected void flushAcknowledgments() { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 780dbcafc2..54736b08f6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -33,16 +33,7 @@ import org.apache.qpid.url.BindingURL; public class AMQTopic extends AMQDestination implements Topic { - public AMQTopic(String address) throws URISyntaxException - { - super(address); - } - - public AMQTopic(Address address) throws Exception - { - super(address); - } - + /** * Constructor for use in creating a topic using a BindingURL. * @@ -102,37 +93,8 @@ public class AMQTopic extends AMQDestination implements Topic if (topic instanceof AMQDestination && topic instanceof javax.jms.Topic) { AMQDestination qpidTopic = (AMQDestination)topic; - if (qpidTopic.getDestSyntax() == DestSyntax.ADDR) - { - try - { - AMQTopic t = new AMQTopic(qpidTopic.getAddress()); - AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection); - // link is never null if dest was created using an address string. - t.getLink().setName(queueName.asString()); - t.getSourceNode().setAutoDelete(false); - t.getSourceNode().setDurable(true); - - // The legacy fields are also populated just in case. - t.setQueueName(queueName); - t.setAutoDelete(false); - t.setDurable(true); - return t; - } - catch(Exception e) - { - JMSException ex = new JMSException("Error creating durable topic"); - ex.initCause(e); - ex.setLinkedException(e); - throw ex; - } - } - else - { return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getRoutingKey(), false, - getDurableTopicQueueName(subscriptionName, connection), - true); - } + getDurableTopicQueueName(subscriptionName, connection),true); } else { @@ -151,10 +113,6 @@ public class AMQTopic extends AMQDestination implements Topic { return getRoutingKey().asString(); } - else if (getSubject() != null) - { - return getSubject(); - } else { return null; @@ -164,14 +122,7 @@ public class AMQTopic extends AMQDestination implements Topic @Override public AMQShortString getExchangeName() { - if (super.getExchangeName() == null && super.getAddressName() != null) - { - return new AMQShortString(super.getAddressName()); - } - else - { - return _exchangeName; - } + return _exchangeName; } public AMQShortString getRoutingKey() @@ -180,15 +131,9 @@ public class AMQTopic extends AMQDestination implements Topic { return super.getRoutingKey(); } - else if (getSubject() != null) - { - return new AMQShortString(getSubject()); - } else { - setRoutingKey(new AMQShortString("")); - setSubject(""); - return super.getRoutingKey(); + return null; } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 5fba351d8a..7721722748 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -17,27 +17,36 @@ */ package org.apache.qpid.client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.qpid.client.AMQDestination.AddressOption; -import org.apache.qpid.client.AMQDestination.DestSyntax; -import org.apache.qpid.client.failover.FailoverException; -import org.apache.qpid.client.message.*; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInternalException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.transport.*; -import org.apache.qpid.filter.MessageFilter; -import org.apache.qpid.filter.JMSSelectorFilter; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; -import java.util.Iterator; -import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.AMQMessageDelegate_0_10; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.message.UnprocessedMessage_0_10; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.filter.JMSSelectorFilter; +import org.apache.qpid.filter.MessageFilter; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.messaging.QpidDestination.CheckMode; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.transport.Acquired; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.TransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This is a 0.10 message consumer. @@ -83,6 +92,16 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose); _0_10session = (AMQSession_0_10) session; + + if (AMQDestination.DestSyntax.ADDR == destination.getDestSyntax()) + { + AddressBasedDestination addrDest = (AddressBasedDestination)destination; + addrDest.resolveAddress((AMQSession_0_10)session); + addrDest.create((AMQSession_0_10)session,CheckMode.FOR_RECEIVER); + addrDest.azzert((AMQSession_0_10)session,CheckMode.FOR_RECEIVER); + // ideally we should be invoking addrDest.createSubscription() here; + } + if (messageSelector != null && !messageSelector.equals("")) { try @@ -93,33 +112,24 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { throw new InvalidSelectorException("cannot create consumer because of selector issue"); } + if (destination instanceof AMQQueue) { _preAcquire = false; } } - // Destination setting overrides connection defaults - if (destination.getDestSyntax() == DestSyntax.ADDR && - destination.getLink().getConsumerCapacity() > 0) + try { - capacity = destination.getLink().getConsumerCapacity(); + capacity = destination.getConsumerCapacity(session); } - else if (getSession().prefetch()) + catch(Exception e) { - capacity = _0_10session.getAMQConnection().getMaxPrefetch(); - } - - if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) - { - boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ; - - if (!namedQueue) - { - _destination = destination.copyDestination(); - _destination.setQueueName(null); - } - } + JMSException ex = new JMSException("Error retrieving capacity"); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + } } @@ -473,22 +483,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM public boolean isExclusive() { - AMQDestination dest = this.getDestination(); - if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) - { - if (dest.getAddressType() == AMQDestination.TOPIC_TYPE) - { - return true; - } - else - { - return dest.getLink().getSubscription().isExclusive(); - } - } - else - { - return _exclusive; - } + return _exclusive; } void cleanupQueue() throws AMQException, FailoverException @@ -496,11 +491,14 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM AMQDestination dest = this.getDestination(); if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) { - if (dest.getDelete() == AddressOption.ALWAYS || - dest.getDelete() == AddressOption.RECEIVER ) + try + { + ((AddressBasedDestination)dest).delete((AMQSession_0_10)getSession(),CheckMode.FOR_RECEIVER); + } + catch(Exception e) { - ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( - this.getDestination().getQueueName()); + AMQException ex = new AMQException(AMQConstant.INTERNAL_ERROR,"Error deleting queue",e); + throw ex; } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 57f64c2f92..d2a88bcc52 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -31,13 +31,13 @@ import javax.jms.JMSException; import javax.jms.Message; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.QpidMessageProperties; -import org.apache.qpid.client.messaging.address.Link.Reliability; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.messaging.QpidDestination.CheckMode; +import org.apache.qpid.messaging.address.Link.Reliability; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; @@ -86,7 +86,10 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer { try { - getSession().handleAddressBasedDestination(destination,false,false); + AddressBasedDestination addrDest = (AddressBasedDestination)destination; + addrDest.resolveAddress((AMQSession_0_10)getSession()); + addrDest.create((AMQSession_0_10)getSession(),CheckMode.FOR_SENDER); + addrDest.azzert((AMQSession_0_10)getSession(),CheckMode.FOR_SENDER); } catch(Exception e) { @@ -165,11 +168,13 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer deliveryProp.setPriority(MessageDeliveryPriority.get((short) priority)); message.setJMSPriority(priority); } + String exchangeName = destination.getExchangeName() == null ? "" : destination.getExchangeName().toString(); if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(exchangeName)) { deliveryProp.setExchange(exchangeName); } + String routingKey = destination.getRoutingKey().toString(); if (deliveryProp.getRoutingKey() == null || ! deliveryProp.getRoutingKey().equals(routingKey)) { @@ -177,7 +182,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer } if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR && - (destination.getSubject() != null || + (((AddressBasedDestination)destination).getAddress().getSubject() != null || (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT) != null)) ) { @@ -191,10 +196,11 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer if (appProps.get(QpidMessageProperties.QPID_SUBJECT) == null) { // use default subject in address string - appProps.put(QpidMessageProperties.QPID_SUBJECT,destination.getSubject()); + appProps.put(QpidMessageProperties.QPID_SUBJECT, + ((AddressBasedDestination)destination).getAddress().getSubject()); } - if (destination.getAddressType() == AMQDestination.TOPIC_TYPE) + if (((AddressBasedDestination)destination).isTopic()) { deliveryProp.setRoutingKey((String) messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT)); @@ -218,8 +224,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer deliveryMode == DeliveryMode.PERSISTENT) ); - boolean unreliable = (destination.getDestSyntax() == DestSyntax.ADDR) && - (destination.getLink().getReliability() == Reliability.UNRELIABLE); + boolean unreliable = false; //(destination.getDestSyntax() == DestSyntax.ADDR) && + // (destination.getLink().getReliability() == Reliability.UNRELIABLE); ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.slice(); @@ -258,19 +264,15 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer AMQDestination dest = _destination; if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) { - if (dest.getDelete() == AddressOption.ALWAYS || - dest.getDelete() == AddressOption.SENDER ) + try { - try - { - ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( - _destination.getQueueName()); - } - catch(TransportException e) - { - throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); - } + ((AddressBasedDestination)dest).delete((AMQSession_0_10)getSession(),CheckMode.FOR_SENDER); } + catch(TransportException e) + { + throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index f360b546b2..c6c36b9afe 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -42,6 +42,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQPInvalidClassException; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession_0_10; +import org.apache.qpid.client.AddressBasedDestination; import org.apache.qpid.client.CustomJMSXProperty; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.Message; @@ -271,13 +272,14 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate private Destination convertToAddressBasedDestination(String exchange, String routingKey, String subject) { String addr; - if ("".equals(exchange)) // type Queue + if ("".equals(exchange)) // type Queue and the routing key is the Queue name. { subject = (subject == null) ? "" : "/" + subject; addr = routingKey + subject; } else { + // routing key is the subject here. addr = exchange + "/" + routingKey; } @@ -315,36 +317,14 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate if (amqd.getDestSyntax() == AMQDestination.DestSyntax.ADDR) { - try - { - int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd); - if (type == AMQDestination.QUEUE_TYPE) - { - ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForQueueType(amqd); - } - else - { - ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForTopicType(amqd); - } - } - catch(AMQException ex) - { - JMSException e = new JMSException("Error occured while figuring out the node type"); - e.initCause(ex); - e.setLinkedException(ex); - throw e; - } - catch (TransportException e) - { - JMSException jmse = new JMSException("Exception occured while figuring out the node type:" + e.getMessage()); - jmse.initCause(e); - jmse.setLinkedException(e); - throw jmse; - } - + AddressBasedDestination dest = (AddressBasedDestination)amqd; + dest.resolveAddress((AMQSession_0_10)_session); } - final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString()); + String exchangeName = amqd.getExchangeName().asString(); + String routingKey = amqd.getRoutingKey().asString(); + + final ReplyTo replyTo = new ReplyTo(exchangeName, routingKey); _destinationCache.put(replyTo, new SoftReference<Destination>(destination)); _messageProps.setReplyTo(replyTo); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java index 1b6c0c751d..424e7d7cc0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java @@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap; import javax.jms.JMSException; import javax.jms.Session; -import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; @@ -85,7 +84,7 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate } /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */ - private AMQSession<?,?> _session; + protected AMQSession<?,?> _session; private final long _deliveryTag; protected AbstractAMQMessageDelegate(long deliveryTag) @@ -132,14 +131,17 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate } else { - dest = new AMQAnyDestination(exchange, - new AMQShortString(exchangeInfo.exchangeType), - routingKey, - false, - false, - routingKey, - false, - new AMQShortString[] {routingKey}); + // This is to cater to fanout, match and nameless exchange types. + // This method is only used if the syntax is BURL. + // See AMQMessageDelegate_0_10.java for more details. + dest = new AMQQueue(exchange, + new AMQShortString(exchangeInfo.exchangeType), + routingKey, + routingKey, + false, + false, + false, + new AMQShortString[] {routingKey}); } return dest; diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java deleted file mode 100644 index 368ec60525..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ /dev/null @@ -1,332 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.messaging.address; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQDestination.Binding; -import org.apache.qpid.client.messaging.address.Link.Reliability; -import org.apache.qpid.client.messaging.address.Link.Subscription; -import org.apache.qpid.client.messaging.address.Node.ExchangeNode; -import org.apache.qpid.client.messaging.address.Node.QueueNode; -import org.apache.qpid.client.messaging.address.Node.UnknownNodeType; -import org.apache.qpid.configuration.Accessor; -import org.apache.qpid.configuration.Accessor.MapAccessor; -import org.apache.qpid.messaging.Address; - -/** - * Utility class for extracting information from the address class - */ -public class AddressHelper -{ - public static final String NODE = "node"; - public static final String LINK = "link"; - public static final String X_DECLARE = "x-declare"; - public static final String X_BINDINGS = "x-bindings"; - public static final String X_SUBSCRIBE = "x-subscribes"; - public static final String CREATE = "create"; - public static final String ASSERT = "assert"; - public static final String DELETE = "delete"; - public static final String FILTER = "filter"; - public static final String NO_LOCAL = "no-local"; - public static final String DURABLE = "durable"; - public static final String EXCLUSIVE = "exclusive"; - public static final String AUTO_DELETE = "auto-delete"; - public static final String TYPE = "type"; - public static final String ALT_EXCHANGE = "alternate-exchange"; - public static final String BINDINGS = "bindings"; - public static final String BROWSE = "browse"; - public static final String MODE = "mode"; - public static final String CAPACITY = "capacity"; - public static final String CAPACITY_SOURCE = "source"; - public static final String CAPACITY_TARGET = "target"; - public static final String NAME = "name"; - public static final String EXCHANGE = "exchange"; - public static final String QUEUE = "queue"; - public static final String KEY = "key"; - public static final String ARGUMENTS = "arguments"; - public static final String RELIABILITY = "reliability"; - - private Address address; - private Accessor addressProps; - private Accessor nodeProps; - private Accessor linkProps; - - public AddressHelper(Address address) - { - this.address = address; - addressProps = new MapAccessor(address.getOptions()); - Map node_props = address.getOptions() == null - || address.getOptions().get(NODE) == null ? null - : (Map) address.getOptions().get(NODE); - - if (node_props != null) - { - nodeProps = new MapAccessor(node_props); - } - - Map link_props = address.getOptions() == null - || address.getOptions().get(LINK) == null ? null - : (Map) address.getOptions().get(LINK); - - if (link_props != null) - { - linkProps = new MapAccessor(link_props); - } - } - - public String getCreate() - { - return addressProps.getString(CREATE); - } - - public String getAssert() - { - return addressProps.getString(ASSERT); - } - - public String getDelete() - { - return addressProps.getString(DELETE); - } - - public boolean isNoLocal() - { - Boolean b = nodeProps.getBoolean(NO_LOCAL); - return b == null ? false : b; - } - - public boolean isBrowseOnly() - { - String mode = addressProps.getString(MODE); - return mode != null && mode.equals(BROWSE) ? true : false; - } - - @SuppressWarnings("unchecked") - public List<Binding> getBindings(Map props) - { - List<Binding> bindings = new ArrayList<Binding>(); - List<Map> bindingList = (List<Map>) props.get(X_BINDINGS); - if (bindingList != null) - { - for (Map bindingMap : bindingList) - { - Binding binding = new Binding( - (String) bindingMap.get(EXCHANGE), - (String) bindingMap.get(QUEUE), - (String) bindingMap.get(KEY), - bindingMap.get(ARGUMENTS) == null ? Collections.EMPTY_MAP - : (Map<String, Object>) bindingMap - .get(ARGUMENTS)); - bindings.add(binding); - } - } - return bindings; - } - - public Map getDeclareArgs(Map props) - { - if (props != null && props.get(X_DECLARE) != null) - { - return (Map) props.get(X_DECLARE); - - } else - { - return Collections.EMPTY_MAP; - } - } - - public int getTargetNodeType() throws Exception - { - if (nodeProps == null || nodeProps.getString(TYPE) == null) - { - // need to query and figure out - return AMQDestination.UNKNOWN_TYPE; - } else if (nodeProps.getString(TYPE).equals("queue")) - { - return AMQDestination.QUEUE_TYPE; - } else if (nodeProps.getString(TYPE).equals("topic")) - { - return AMQDestination.TOPIC_TYPE; - } else - { - throw new Exception("unkown exchange type"); - } - } - - public Node getTargetNode(int addressType) - { - // target node here is the default exchange - if (nodeProps == null || addressType == AMQDestination.QUEUE_TYPE) - { - return new ExchangeNode(); - } else if (addressType == AMQDestination.TOPIC_TYPE) - { - Map node = (Map) address.getOptions().get(NODE); - return createExchangeNode(node); - } else - { - // don't know yet - return null; - } - } - - private Node createExchangeNode(Map parent) - { - Map declareArgs = getDeclareArgs(parent); - MapAccessor argsMap = new MapAccessor(declareArgs); - ExchangeNode node = new ExchangeNode(); - node.setExchangeType(argsMap.getString(TYPE) == null ? null : argsMap - .getString(TYPE)); - fillInCommonNodeArgs(node, parent, argsMap); - return node; - } - - private Node createQueueNode(Map parent) - { - Map declareArgs = getDeclareArgs(parent); - MapAccessor argsMap = new MapAccessor(declareArgs); - QueueNode node = new QueueNode(); - node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE)); - node.setExclusive(argsMap.getBoolean(EXCLUSIVE) == null ? false - : argsMap.getBoolean(EXCLUSIVE)); - fillInCommonNodeArgs(node, parent, argsMap); - - return node; - } - - private void fillInCommonNodeArgs(Node node, Map parent, MapAccessor argsMap) - { - node.setDurable(getDurability(parent)); - node.setAutoDelete(argsMap.getBoolean(AUTO_DELETE) == null ? false - : argsMap.getBoolean(AUTO_DELETE)); - node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE)); - node.setBindings(getBindings(parent)); - if (getDeclareArgs(parent).containsKey(ARGUMENTS)) - { - node.setDeclareArgs((Map<String,Object>)getDeclareArgs(parent).get(ARGUMENTS)); - } - } - - private boolean getDurability(Map map) - { - Accessor access = new MapAccessor(map); - Boolean result = access.getBoolean(DURABLE); - return (result == null) ? false : result.booleanValue(); - } - - /** - * if the type == queue x-declare args from the node props is used. if the - * type == exchange x-declare args from the link props is used else just - * create a default temp queue. - */ - public Node getSourceNode(int addressType) - { - if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null) - { - return createQueueNode((Map) address.getOptions().get(NODE)); - } - if (addressType == AMQDestination.TOPIC_TYPE && linkProps != null) - { - return createQueueNode((Map) address.getOptions().get(LINK)); - } else - { - // need to query the info - return new QueueNode(); - } - } - - public Link getLink() throws Exception - { - Link link = new Link(); - link.setSubscription(new Subscription()); - if (linkProps != null) - { - link.setDurable(linkProps.getBoolean(DURABLE) == null ? false - : linkProps.getBoolean(DURABLE)); - link.setName(linkProps.getString(NAME)); - - String reliability = linkProps.getString(RELIABILITY); - if ( reliability != null) - { - if (reliability.equalsIgnoreCase("unreliable")) - { - link.setReliability(Reliability.UNRELIABLE); - } - else if (reliability.equalsIgnoreCase("at-least-once")) - { - link.setReliability(Reliability.AT_LEAST_ONCE); - } - else - { - throw new Exception("The reliability mode '" + - reliability + "' is not yet supported"); - } - - } - - if (((Map) address.getOptions().get(LINK)).get(CAPACITY) instanceof Map) - { - MapAccessor capacityProps = new MapAccessor( - (Map) ((Map) address.getOptions().get(LINK)) - .get(CAPACITY)); - link - .setConsumerCapacity(capacityProps - .getInt(CAPACITY_SOURCE) == null ? 0 - : capacityProps.getInt(CAPACITY_SOURCE)); - link - .setProducerCapacity(capacityProps - .getInt(CAPACITY_TARGET) == null ? 0 - : capacityProps.getInt(CAPACITY_TARGET)); - } - else - { - int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps - .getInt(CAPACITY); - link.setConsumerCapacity(cap); - link.setProducerCapacity(cap); - } - link.setFilter(linkProps.getString(FILTER)); - // so far filter type not used - - if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE)) - { - Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE); - - if (x_subscribe.containsKey(ARGUMENTS)) - { - link.getSubscription().setArgs((Map<String,Object>)x_subscribe.get(ARGUMENTS)); - } - - boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ? - Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false; - - link.getSubscription().setExclusive(exclusive); - } - } - - return link; - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java deleted file mode 100644 index 5f97d625b4..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.messaging.address; - -import static org.apache.qpid.client.messaging.address.Link.Reliability.UNSPECIFIED; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.qpid.client.messaging.address.Node.QueueNode; - -public class Link -{ - public enum FilterType { SQL92, XQUERY, SUBJECT } - - public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE, UNSPECIFIED } - - protected String name; - protected String _filter; - protected FilterType _filterType = FilterType.SUBJECT; - protected boolean _isNoLocal; - protected boolean _isDurable; - protected int _consumerCapacity = 0; - protected int _producerCapacity = 0; - protected Node node; - protected Subscription subscription; - protected Reliability reliability = UNSPECIFIED; - - public Reliability getReliability() - { - return reliability; - } - - public void setReliability(Reliability reliability) - { - this.reliability = reliability; - } - - public Node getNode() - { - return node; - } - - public void setNode(Node node) - { - this.node = node; - } - - public boolean isDurable() - { - return _isDurable; - } - - public void setDurable(boolean durable) - { - _isDurable = durable; - } - - public String getFilter() - { - return _filter; - } - - public void setFilter(String filter) - { - this._filter = filter; - } - - public FilterType getFilterType() - { - return _filterType; - } - - public void setFilterType(FilterType type) - { - _filterType = type; - } - - public boolean isNoLocal() - { - return _isNoLocal; - } - - public void setNoLocal(boolean noLocal) - { - _isNoLocal = noLocal; - } - - public int getConsumerCapacity() - { - return _consumerCapacity; - } - - public void setConsumerCapacity(int capacity) - { - _consumerCapacity = capacity; - } - - public int getProducerCapacity() - { - return _producerCapacity; - } - - public void setProducerCapacity(int capacity) - { - _producerCapacity = capacity; - } - - public String getName() - { - return name; - } - - public void setName(String name) - { - this.name = name; - } - - public Subscription getSubscription() - { - return this.subscription; - } - - public void setSubscription(Subscription subscription) - { - this.subscription = subscription; - } - - public static class Subscription - { - private Map<String,Object> args = new HashMap<String,Object>(); - private boolean exclusive = false; - - public Map<String, Object> getArgs() - { - return args; - } - - public void setArgs(Map<String, Object> args) - { - this.args = args; - } - - public boolean isExclusive() - { - return exclusive; - } - - public void setExclusive(boolean exclusive) - { - this.exclusive = exclusive; - } - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java deleted file mode 100644 index c98b194334..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.client.messaging.address; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import javax.naming.OperationNotSupportedException; - -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQDestination.Binding; - -public abstract class Node -{ - protected int _nodeType = AMQDestination.UNKNOWN_TYPE; - protected boolean _isDurable; - protected boolean _isAutoDelete; - protected String _alternateExchange; - protected List<Binding> _bindings = new ArrayList<Binding>(); - protected Map<String,Object> _declareArgs = Collections.emptyMap(); - - public int getType() - { - return _nodeType; - } - - public boolean isDurable() - { - return _isDurable; - } - - public void setDurable(boolean durable) - { - _isDurable = durable; - } - - public boolean isAutoDelete() - { - return _isAutoDelete; - } - - public void setAutoDelete(boolean autoDelete) - { - _isAutoDelete = autoDelete; - } - - public String getAlternateExchange() - { - return _alternateExchange; - } - - public void setAlternateExchange(String altExchange) - { - _alternateExchange = altExchange; - } - - public List<Binding> getBindings() - { - return _bindings; - } - - public void setBindings(List<Binding> bindings) - { - _bindings = bindings; - } - - public void addBinding(Binding binding) { - this._bindings.add(binding); - } - - public Map<String,Object> getDeclareArgs() - { - return _declareArgs; - } - - public void setDeclareArgs(Map<String,Object> options) - { - _declareArgs = options; - } - - public static class QueueNode extends Node - { - protected boolean _isExclusive; - protected QpidQueueOptions _queueOptions = new QpidQueueOptions(); - - public QueueNode() - { - _nodeType = AMQDestination.QUEUE_TYPE; - } - - public boolean isExclusive() - { - return _isExclusive; - } - - public void setExclusive(boolean exclusive) - { - _isExclusive = exclusive; - } - } - - public static class ExchangeNode extends Node - { - protected QpidExchangeOptions _exchangeOptions = new QpidExchangeOptions(); - protected String _exchangeType; - - public ExchangeNode() - { - _nodeType = AMQDestination.TOPIC_TYPE; - } - - public String getExchangeType() - { - return _exchangeType; - } - - public void setExchangeType(String exchangeType) - { - _exchangeType = exchangeType; - } - - } - - public static class UnknownNodeType extends Node - { - } -} diff --git a/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java b/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java index cb3ab718e9..6a221130bd 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java +++ b/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java @@ -30,8 +30,8 @@ import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; -import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQBrokerDetails; +import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Connection; @@ -97,11 +97,12 @@ public class FailoverExchangeMethod implements FailoverMethod, MessageListener { _ssn = _conn.createSession(false,Session.AUTO_ACKNOWLEDGE); MessageConsumer cons = _ssn.createConsumer( - new AMQAnyDestination(new AMQShortString("amq.failover"), - new AMQShortString("amq.failover"), - new AMQShortString(""), - true,true,null,false, - new AMQShortString[0])); + new AMQQueue(new AMQShortString("amq.failover"), + new AMQShortString("amq.failover"), + new AMQShortString(""), + new AMQShortString(""), + true,true,false, + new AMQShortString[0])); cons.setMessageListener(this); } } diff --git a/java/client/src/main/java/org/apache/qpid/messaging/QpidDestination.java b/java/client/src/main/java/org/apache/qpid/messaging/QpidDestination.java new file mode 100644 index 0000000000..d021ed959e --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/QpidDestination.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging; + +public interface QpidDestination +{ + public enum CheckMode {FOR_RECEIVER, FOR_SENDER}; + + public void checkCreate(Session ssn,CheckMode mode) throws Exception; + + public void checkAssert(Session ssn,CheckMode mode) throws Exception; + + public void checkDelete(Session ssn,CheckMode mode) throws Exception; + + public void createSubscription(Session ssn,SubscriptionSettings settings) throws Exception; + + public void deleteSubscription(Session ssn) throws Exception; + + public String getSubscriptionQueue() throws Exception; + + public long getConsumerCapacity() throws Exception; + + public long getProducerCapacity() throws Exception; +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/QpidQueue.java b/java/client/src/main/java/org/apache/qpid/messaging/QpidQueue.java new file mode 100644 index 0000000000..6bc1846475 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/QpidQueue.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging; + +public abstract class QpidQueue implements QpidDestination +{ + protected String queueName; + + public String getQueueName() + { + return queueName; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/QpidTopic.java b/java/client/src/main/java/org/apache/qpid/messaging/QpidTopic.java new file mode 100644 index 0000000000..05728c1bd3 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/QpidTopic.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging; + +public abstract class QpidTopic implements QpidDestination +{ + protected String topicName; + + public String getTopicName() + { + return topicName; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/Session.java b/java/client/src/main/java/org/apache/qpid/messaging/Session.java new file mode 100644 index 0000000000..bc3d7456fb --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/Session.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging; + +/** + * Currently only a marker interface used primarily + * for address destination refactoring QPID-3401 + * + */ +public interface Session +{ + +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/SubscriptionSettings.java b/java/client/src/main/java/org/apache/qpid/messaging/SubscriptionSettings.java new file mode 100644 index 0000000000..e90d25ec5e --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/SubscriptionSettings.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging; + +public interface SubscriptionSettings +{ + +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/AddressException.java b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressException.java new file mode 100644 index 0000000000..c1b2000db0 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressException.java @@ -0,0 +1,14 @@ +package org.apache.qpid.messaging.address; + +public class AddressException extends Exception +{ + public AddressException(String message) + { + super(message); + } + + public AddressException(String message, Throwable cause) + { + super(message,cause); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressHelper.java new file mode 100644 index 0000000000..7f5b1f9cd1 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressHelper.java @@ -0,0 +1,157 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address; + +import static org.apache.qpid.messaging.address.AddressProperty.*; + +import org.apache.qpid.configuration.Accessor; +import org.apache.qpid.configuration.Accessor.NestedMapAccessor; +import org.apache.qpid.messaging.Address; +import org.apache.qpid.messaging.Address.PolicyType; +import org.apache.qpid.messaging.Address.AddressType; +import org.apache.qpid.messaging.QpidDestination.CheckMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AddressHelper +{ + private static final Logger _logger = LoggerFactory.getLogger(AddressHelper.class); + + protected Address address; + protected Accessor addressProps; + + public AddressHelper(Address address) + { + this.address = address; + addressProps = new NestedMapAccessor(address.getOptions()); + } + + public PolicyType getCreate() + { + return PolicyType.getPolicyType(addressProps.getString(CREATE)); + } + + public PolicyType getAssert() + { + return PolicyType.getPolicyType(addressProps.getString(ASSERT)); + } + + public PolicyType getDelete() + { + return PolicyType.getPolicyType(addressProps.getString(DELETE)); + } + + public boolean isBrowseOnly() + { + String mode = addressProps.getString(MODE); + return mode != null && mode.equals(BROWSE) ? true : false; + } + + public AddressType getNodeType() + { + String type = addressProps.getString(getFQN(NODE,TYPE)); + if ("topic".equalsIgnoreCase(type)) + { + return AddressType.TOPIC_ADDRESS; + } + else if ("queue".equalsIgnoreCase(type)) + { + return AddressType.QUEUE_ADDRESS; + } + else + { + return AddressType.UNSPECIFIED; + } + } + + public boolean isNodeDurable() + { + Boolean b = addressProps.getBoolean(getFQN(NODE,DURABLE)); + return b == null ? false : b.booleanValue(); + } + + public String getLinkName() + { + return addressProps.getString(getFQN(LINK,NAME)); + } + + public boolean isLinkDurable() + { + Boolean b = addressProps.getBoolean(getFQN(LINK,DURABLE)); + return b == null ? false : b.booleanValue(); + } + + public String getLinkReliability() + { + return addressProps.getString(getFQN(LINK,RELIABILITY)); + } + + public int getLinkProducerCapacity() + { + return getCapacity(CheckMode.FOR_SENDER); + } + + public int getLinkConsumerCapacity() + { + return getCapacity(CheckMode.FOR_RECEIVER); + } + + private int getCapacity(CheckMode mode) + { + int capacity = 0; + try + { + capacity = addressProps.getInt(getFQN(LINK,CAPACITY)); + } + catch(Exception e) + { + try + { + if (mode == CheckMode.FOR_RECEIVER) + { + capacity = addressProps.getInt(getFQN(LINK,CAPACITY,CAPACITY_SOURCE)); + } + else + { + capacity = addressProps.getInt(getFQN(LINK,CAPACITY,CAPACITY_TARGET)); + } + } + catch(Exception ex) + { + if (ex instanceof NumberFormatException && !ex.getMessage().equals("null")) + { + _logger.info("Unable to retrieve capacity from address: " + address,ex); + } + } + } + + return capacity; + } + + public static boolean isAllowed(PolicyType policy, CheckMode mode) + { + return (policy == PolicyType.ALWAYS || + (policy == PolicyType.RECEIVER && mode == CheckMode.FOR_RECEIVER) || + (policy == PolicyType.SENDER && mode == CheckMode.FOR_SENDER) + ); + + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/AddressProperty.java b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressProperty.java new file mode 100644 index 0000000000..26b0f2e676 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressProperty.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address; + +public final class AddressProperty +{ + public static final String NODE = "node"; + public static final String LINK = "link"; + public static final String CREATE = "create"; + public static final String ASSERT = "assert"; + public static final String DELETE = "delete"; + public static final String FILTER = "filter"; + public static final String NO_LOCAL = "no-local"; + public static final String DURABLE = "durable"; + public static final String TYPE = "type"; + public static final String BROWSE = "browse"; + public static final String MODE = "mode"; + public static final String CAPACITY = "capacity"; + public static final String CAPACITY_SOURCE = "source"; + public static final String CAPACITY_TARGET = "target"; + public static final String NAME = "name"; + public static final String RELIABILITY = "reliability"; + + public static String getFQN(String... propNames) + { + StringBuilder sb = new StringBuilder(); + for(String prop: propNames) + { + sb.append(prop).append("/"); + } + return sb.substring(0, sb.length() -1); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/AddressResolver.java b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressResolver.java new file mode 100644 index 0000000000..d7cf23138b --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressResolver.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address; + +import org.apache.qpid.messaging.Address; +import org.apache.qpid.messaging.Address.AddressType; +import org.apache.qpid.messaging.Address.PolicyType; +import org.apache.qpid.messaging.QpidDestination; +public interface AddressResolver +{ + public QpidDestination resolve(Address address) throws AddressException; + +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/messaging/address/Link.java new file mode 100644 index 0000000000..55eb86372a --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/Link.java @@ -0,0 +1,112 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address; + +import static org.apache.qpid.messaging.address.Link.Reliability.UNSPECIFIED; + +public class Link +{ + public enum FilterType { SQL92, XQUERY, SUBJECT } + + public enum Reliability + { + UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE, UNSPECIFIED; + + public static Reliability getReliability(String reliability) throws AddressException + { + if (reliability == null) + { + return UNSPECIFIED; + } + else if (reliability.equalsIgnoreCase("unreliable")) + { + return UNRELIABLE; + } + else if (reliability.equalsIgnoreCase("at-least-once")) + { + return AT_LEAST_ONCE; + } + else + { + throw new AddressException("The reliability mode '" + + reliability + "' is not yet supported"); + } + } + } + + protected String name; + protected String filter; + protected FilterType filterType = FilterType.SUBJECT; + protected boolean noLocal; + protected boolean durable; + protected int consumerCapacity = 0; + protected int producerCapacity = 0; + protected Reliability reliability = UNSPECIFIED; + + public Link(AddressHelper helper) throws AddressException + { + name = helper.getLinkName(); + durable = helper.isLinkDurable(); + reliability = Reliability.getReliability(helper.getLinkReliability()); + consumerCapacity = helper.getLinkConsumerCapacity(); + producerCapacity = helper.getLinkProducerCapacity(); + } + + public Reliability getReliability() + { + return reliability; + } + + public boolean isDurable() + { + return durable; + } + + public String getFilter() + { + return filter; + } + + public FilterType getFilterType() + { + return filterType; + } + + public boolean isNoLocal() + { + return noLocal; + } + + public int getConsumerCapacity() + { + return consumerCapacity; + } + + public int getProducerCapacity() + { + return producerCapacity; + } + + public String getName() + { + return name; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/Node.java b/java/client/src/main/java/org/apache/qpid/messaging/address/Node.java new file mode 100644 index 0000000000..29c65f0a1d --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/Node.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address; + +import org.apache.qpid.messaging.Address.PolicyType; + +public class Node +{ + boolean durable = false; + PolicyType createPolicy = PolicyType.NEVER; + PolicyType assertPolicy = PolicyType.NEVER; + PolicyType deletePolicy = PolicyType.NEVER; + + public Node(AddressHelper helper) + { + durable = helper.isNodeDurable(); + createPolicy = helper.getCreate(); + assertPolicy = helper.getAssert(); + deletePolicy = helper.getDelete(); + } + + public boolean isDurable() + { + return durable; + } + + public PolicyType getCreatePolicy() + { + return createPolicy; + } + + public PolicyType getAssertPolicy() + { + return assertPolicy; + } + + public PolicyType getDeletePolicy() + { + return deletePolicy; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressHelper_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressHelper_0_10.java new file mode 100644 index 0000000000..243b2365e5 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressHelper_0_10.java @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +import static org.apache.qpid.messaging.address.AddressProperty.*; +import static org.apache.qpid.messaging.address.amqp_0_10.AddressProperty_0_10.*; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.configuration.Accessor; +import org.apache.qpid.configuration.Accessor.MapAccessor; +import org.apache.qpid.messaging.Address; +import org.apache.qpid.messaging.address.AddressHelper; + + +public class AddressHelper_0_10 extends AddressHelper +{ + protected Accessor declareProps; + + public AddressHelper_0_10(Address address) + { + super(address); + } + + // Node properties -------------------- + + public boolean isNodeExclusive() + { + Boolean b = addressProps.getBoolean(getFQN(NODE,X_DECLARE,EXCLUSIVE)); + return b == null ? false : b.booleanValue(); + } + + public boolean isNodeAutoDelete() + { + Boolean b = addressProps.getBoolean(getFQN(NODE,X_DECLARE,AUTO_DELETE)); + return b == null ? false : b.booleanValue(); + } + + public String getNodeAltExchange() + { + return addressProps.getString(getFQN(NODE,X_DECLARE,ALT_EXCHANGE)); + } + + public Map getNodeDeclareArgs() + { + Map map = addressProps.getMap(getFQN(NODE,X_DECLARE,ARGUMENTS)); + return map == null ? Collections.EMPTY_MAP : map; + } + + public String getNodeExchangeType() + { + String type = addressProps.getString(getFQN(NODE,X_DECLARE,EXCHANGE_TYPE)); + return type == null ? "topic" : type; + } + + public List<Binding> getNodeBindings() + { + return getBindings(addressProps.getList(getFQN(NODE,X_BINDINGS))); + } + + // Link properties -------------------- + + public List<Binding> getLinkQueueBindings() + { + return getBindings(addressProps.getList(getFQN(LINK,X_BINDINGS))); + } + + public Map getLinkQueueDeclareArgs() + { + Map map = addressProps.getMap(getFQN(LINK,X_DECLARE,ARGUMENTS)); + return map == null ? Collections.EMPTY_MAP : map; + } + + public Boolean isLinkQueueExclusive() + { + return addressProps.getBoolean(getFQN(LINK,X_DECLARE,EXCLUSIVE)); + } + + public Boolean isLinkQueueAutoDelete() + { + return addressProps.getBoolean(getFQN(LINK,X_DECLARE,AUTO_DELETE)); + } + + public String getLinkQueueAltExchange() + { + return addressProps.getString(getFQN(LINK,X_DECLARE,ALT_EXCHANGE)); + } + + public Boolean isSubscriptionExclusive() + { + return addressProps.getBoolean(getFQN(LINK,X_SUBSCRIBE,EXCLUSIVE)); + } + + public Map getSubscriptionArguments() + { + Map m = addressProps.getMap(getFQN(LINK,X_SUBSCRIBE,ARGUMENTS)); + return m == null ? Collections.EMPTY_MAP : m; + } + + @SuppressWarnings("unchecked") + private List<Binding> getBindings(List<Map> bindingList) + { + List<Binding> bindings = new ArrayList<Binding>(); + if (bindingList != null) + { + for (Map map : bindingList) + { + MapAccessor bindingMap = new MapAccessor(map); + Binding binding = new Binding( + bindingMap.getString(EXCHANGE), + bindingMap.getString(QUEUE), + bindingMap.getString(KEY), + bindingMap.getMap(ARGUMENTS) == null ? Collections.EMPTY_MAP + : bindingMap.getMap(ARGUMENTS)); + bindings.add(binding); + } + } + return bindings; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressProperty_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressProperty_0_10.java new file mode 100644 index 0000000000..8d46c29d4e --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressProperty_0_10.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +public final class AddressProperty_0_10 +{ + public static final String X_DECLARE = "x-declare"; + public static final String X_BINDINGS = "x-bindings"; + public static final String X_SUBSCRIBE = "x-subscribes"; + + public static final String EXCLUSIVE = "exclusive"; + public static final String AUTO_DELETE = "auto-delete"; + public static final String ALT_EXCHANGE = "alternate-exchange"; + public static final String EXCHANGE_TYPE = "type"; + + public static final String BINDINGS = "bindings"; + public static final String EXCHANGE = "exchange"; + public static final String QUEUE = "queue"; + public static final String KEY = "key"; + public static final String ARGUMENTS = "arguments"; +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressResolver_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressResolver_0_10.java new file mode 100644 index 0000000000..acd5fbb4d4 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressResolver_0_10.java @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +import org.apache.qpid.messaging.Address; +import org.apache.qpid.messaging.Address.AddressType; +import org.apache.qpid.messaging.Address.PolicyType; +import org.apache.qpid.messaging.QpidDestination; +import org.apache.qpid.messaging.address.AddressException; +import org.apache.qpid.messaging.address.AddressResolver; +import org.apache.qpid.transport.ExchangeBoundResult; +import org.apache.qpid.transport.Session; + +public class AddressResolver_0_10 implements AddressResolver +{ + Session ssn; + + public AddressResolver_0_10(Session session) + { + this.ssn = session; + } + + /** + * 1. Check if it's an exchange (topic) or a queue + * 2. Create a QpidTopic or a QpidQueue based on that + */ + public QpidDestination resolve(Address address) throws AddressException + { + AddressHelper_0_10 helper = new AddressHelper_0_10(address); + + if (!address.isResolved()) + { + checkAddressType(address,helper); + address.setResolved(true); + } + + Link_0_10 link = new Link_0_10(helper); + Node_0_10 node = (address.getAddressType() == AddressType.TOPIC_ADDRESS) ? + new ExchangeNode(helper) : new QueueNode(helper); + + try + { + QpidDestination dest = (QpidDestination) ((address.getAddressType() == AddressType.TOPIC_ADDRESS) ? + new QpidTopic_0_10(address,(ExchangeNode)node,link): + new QpidQueue_0_10(address,(QueueNode)node,link)); + + return dest; + } + catch(Exception e) + { + throw new AddressException("Error creating destination impl",e); + } + } + + private void checkAddressType(Address address,AddressHelper_0_10 helper) throws AddressException + { + ExchangeBoundResult result = ssn.exchangeBound(address.getName(),address.getName(), + null,null).get(); + if (result.getQueueNotFound() && result.getExchangeNotFound()) { + //neither a queue nor an exchange exists with that name; + //treat it as a queue unless a type is specified. + if (helper.getNodeType() == AddressType.UNSPECIFIED) + { + address.setAddressType(AddressType.QUEUE_ADDRESS); + } + else + { + address.setAddressType(AddressType.TOPIC_ADDRESS); + } + } + else if (result.getExchangeNotFound()) + { + //name refers to a queue + address.setAddressType(AddressType.QUEUE_ADDRESS); + } + else if (result.getQueueNotFound()) + { + //name refers to an exchange + address.setAddressType(AddressType.TOPIC_ADDRESS); + } + else + { + //both a queue and exchange exist for that name + if (helper.getNodeType() == AddressType.UNSPECIFIED) + { + throw new AddressException("Ambiguous address, please specify a node type. Ex type:{queue|topic}"); + } + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Binding.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Binding.java new file mode 100644 index 0000000000..46c789a4e7 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Binding.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +import java.util.Map; + +public class Binding +{ + String exchange; + String bindingKey; + String queue; + Map<String,Object> args; + + public Binding(String exchange, + String queue, + String bindingKey, + Map<String,Object> args) + { + this.exchange = exchange; + this.queue = queue; + this.bindingKey = bindingKey; + this.args = args; + } + + public String getExchange() + { + return exchange; + } + + public String getQueue() + { + return queue; + } + + public String getBindingKey() + { + return bindingKey; + } + + public Map<String, Object> getArgs() + { + return args; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/ExchangeNode.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/ExchangeNode.java new file mode 100644 index 0000000000..633fd3ed24 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/ExchangeNode.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + + +public class ExchangeNode extends Node_0_10 +{ + private String exchangeType; + + public ExchangeNode(AddressHelper_0_10 helper) + { + super(helper); + exchangeType = helper.getNodeExchangeType(); + } + + public String getExchangeType() + { + return exchangeType; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Link_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Link_0_10.java new file mode 100644 index 0000000000..c8df601c48 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Link_0_10.java @@ -0,0 +1,110 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.messaging.address.AddressException; +import org.apache.qpid.messaging.address.Link; + +public class Link_0_10 extends Link +{ + private Subscription subscription; + private List<Binding> bindings; + + private Boolean autoDelete = null; + private String altExchange; + private Boolean exclusive = null; + private Map<String,Object> declareArgs; + + @SuppressWarnings("unchecked") + public Link_0_10(AddressHelper_0_10 helper) throws AddressException + { + super(helper); + autoDelete = helper.isLinkQueueAutoDelete(); + exclusive = helper.isLinkQueueExclusive(); + altExchange = helper.getLinkQueueAltExchange(); + declareArgs = helper.getLinkQueueDeclareArgs(); + bindings = helper.getLinkQueueBindings(); + subscription = new Subscription(); + subscription.setExclusive(helper.isSubscriptionExclusive()); + subscription.setArgs(helper.getSubscriptionArguments()); + } + + public List<Binding> getQueueBindings() + { + return bindings; + } + + public Boolean isQueueAutoDelete() + { + return autoDelete; + } + + public String getQueueAltExchange() + { + return altExchange; + } + + public Boolean isQueueExclusive() + { + return exclusive; + } + + public Map<String, Object> getQueueDeclareArgs() + { + return declareArgs; + } + + + public Subscription getSubscription() + { + return this.subscription; + } + + public static class Subscription + { + private Map<String,Object> args = new HashMap<String,Object>(); + private Boolean exclusive = null; + + public Map<String, Object> getArgs() + { + return args; + } + + public void setArgs(Map<String, Object> args) + { + this.args = args; + } + + public Boolean isExclusive() + { + return exclusive; + } + + public void setExclusive(Boolean exclusive) + { + this.exclusive = exclusive; + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Node_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Node_0_10.java new file mode 100644 index 0000000000..f97b8c11b4 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Node_0_10.java @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +import java.util.Map; + +import org.apache.qpid.messaging.address.Node; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Node_0_10 extends Node +{ + private static final Logger _logger = LoggerFactory.getLogger(Node_0_10.class); + + private boolean autoDelete = false; + private String altExchange; + private Map<String,Object> declareArgs; + + public Node_0_10(AddressHelper_0_10 helper) + { + super(helper); + declareArgs = helper.getNodeDeclareArgs(); + autoDelete = helper.isNodeAutoDelete(); + altExchange = helper.getNodeAltExchange(); + } + + public boolean isAutoDelete() + { + return autoDelete; + } + + public String getAltExchange() + { + return altExchange; + } + + public Map<String, Object> getDeclareArgs() + { + return declareArgs; + } + + public boolean matchProps(Map<String,Object> target) + { + boolean match = true; + Map<String,Object> source = declareArgs; + for (String key: source.keySet()) + { + match = target.containsKey(key) && + target.get(key).equals(source.get(key)); + + if (!match) + { + StringBuffer buf = new StringBuffer(); + buf.append("Property given in address did not match with the args sent by the broker."); + buf.append(" Expected { ").append(key).append(" : ").append(source.get(key)).append(" }, "); + buf.append(" Actual { ").append(key).append(" : ").append(target.get(key)).append(" }"); + _logger.debug(buf.toString()); + return match; + } + } + + return match; + } +}
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidQueue_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidQueue_0_10.java new file mode 100644 index 0000000000..4c2f94ae59 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidQueue_0_10.java @@ -0,0 +1,253 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +import java.util.Map; + +import org.apache.qpid.AMQException; +import org.apache.qpid.messaging.Address; +import org.apache.qpid.messaging.QpidDestination.CheckMode; +import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.SubscriptionSettings; +import org.apache.qpid.messaging.address.AddressException; +import org.apache.qpid.messaging.address.AddressHelper; +import org.apache.qpid.messaging.address.Link; +import org.apache.qpid.messaging.address.Link.Reliability; +import org.apache.qpid.messaging.amqp_0_10.Session_0_10; +import org.apache.qpid.messaging.QpidQueue; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.transport.ExecutionErrorCode; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.QueueQueryResult; +import org.apache.qpid.transport.SessionException; + +public class QpidQueue_0_10 extends QpidQueue +{ + private Address address; + private QueueNode queue; + private Link_0_10 link; + + public QpidQueue_0_10(Address address,QueueNode queue,Link_0_10 link) throws Exception + { + this.address = address; + this.queue = queue; + this.link = link; + queueName = address.getName(); + } + + @Override + public void checkCreate(Session session,CheckMode mode) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (AddressHelper.isAllowed(queue.getCreatePolicy(), mode)) + { + ssn.queueDeclare(queueName, + queue.getAltExchange(), + queue.getDeclareArgs(), + queue.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + queue.isDurable() ? Option.DURABLE : Option.NONE, + queue.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + + if (queue.getBindings().size() > 0) + { + for (Binding binding: queue.getBindings()) + { + String queue = binding.getQueue() == null? + queueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + "anq.topic" : + binding.getExchange(); + + ssn.exchangeBind(queue, + exchange, + binding.getBindingKey(), + binding.getArgs()); + } + } + } + else + { + try + { + ssn.queueDeclare(queueName, null, null,Option.PASSIVE); + ssn.sync(); + } + catch(SessionException e) + { + if (e.getException() != null + && e.getException().getErrorCode() == ExecutionErrorCode.NOT_FOUND) + { + throw new AddressException("The Queue '" + queueName +"' does not exist",e); + } + else + { + throw e; + } + } + } + } + + @Override + public void checkAssert(Session session,CheckMode mode) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (AddressHelper.isAllowed(queue.getAssertPolicy(), mode)) + { + boolean match = false; + try + { + QueueQueryResult result = ssn.queueQuery(queueName, Option.NONE).get(); + + match = queueName.equals(result.getQueue()) && + (result.getDurable() == queue.isDurable()) && + (result.getAutoDelete() == queue.isAutoDelete()) && + (result.getExclusive() == queue.isExclusive()) && + (queue.matchProps(result.getArguments())); + } + catch(SessionException e) + { + if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED) + { + match = false; + } + else + { + throw new AMQException(AMQConstant.getConstant(e.getException().getErrorCode().getValue()), + "Error querying queue",e); + } + } + if (!match) + { + throw new AddressException("The queue described in the address does not exist on the broker"); + } + } + } + + @Override + public void checkDelete(Session session,CheckMode mode) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (AddressHelper.isAllowed(queue.getDeletePolicy(), mode)) + { + ssn.queueDelete(queueName, Option.NONE); + ssn.sync(); + } + } + + @Override + public void createSubscription(Session session,SubscriptionSettings settings) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (link.getQueueBindings().size() > 0) + { + for (Binding binding: link.getQueueBindings()) + { + String queue = binding.getQueue() == null? + queueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + "amq.topic" : + binding.getExchange(); + + ssn.exchangeBind(queue, + exchange, + binding.getBindingKey(), + binding.getArgs()); + } + } + + SubscriptionSettings_0_10 settings_0_10 = (SubscriptionSettings_0_10)settings; + Map<String,Object> arguments = settings_0_10.getArgs(); + arguments.putAll((Map<? extends String, ? extends Object>)link.getSubscription().getArgs()); + + if (link.getReliability() == Reliability.UNRELIABLE || + link.getReliability() == Reliability.AT_MOST_ONCE) + { + settings_0_10.setAcceptMode(MessageAcceptMode.NONE); + } + + // for queues subscriptions are non exclusive by default. + boolean exclusive = + (link.getSubscription().isExclusive() == null) ? false : link.getSubscription().isExclusive(); + + ssn.messageSubscribe(queueName, + settings_0_10.getSubscriptionTag(), + settings_0_10.getAcceptMode(), + settings_0_10.getAccquireMode(), + null, // resume id + 0, // resume ttl + arguments, + exclusive ? Option.EXCLUSIVE : Option.NONE); + } + + @Override + public void deleteSubscription(Session session) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (link.getQueueBindings().size() > 0) + { + for (Binding binding: link.getQueueBindings()) + { + String queue = binding.getQueue() == null? + queueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + "amq.topic" : + binding.getExchange(); + + ssn.exchangeUnbind(queue, + exchange, + binding.getBindingKey(), + Option.NONE); + } + } + + // ideally we should cancel the subscription here + } + + public String getSubscriptionQueue() + { + return queueName; + } + + public long getConsumerCapacity() + { + return link.getConsumerCapacity(); + } + + public long getProducerCapacity() + { + return link.getProducerCapacity(); + } + + public String toString() + { + return address.toString(); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidTopic_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidTopic_0_10.java new file mode 100644 index 0000000000..b9195220d4 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidTopic_0_10.java @@ -0,0 +1,278 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.messaging.Address; +import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.SubscriptionSettings; +import org.apache.qpid.messaging.Address.PolicyType; +import org.apache.qpid.messaging.QpidDestination.CheckMode; +import org.apache.qpid.messaging.QpidTopic; +import org.apache.qpid.messaging.address.AddressException; +import org.apache.qpid.messaging.address.AddressHelper; +import org.apache.qpid.messaging.address.Link; +import org.apache.qpid.messaging.address.Link.Reliability; +import org.apache.qpid.messaging.amqp_0_10.Session_0_10; +import org.apache.qpid.transport.ExchangeQueryResult; +import org.apache.qpid.transport.ExecutionErrorCode; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.SessionException; + +public class QpidTopic_0_10 extends QpidTopic +{ + private String exchangeName; + private Session ssn; + private Address address; + private ExchangeNode exchange; + private Link_0_10 link; + private String subscriptionQueue; + + public QpidTopic_0_10(Address address,ExchangeNode exchange,Link_0_10 link) throws Exception + { + if (Reliability.AT_LEAST_ONCE == link.getReliability()) + { + throw new Exception("AT-LEAST-ONCE is not yet supported for Topics"); + } + + this.address = address; + this.exchange = exchange; + this.link = link; + this.exchangeName = address.getName(); + topicName = retrieveTopicName(); + } + + @Override + public void checkCreate(Session session,CheckMode mode) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (AddressHelper.isAllowed(exchange.getCreatePolicy(), mode)) + { + ssn.exchangeDeclare(exchangeName, + exchange.getExchangeType(), + exchange.getAltExchange(), + exchange.getDeclareArgs(), + exchangeName.toString().startsWith("amq.") ? + Option.PASSIVE : Option.NONE); + } + else + { + try + { + ssn.exchangeDeclare(exchangeName, null, null, null, Option.PASSIVE); + ssn.sync(); + } + catch(SessionException e) + { + if (e.getException() != null + && e.getException().getErrorCode() == ExecutionErrorCode.NOT_FOUND) + { + throw new AddressException("The exchange '" + exchangeName +"' does not exist",e); + } + else + { + throw e; + } + } + } + } + + @Override + public void checkAssert(Session session,CheckMode mode) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (AddressHelper.isAllowed(exchange.getAssertPolicy(), mode)) + { + ExchangeQueryResult result = ssn.exchangeQuery(exchangeName, Option.NONE).get(); + boolean match = !result.getNotFound() && + (result.getDurable() == exchange.isDurable()) && + (exchange.getExchangeType() != null && exchange.getExchangeType().equals(result.getType())) && + (exchange.matchProps(result.getArguments())); + + if (!match) + { + throw new AddressException("The exchange described by the address does not exist on the broker"); + } + } + } + + @Override + public void checkDelete(Session session,CheckMode mode) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (AddressHelper.isAllowed(exchange.getDeletePolicy(), mode) && + !exchangeName.toString().startsWith("amq.")) + { + ssn.exchangeDelete(exchangeName, Option.NONE); + ssn.sync(); + } + } + + @Override + public void createSubscription(Session session,SubscriptionSettings settings) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + subscriptionQueue = getSubscriptionQueueName(); + + // for topics subscriptions queues are exclusive by default. + boolean exclusive = + (link.isQueueExclusive() == null) ? true : link.getSubscription().isExclusive(); + + // for topics subscriptions are autoDelete by default. + boolean autoDelete = + (link.getSubscription().isExclusive() == null) ? true : link.getSubscription().isExclusive(); + + ssn.queueDeclare(subscriptionQueue, + link.getQueueAltExchange(), + link.getQueueDeclareArgs(), + autoDelete ? Option.AUTO_DELETE : Option.NONE, + link.isDurable() ? Option.DURABLE : Option.NONE, + exclusive ? Option.EXCLUSIVE : Option.NONE); + + if (link.getQueueBindings().size() > 0) + { + for (Binding binding: link.getQueueBindings()) + { + String queue = binding.getQueue() == null? + subscriptionQueue: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + address.getName() : + binding.getExchange(); + + ssn.exchangeBind(queue, + exchange, + binding.getBindingKey(), + binding.getArgs()); + } + } + else + { + String subject = address.getSubject(); + ssn.exchangeBind(subscriptionQueue, + address.getName(), + subject == null || subject.trim().equals("") ? "#" : subject, + null); + } + + SubscriptionSettings_0_10 settings_0_10 = (SubscriptionSettings_0_10)settings; + Map<String,Object> arguments = settings_0_10.getArgs(); + arguments.putAll((Map<? extends String, ? extends Object>)link.getSubscription().getArgs()); + + if (link.getReliability() == Reliability.UNRELIABLE || + link.getReliability() == Reliability.AT_MOST_ONCE) + { + settings_0_10.setAcceptMode(MessageAcceptMode.NONE); + } + + // for topics subscriptions are exclusive by default. + boolean exclusiveConsume = + (link.getSubscription().isExclusive() == null) ? true : link.getSubscription().isExclusive(); + + ssn.messageSubscribe(subscriptionQueue, + settings_0_10.getSubscriptionTag(), + settings_0_10.getAcceptMode(), + settings_0_10.getAccquireMode(), + null, // resume id + 0, // resume ttl + arguments, + exclusiveConsume ? Option.EXCLUSIVE : Option.NONE); + } + + @Override + public void deleteSubscription(Session session) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (link.getQueueBindings().size() > 0) + { + for (Binding binding: link.getQueueBindings()) + { + String queue = binding.getQueue() == null? + subscriptionQueue: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + "anq.topic" : + binding.getExchange(); + + ssn.exchangeUnbind(queue, + exchange, + binding.getBindingKey(), + Option.NONE); + } + } + ssn.queueDelete(subscriptionQueue, Option.NONE); + } + + private String getSubscriptionQueueName() + { + if (link.getName() == null) + { + return "TempQueue" + UUID.randomUUID(); + } + else + { + return link.getName(); + } + } + + private String retrieveTopicName() + { + if (address.getSubject() != null && !address.getSubject().trim().equals("")) + { + return address.getSubject(); + } + else if ("topic".equals(exchange.getExchangeType())) + { + return "#"; + } + else + { + return ""; + } + } + + public String getSubscriptionQueue() + { + return subscriptionQueue; + } + + public long getConsumerCapacity() + { + return link.getConsumerCapacity(); + } + + public long getProducerCapacity() + { + return link.getProducerCapacity(); + } + + public String toString() + { + return address.toString(); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QueueNode.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QueueNode.java new file mode 100644 index 0000000000..04f0911f44 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QueueNode.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +import java.util.List; + +import org.apache.qpid.messaging.Address; + +public class QueueNode extends Node_0_10 +{ + private boolean exclusive = false; + private List<Binding> bindings; + + public QueueNode(AddressHelper_0_10 helper) + { + super(helper); + exclusive = helper.isNodeExclusive(); + bindings = helper.getNodeBindings(); + } + + public boolean isExclusive() + { + return exclusive; + } + + public List<Binding> getBindings() + { + return bindings; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/SubscriptionSettings_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/SubscriptionSettings_0_10.java new file mode 100644 index 0000000000..cbb51c637b --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/SubscriptionSettings_0_10.java @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +import java.util.Map; + +import org.apache.qpid.messaging.SubscriptionSettings; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; + +public class SubscriptionSettings_0_10 implements SubscriptionSettings +{ + String messageSelector; + String subscriptionTag; + MessageAcceptMode acceptMode; + MessageAcquireMode accquireMode; + Map<String,Object> args; + + public String getMessageSelector() + { + return messageSelector; + } + + public void setMessageSelector(String messageSelector) + { + this.messageSelector = messageSelector; + } + + public String getSubscriptionTag() + { + return subscriptionTag; + } + + public void setSubscriptionTag(String subscriptionTag) + { + this.subscriptionTag = subscriptionTag; + } + + public MessageAcceptMode getAcceptMode() + { + return acceptMode; + } + + public void setAcceptMode(MessageAcceptMode acceptMode) + { + this.acceptMode = acceptMode; + } + + public MessageAcquireMode getAccquireMode() + { + return accquireMode; + } + + public void setAccquireMode(MessageAcquireMode accquireMode) + { + this.accquireMode = accquireMode; + } + + public Map<String, Object> getArgs() + { + return args; + } + + public void setArgs(Map<String, Object> args) + { + this.args = args; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/amqp_0_10/Session_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/amqp_0_10/Session_0_10.java new file mode 100644 index 0000000000..ad611ac65d --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/amqp_0_10/Session_0_10.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.amqp_0_10; + +import org.apache.qpid.messaging.Session; + +/** + * Currently only a marker interface used primarily + * for address destination refactoring QPID-3401 + * + */ +public class Session_0_10 implements Session +{ + private org.apache.qpid.transport.Session protocolSession; + + public Session_0_10(org.apache.qpid.transport.Session ssn) + { + protocolSession = ssn; + } + + public org.apache.qpid.transport.Session getProtocolSession() + { + return protocolSession; + } +} diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java index 849827216c..674f16fe3d 100644 --- a/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java +++ b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java @@ -154,7 +154,7 @@ public class AMQSession_0_10Test extends TestCase public void testExceptionOnCreateConsumer() { AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); - AMQAnyDestination destination = createDestination(); + AMQDestination destination = createDestination(); try { session.createConsumer(destination); @@ -170,10 +170,9 @@ public class AMQSession_0_10Test extends TestCase public void testExceptionOnCreateSubscriber() { AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); - AMQAnyDestination destination = createDestination(); try { - session.createSubscriber(destination); + session.createSubscriber(new AMQTopic(new AMQShortString("amq.topic"),new AMQShortString("test"))); fail("JMSException should be thrown"); } catch (Exception e) @@ -518,13 +517,13 @@ public class AMQSession_0_10Test extends TestCase assertNotNull("ExchangeDeclare event was not sent", event); } - private AMQAnyDestination createDestination() + private AMQDestination createDestination() { - AMQAnyDestination destination = null; + AMQDestination destination = null; try { - destination = new AMQAnyDestination(new AMQShortString("amq.direct"), new AMQShortString("direct"), - new AMQShortString("test"), false, true, new AMQShortString("test"), true, null); + destination = new AMQQueue(new AMQShortString("amq.direct"), new AMQShortString("direct"), + new AMQShortString("test"),new AMQShortString("test"), false, true, true, null); } catch (Exception e) { diff --git a/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java b/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java index dc5b69dc89..d154f70339 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java @@ -26,6 +26,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; @@ -36,6 +37,8 @@ public interface Accessor public Integer getInt(String name); public Long getLong(String name); public String getString(String name); + public Map getMap(String name); + public List getList(String name); static class SystemPropertyAccessor implements Accessor { @@ -58,6 +61,10 @@ public interface Accessor { return System.getProperty(name); } + + public Map getMap(String name){ throw new UnsupportedOperationException("Not supported by system properties"); } + + public List getList(String name){ throw new UnsupportedOperationException("Not supported by system properties"); } } static class MapAccessor implements Accessor @@ -144,6 +151,31 @@ public interface Accessor return null; } } + + public Map getMap(String name) + { + if (source != null && source.containsKey(name) && source.get(name) instanceof Map) + { + return (Map)source.get(name); + } + else + { + return null; + } + } + + public List getList(String name) + { + if (source != null && source.containsKey(name) && source.get(name) instanceof List) + { + return (List)source.get(name); + } + else + { + return null; + } + } + } static class PropertyFileAccessor extends MapAccessor @@ -163,6 +195,12 @@ public interface Accessor } source = props; } + + @Override + public Map getMap(String name){ throw new UnsupportedOperationException("Not supported by property file"); } + + @Override + public List getList(String name){ throw new UnsupportedOperationException("Not supported by property file"); } } static class CombinedAccessor implements Accessor @@ -190,7 +228,7 @@ public interface Accessor { for (Accessor accessor: accessors) { - if (accessor.getBoolean(name) != null) + if (accessor.getInt(name) != null) { return accessor.getInt(name); } @@ -202,7 +240,7 @@ public interface Accessor { for (Accessor accessor: accessors) { - if (accessor.getBoolean(name) != null) + if (accessor.getLong(name) != null) { return accessor.getLong(name); } @@ -214,13 +252,37 @@ public interface Accessor { for (Accessor accessor: accessors) { - if (accessor.getBoolean(name) != null) + if (accessor.getString(name) != null) { return accessor.getString(name); } } return null; } + + public Map getMap(String name) + { + for (Accessor accessor: accessors) + { + if (accessor.getMap(name) != null && accessor.getMap(name) instanceof Map) + { + return accessor.getMap(name); + } + } + return null; + } + + public List getList(String name) + { + for (Accessor accessor: accessors) + { + if (accessor.getMap(name) != null && accessor.getList(name) instanceof List) + { + return accessor.getList(name); + } + } + return null; + } } static class ValidationAccessor implements Accessor @@ -269,5 +331,97 @@ public interface Accessor } return v; } + + public Map getMap(String name){ throw new UnsupportedOperationException("Validator interface does not support maps"); } + + public List getList(String name){ throw new UnsupportedOperationException("Validator interface does not support maps"); } } + + /** + * Property names as passed in the form + * level_1_prop/level_2_prop/.../level_n_prop + * All property name upto level_n-1_prop should return + * a map or null + */ + static class NestedMapAccessor implements Accessor + { + protected Map<Object,Object> baseMap; + + public NestedMapAccessor(Map<Object,Object> map) + { + baseMap = map; + } + + private String getKey(String name) + { + if (name.lastIndexOf("/") > -1) + { + return name.substring(name.lastIndexOf("/")+1); + } + else + { + return name; + } + } + + private MapAccessor mapIterator(String name) + { + if (name.lastIndexOf("/") == -1) + { + return new MapAccessor(baseMap); + } + + String[] paths = name.substring(0,name.lastIndexOf("/")).split("/"); + Map map = baseMap == null ? Collections.EMPTY_MAP : baseMap; + + for (String path:paths) + { + + Object obj = map.get(path); + if (obj == null) + { + return new MapAccessor(null); + } + else if (obj instanceof Map) + { + map = (Map)obj; + } + else + { + throw new IllegalArgumentException(path + " doesn't retrieve another map"); + } + } + return new MapAccessor(map); + } + + public Boolean getBoolean(String name) + { + return mapIterator(name).getBoolean(getKey(name)); + } + + public Integer getInt(String name) + { + return mapIterator(name).getInt(getKey(name)); + } + + public Long getLong(String name) + { + return mapIterator(name).getLong(getKey(name)); + } + + public String getString(String name) + { + return mapIterator(name).getString(getKey(name)); + } + + public Map getMap(String name) + { + return mapIterator(name).getMap(getKey(name)); + } + + public List getList(String name) + { + return mapIterator(name).getList(getKey(name)); + } + } } diff --git a/java/common/src/main/java/org/apache/qpid/messaging/Address.java b/java/common/src/main/java/org/apache/qpid/messaging/Address.java index 2c7fe7b8ed..ce8734f1a3 100644 --- a/java/common/src/main/java/org/apache/qpid/messaging/Address.java +++ b/java/common/src/main/java/org/apache/qpid/messaging/Address.java @@ -34,6 +34,21 @@ import static org.apache.qpid.messaging.util.PyPrint.pprint; public class Address { + public enum AddressType {QUEUE_ADDRESS, TOPIC_ADDRESS, UNSPECIFIED }; + + public enum PolicyType + { + ALWAYS, NEVER, SENDER, RECEIVER; + public static PolicyType getPolicyType(String str) + { + if ( str == null || str.equals("") || "never".equals(str)) return PolicyType.NEVER; + if ("always".equals(str)) return PolicyType.ALWAYS; + else if ("sender".equals(str)) return PolicyType.SENDER; + else if ("receiver".equals(str)) return PolicyType.RECEIVER; + else throw new IllegalArgumentException(str + " is not an allowed value"); + } + } + public static Address parse(String address) { @@ -43,6 +58,8 @@ public class Address private String name; private String subject; private Map options; + private AddressType type = AddressType.QUEUE_ADDRESS; + private boolean resolved = false; public Address(String name, String subject, Map options) { @@ -50,7 +67,27 @@ public class Address this.subject = subject; this.options = options; } - + + public AddressType getAddressType() + { + return type; + } + + public void setAddressType(AddressType type) + { + this.type = type; + } + + public boolean isResolved() + { + return resolved; + } + + public void setResolved(boolean b) + { + this.resolved = b; + } + public String getName() { return name; diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 8c3c247e2b..c07178d7be 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -29,6 +29,7 @@ import java.util.Properties; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -44,14 +45,14 @@ import javax.jms.TopicSubscriber; import javax.naming.Context; import javax.naming.InitialContext; -import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQSession_0_10; +import org.apache.qpid.client.AddressBasedDestination; +import org.apache.qpid.client.AddressBasedQueue; +import org.apache.qpid.client.AddressBasedTopic; import org.apache.qpid.client.message.QpidMessageProperties; -import org.apache.qpid.client.messaging.address.Node.ExchangeNode; -import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; import org.apache.qpid.messaging.Address; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -69,6 +70,16 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { super.setUp(); _connection = getConnection() ; + _connection.setExceptionListener(new ExceptionListener() + { + + @Override + public void onException(JMSException ex) + { + // ignore + } + + }); _connection.start(); } @@ -79,6 +90,18 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase super.tearDown(); } + // Currently if we get a session exception the connection is canned. + private void recreateConnection() throws Exception + { + _connection = getConnection() ; + _connection.start(); + } + + private AddressBasedDestination getDestination(String addr) throws Exception + { + return (AddressBasedDestination)AMQDestination.createDestination(addr); + } + public void testCreateOptions() throws Exception { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); @@ -88,118 +111,140 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase // default (create never, assert never) ------------------- // create never -------------------------------------------- String addr1 = "ADDR:testQueue1"; - AMQDestination dest = new AMQAnyDestination(addr1); + AddressBasedDestination dest = getDestination(addr1); try { cons = jmsSession.createConsumer(dest); + fail("Exception should have been thrown as the queue does not exist"); } catch(JMSException e) { - assertTrue(e.getMessage().contains("The name 'testQueue1' supplied in the address " + - "doesn't resolve to an exchange or a queue")); - } + assertTrue(e.getCause().getMessage().contains("The Queue 'testQueue1' does not exist")); + recreateConnection(); + jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + } try { prod = jmsSession.createProducer(dest); + fail("Exception should have been thrown as the queue does not exist"); } catch(JMSException e) { - assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue1' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + e.printStackTrace(); + assertTrue(e.getCause().getCause().getCause().getMessage().contains("The Queue 'testQueue1' does not exist")); + recreateConnection(); + jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, (QueueNode)dest.getSourceNode() ,true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName())); // create always ------------------------------------------- addr1 = "ADDR:testQueue1; { create: always }"; - dest = new AMQAnyDestination(addr1); + dest = getDestination(addr1); cons = jmsSession.createConsumer(dest); assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName())); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", - dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs())); + dest.getAddress().getName(),dest.getAddress().getName(),null)); // create receiver ----------------------------------------- addr1 = "ADDR:testQueue2; { create: receiver }"; - dest = new AMQAnyDestination(addr1); + dest = getDestination(addr1); try { prod = jmsSession.createProducer(dest); + fail("Exception should have been thrown as the queue does not exist"); } catch(JMSException e) { - assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue2' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + assertTrue(e.getCause().getCause().getCause().getMessage().contains("The Queue 'testQueue2' does not exist")); + jmsSession.close(); + recreateConnection(); + jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); } - + + System.out.println("==========================================="); + System.out.println("jmsSession current exception " + ((AMQSession_0_10)jmsSession).getCurrentException()); + System.out.println("==========================================="); + assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName())); + + System.out.println("==========================================="); + System.out.println("jmsSession current exception " + ((AMQSession_0_10)jmsSession).getCurrentException()); + System.out.println("==========================================="); cons = jmsSession.createConsumer(dest); assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName())); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", - dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs())); + dest.getAddress().getName(),dest.getAddress().getName(), null)); // create never -------------------------------------------- addr1 = "ADDR:testQueue3; { create: never }"; - dest = new AMQAnyDestination(addr1); + dest = getDestination(addr1); try { - cons = jmsSession.createConsumer(dest); + cons = jmsSession.createConsumer(dest); + fail("Exception should have been thrown as the queue does not exist"); } catch(JMSException e) { - assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + assertTrue(e.getCause().getMessage().contains("The Queue 'testQueue3' does not exist")); + recreateConnection(); + jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); } try { prod = jmsSession.createProducer(dest); + fail("Exception should have been thrown as the queue does not exist"); } catch(JMSException e) { - assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue3' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + assertTrue(e.getCause().getCause().getCause().getMessage().contains("The Queue 'testQueue3' does not exist")); + recreateConnection(); + jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName())); // create sender ------------------------------------------ addr1 = "ADDR:testQueue3; { create: sender }"; - dest = new AMQAnyDestination(addr1); + dest = getDestination(addr1); try { cons = jmsSession.createConsumer(dest); + fail("Exception should have been thrown as the queue does not exist"); } catch(JMSException e) { - assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + assertTrue(e.getCause().getMessage().contains("The Queue 'testQueue3' does not exist")); + recreateConnection(); + jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName())); prod = jmsSession.createProducer(dest); assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName())); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", - dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs())); + dest.getAddress().getName(),dest.getAddress().getName(), null)); } - + public void testCreateQueue() throws Exception { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); @@ -226,30 +271,30 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase "}" + "}"; - AMQDestination dest = new AMQAnyDestination(addr); + AddressBasedDestination dest = getDestination(addr); MessageConsumer cons = jmsSession.createConsumer(dest); cons.close(); // Even if the consumer is closed the queue and the bindings should be intact. assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName())); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", - dest.getAddressName(),dest.getAddressName(), null)); + dest.getAddress().getName(),dest.getAddress().getName(), null)); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", - dest.getAddressName(),"test", null)); + dest.getAddress().getName(),"test", null)); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout", - dest.getAddressName(),null, null)); + dest.getAddress().getName(),null, null)); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", - dest.getAddressName(),"a.#", null)); + dest.getAddress().getName(),"a.#", null)); Map<String,Object> args = new HashMap<String,Object>(); args.put("x-match","any"); @@ -257,7 +302,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase args.put("loc","CA"); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("amq.match", - dest.getAddressName(),null, args)); + dest.getAddress().getName(),null, args)); MessageProducer prod = jmsSession.createProducer(dest); prod.send(jmsSession.createTextMessage("test")); @@ -312,7 +357,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase "}" + "}"; - AMQDestination dest = new AMQAnyDestination(addr); + AddressBasedDestination dest = getDestination(addr); MessageConsumer cons; try @@ -338,7 +383,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertTrue("Exchange not created as expected",( - (AMQSession_0_10)jmsSession).isExchangeExist(dest, (ExchangeNode)dest.getTargetNode() , true)); + (AMQSession_0_10)jmsSession).isExchangeExist(dest.getAddress().getName())); // The existence of the queue is implicitly tested here assertTrue("Queue not bound as expected",( @@ -346,7 +391,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase dest.getQueueName(),"hello", Collections.<String, Object>emptyMap())); // The client should be able to query and verify the existence of my-exchange (QPID-2774) - dest = new AMQAnyDestination("ADDR:my-exchange; {create: never}"); + dest = getDestination("ADDR:my-exchange; {create: never}"); cons = jmsSession.createConsumer(dest); } @@ -376,27 +421,27 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase return argsString; } - public void checkQueueForBindings(Session jmsSession, AMQDestination dest,String headersBinding) throws Exception + public void checkQueueForBindings(Session jmsSession, AddressBasedDestination dest,String headersBinding) throws Exception { - assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + assertTrue("Queue not created as expected",( + (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName())); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", - dest.getAddressName(),dest.getAddressName(), null)); + dest.getAddress().getName(),dest.getAddress().getName(), null)); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", - dest.getAddressName(),"test", null)); + dest.getAddress().getName(),"test", null)); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", - dest.getAddressName(),"a.#", null)); + dest.getAddress().getName(),"a.#", null)); Address a = Address.parse(headersBinding); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("amq.match", - dest.getAddressName(),null, a.getOptions())); + dest.getAddress().getName(),null, a.getOptions())); } /** @@ -406,7 +451,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void testBindQueueWithArgs() throws Exception { - Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); String headersBinding = "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}"; String addr = "node: " + @@ -425,11 +470,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase "}"; - AMQDestination dest1 = new AMQAnyDestination("ADDR:my-queue/hello; {create: receiver, " +addr); + AddressBasedDestination dest1 = getDestination("ADDR:my-queue/hello; {create: receiver, " +addr); MessageConsumer cons = jmsSession.createConsumer(dest1); checkQueueForBindings(jmsSession,dest1,headersBinding); - AMQDestination dest2 = new AMQAnyDestination("ADDR:my-queue2/hello; {create: sender, " +addr); + AddressBasedDestination dest2 = getDestination("ADDR:my-queue2/hello; {create: sender, " +addr); MessageProducer prod = jmsSession.createProducer(dest2); checkQueueForBindings(jmsSession,dest2,headersBinding); } @@ -467,7 +512,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - AMQDestination dest = new AMQAnyDestination(address); + AddressBasedDestination dest = getDestination(address); MessageConsumer cons = jmsSession.createConsumer(dest); MessageProducer prod = jmsSession.createProducer(dest); @@ -508,8 +553,8 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase PropertiesFileInitialContextFactory props = new PropertiesFileInitialContextFactory(); Context ctx = props.getInitialContext(map); - AMQDestination dest1 = (AMQDestination)ctx.lookup("myQueue1"); - AMQDestination dest2 = (AMQDestination)ctx.lookup("myQueue2"); + AddressBasedDestination dest1 = (AddressBasedDestination)ctx.lookup("myQueue1"); + AddressBasedDestination dest2 = (AddressBasedDestination)ctx.lookup("myQueue2"); AMQDestination dest3 = (AMQDestination)ctx.lookup("myQueue3"); Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); @@ -518,25 +563,25 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase MessageConsumer cons3 = jmsSession.createConsumer(dest3); assertTrue("Destination1 was not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest1,(QueueNode)dest1.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest1.getQueueName())); assertTrue("Destination1 was not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", - dest1.getAddressName(),dest1.getAddressName(), null)); + dest1.getAddress().getName(),dest1.getAddress().getName(), null)); assertTrue("Destination2 was not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest2,(QueueNode)dest2.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest2.getQueueName())); assertTrue("Destination2 was not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", - dest2.getAddressName(),dest2.getAddressName(), null)); + dest2.getAddress().getName(),dest2.getAddress().getName(), null)); MessageProducer producer = jmsSession.createProducer(dest3); producer.send(jmsSession.createTextMessage("Hello")); TextMessage msg = (TextMessage)cons3.receive(1000); assertEquals("Destination3 was not created as expected.",msg.getText(),"Hello"); } - + /** * Test goal: Verifies the subject can be overridden using "qpid.subject" message property. * Test strategy: Creates and address with a default subject "topic1" @@ -547,7 +592,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - AMQDestination topic1 = new AMQAnyDestination("ADDR:amq.topic/topic1; {link:{name: queue1}}"); + AddressBasedDestination topic1 = getDestination("ADDR:amq.topic/topic1; {link:{name: queue1}}"); MessageProducer prod = jmsSession.createProducer(topic1); @@ -555,7 +600,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase m.setStringProperty("qpid.subject", "topic2"); MessageConsumer consForTopic1 = jmsSession.createConsumer(topic1); - MessageConsumer consForTopic2 = jmsSession.createConsumer(new AMQAnyDestination("ADDR:amq.topic/topic2; {link:{name: queue2}}")); + MessageConsumer consForTopic2 = jmsSession.createConsumer(getDestination("ADDR:amq.topic/topic2; {link:{name: queue2}}")); prod.send(m); Message msg = consForTopic1.receive(1000); @@ -591,14 +636,15 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase queue = ssn.createQueue("ADDR:my-queue2"); try { - prod = ssn.createProducer(queue); - fail("The client should throw an exception, since there is no queue present in the broker"); + prod = ssn.createProducer(queue); + fail("The client should throw an exception, since there is no queue present in the broker"); } catch(Exception e) { - String s = "The name 'my-queue2' supplied in the address " + - "doesn't resolve to an exchange or a queue"; - assertEquals(s,e.getCause().getCause().getMessage()); + String s = "The Queue 'my-queue2' does not exist"; + assertEquals(s,e.getCause().getCause().getCause().getMessage()); + recreateConnection(); + ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); } // explicit create case @@ -614,10 +660,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase cons.close(); // Using the ADDR method to create a more complicated queue - String addr = "ADDR:amq.direct/x512; {create: receiver, " + - "link : {name : 'MY.RESP.QUEUE', " + - "x-declare : { auto-delete: true, exclusive: true, " + - "arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring} } } }"; + String addr = "ADDR:MY.RESP.QUEUE; {create: sender, " + + "node : {x-declare : { auto-delete: true, exclusive: true, " + + "arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring} } }," + + "link : {x-bindings:[{exchange: 'amq.direct', key:x512}]}" + + " }"; queue = ssn.createQueue(addr); prod = ssn.createProducer(queue); @@ -692,9 +739,9 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase prod = ssn.createProducer(topic); cons = ssn.createConsumer(topic); - assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",( + /*assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",( (AMQSession_0_10)ssn).isQueueBound("vehicles", - "my-topic","bus", null)); + "my-topic","bus", null));*/ assertTrue("The queue was not bound to vehicle exchange using car as the binding key",( (AMQSession_0_10)ssn).isQueueBound("vehicles", @@ -710,7 +757,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); } - + /** * Test Goal : Verify the default subjects used for each exchange type. * The default for amq.topic is "#" and for the rest it's "" @@ -719,16 +766,10 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - MessageConsumer queueCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.direct")); - MessageConsumer topicCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.topic")); + MessageConsumer topicCons = ssn.createConsumer(getDestination("ADDR:amq.topic")); - MessageProducer queueProducer = ssn.createProducer(new AMQAnyDestination("ADDR:amq.direct")); - MessageProducer topicProducer1 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/usa.weather")); - MessageProducer topicProducer2 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/sales")); - - queueProducer.send(ssn.createBytesMessage()); - assertNotNull("The consumer subscribed to amq.direct " + - "with empty binding key should have received the message ",queueCons.receive(1000)); + MessageProducer topicProducer1 = ssn.createProducer(getDestination("ADDR:amq.topic/usa.weather")); + MessageProducer topicProducer2 = ssn.createProducer(getDestination("ADDR:amq.topic/sales")); topicProducer1.send(ssn.createTextMessage("25c")); assertEquals("The consumer subscribed to amq.topic " + @@ -756,7 +797,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase Destination dest = ssn.createQueue(addr); MessageConsumer browseCons = ssn.createConsumer(dest); - MessageProducer prod = ssn.createProducer(ssn.createQueue("ADDR:amq.direct/test")); + MessageProducer prod = ssn.createProducer(ssn.createTopic("ADDR:amq.direct/test")); prod.send(ssn.createTextMessage("Test1")); prod.send(ssn.createTextMessage("Test2")); @@ -791,8 +832,17 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Destination dest = ssn.createTopic("ADDR:amq.topic/foo; {link:{durable:true}}"); + + System.out.println("------------ Creating consumer 1-----------------------"); + MessageConsumer consumer1 = ssn.createConsumer(dest); + + System.out.println("------------ / Creating consumer 1-----------------------"); + + System.out.println("------------ Creating consumer 2-----------------------"); MessageConsumer consumer2 = ssn.createConsumer(dest); + System.out.println("------------/ Creating consumer 2-----------------------"); + MessageProducer prod = ssn.createProducer(dest); prod.send(ssn.createTextMessage("A")); @@ -819,7 +869,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase _connection = getConnection() ; _connection.start(); ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - dest = ssn.createTopic("ADDR:my_queue; {create: always}"); + dest = ssn.createQueue("ADDR:my_queue; {create: always}"); consumer1 = ssn.createConsumer(dest); consumer2 = ssn.createConsumer(dest); prod = ssn.createProducer(dest); @@ -842,17 +892,17 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); String addr = "ADDR:MRKT; " + - "{" + - "create: receiver," + - "node : {type: topic, x-declare: {type: topic} }," + - "link:{" + - "name: my-topic," + - "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]" + - "}" + - "}"; + "{" + + "create: receiver," + + "node : {type: topic, x-declare: {type: topic} }," + + "link:{" + + "name: my-topic," + + "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]" + + "}" + + "}"; // Using the ADDR method to create a more complicated topic - MessageConsumer cons = ssn.createConsumer(new AMQAnyDestination(addr)); + MessageConsumer cons = ssn.createConsumer(getDestination(addr)); assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key",( (AMQSession_0_10)ssn).isQueueBound("MRKT", @@ -878,7 +928,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); String str = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; - Destination dest = ssn.createTopic(str); + Destination dest = ssn.createQueue(str); MessageConsumer consumer1 = ssn.createConsumer(dest); try { @@ -889,11 +939,12 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { } } - + + public void testQueueReceiversAndTopicSubscriber() throws Exception { - Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}"); - Topic topic = new AMQAnyDestination("ADDR:amq.topic/test"); + Queue queue = new AddressBasedQueue("my-queue; {create: always}"); + Topic topic = new AddressBasedTopic("amq.topic/test"); QueueSession qSession = ((AMQConnection)_connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE); QueueReceiver receiver = qSession.createReceiver(queue); @@ -917,7 +968,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertEquals("test2",((TextMessage)msg2).getText()); } - public void testDurableSubscriber() throws Exception + public void xtestDurableSubscriber() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); @@ -977,7 +1028,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase // default (create never, assert never) ------------------- // create never -------------------------------------------- String addr1 = "ADDR:testQueue1;{create: always, delete: always}"; - AMQDestination dest = new AMQAnyDestination(addr1); + AddressBasedDestination dest = getDestination(addr1); try { cons = jmsSession.createConsumer(dest); @@ -989,11 +1040,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName())); String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}"; - dest = new AMQAnyDestination(addr2); + dest = getDestination(addr2); try { cons = jmsSession.createConsumer(dest); @@ -1005,14 +1056,14 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName())); String addr3 = "ADDR:testQueue3;{create: always, delete: sender}"; - dest = new AMQAnyDestination(addr3); + dest = getDestination(addr3); try { - cons = jmsSession.createConsumer(dest); + //cons = jmsSession.createConsumer(dest); MessageProducer prod = jmsSession.createProducer(dest); prod.close(); } @@ -1022,7 +1073,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName())); } @@ -1061,7 +1112,9 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}"; try { - AMQAnyDestination dest = new AMQAnyDestination(addr3); + Destination dest = getDestination(addr3); + Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); + MessageConsumer cons = ssn.createConsumer(dest); fail("An exception should be thrown indicating it's an unsupported type"); } catch(Exception e) @@ -1072,14 +1125,14 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase String addr4 = "ADDR:amq.topic/test;{link : {reliability : at-least-once}}"; try { - AMQAnyDestination dest = new AMQAnyDestination(addr4); + Destination dest = getDestination(addr4); Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); MessageConsumer cons = ssn.createConsumer(dest); fail("An exception should be thrown indicating it's an unsupported combination"); } catch(Exception e) { - assertTrue(e.getCause().getMessage().contains("AT-LEAST-ONCE is not yet supported for Topics")); + assertTrue(e.getCause().getCause().getMessage().contains("AT-LEAST-ONCE is not yet supported for Topics")); } } @@ -1089,7 +1142,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase MessageConsumer cons; MessageProducer prod; - AMQDestination dest = new AMQAnyDestination(address); + AddressBasedDestination dest = getDestination(address); cons = ssn.createConsumer(dest); prod = ssn.createProducer(dest); @@ -1119,8 +1172,8 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase MessageConsumer cons = ssn.createConsumer(ssn.createTopic("ADDR:amq.topic/test")); MessageProducer prod = ssn.createProducer(null); - Queue queue = ssn.createQueue("ADDR:amq.topic/test"); - prod.send(queue,ssn.createTextMessage("A")); + Topic topic = ssn.createTopic("ADDR:amq.topic/test"); + prod.send(topic,ssn.createTextMessage("A")); Message msg = cons.receive(1000); assertNotNull(msg); @@ -1147,7 +1200,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase Destination replyToDest = AMQDestination.createDestination(replyTo); MessageConsumer replyToCons = session.createConsumer(replyToDest); - Destination dest = session.createQueue("ADDR:amq.direct/test"); + Destination dest = session.createTopic("ADDR:amq.direct/test"); MessageConsumer cons = session.createConsumer(dest); MessageProducer prod = session.createProducer(dest); diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java index a7efe4922b..8271481a1d 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java @@ -20,15 +20,10 @@ */ package org.apache.qpid.test.client.message; -import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.client.AMQAnyDestination; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.CustomJMSXProperty; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.management.common.mbeans.ManagedQueue; -import org.apache.qpid.test.utils.JMXTestUtils; -import org.apache.qpid.test.utils.QpidBrokerTestCase; +import java.nio.BufferOverflowException; +import java.util.Iterator; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.Destination; @@ -41,10 +36,16 @@ import javax.jms.Session; import javax.jms.Topic; import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.TabularData; -import java.nio.BufferOverflowException; -import java.util.Iterator; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.CustomJMSXProperty; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; /** * From the API Docs getJMSDestination: @@ -338,13 +339,13 @@ public class JMSDestinationTest extends QpidBrokerTestCase public void testGetDestinationWithCustomExchange() throws Exception { - AMQDestination dest = new AMQAnyDestination(new AMQShortString("my-exchange"), + AMQDestination dest = new AMQQueue(new AMQShortString("my-exchange"), new AMQShortString("direct"), new AMQShortString("test"), - false, - false, new AMQShortString("test"), false, + false, + false, new AMQShortString[]{new AMQShortString("test")}); // to force the creation of my-exchange. diff --git a/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java b/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java index b4294ee4cc..651d401596 100644 --- a/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java +++ b/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java @@ -25,15 +25,14 @@ import java.util.ArrayList; import java.util.List; import javax.jms.Connection; -import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.TextMessage; -import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AddressBasedDestination; /** * A generic receiver which consumes messages @@ -82,7 +81,7 @@ public class Receiver extends Client implements MessageListener { super(con); setSsn(con.createSession(isTransacted(), getAck_mode())); - consumer = getSsn().createConsumer(new AMQAnyDestination(addr)); + consumer = getSsn().createConsumer(new AddressBasedDestination(addr)); if (!syncRcv) { consumer.setMessageListener(this); diff --git a/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java b/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java index 14b9b7302f..806571b47a 100644 --- a/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java +++ b/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java @@ -36,8 +36,8 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AddressBasedDestination; import org.apache.qpid.tools.MessageFactory; /** @@ -95,7 +95,7 @@ public class Sender extends Client this.iterations = Integer.getInteger("iterations", -1); this.sleep_time = Long.getLong("sleep_time", 1000); this.setSsn(con.createSession(isTransacted(),Session.AUTO_ACKNOWLEDGE)); - this.dest = new AMQAnyDestination(addr); + this.dest = new AddressBasedDestination(addr); this.producer = getSsn().createProducer(dest); this.replyTo = getSsn().createTemporaryQueue(); diff --git a/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java index 72ca48e1c9..2d93daa3b2 100644 --- a/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java +++ b/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java @@ -29,9 +29,7 @@ import java.text.NumberFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; -import java.util.HashMap; import java.util.List; -import java.util.Map; import javax.jms.Destination; import javax.jms.JMSException; @@ -44,11 +42,8 @@ import org.apache.log4j.ConsoleAppender; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.PatternLayout; -import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.client.AddressBasedDestination; import org.apache.qpid.thread.Threading; /** @@ -144,7 +139,7 @@ public class TestLauncher implements ErrorHandler controlCon = new AMQConnection(url); controlCon.start(); - controlDest = new AMQAnyDestination("control; {create: always}"); // durable + controlDest = new AddressBasedDestination("control; {create: always}"); // durable // Create the session to setup the messages controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java index 121e94cea1..a71e6ac55c 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java @@ -31,11 +31,9 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; -import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession_0_10; -import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.client.AddressBasedDestination; import org.apache.qpid.messaging.Address; public class PerfBase @@ -142,7 +140,7 @@ public class PerfBase controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); dest = createDestination(); - controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR); + controllerQueue = new AddressBasedDestination(CONTROLLER_ADDR); myControlQueue = session.createQueue(myControlQueueAddr); msgType = MessageType.getType(params.getMessageType()); System.out.println("Using " + msgType + " messages"); @@ -157,10 +155,10 @@ public class PerfBase { System.out.println("Prefix : " + prefix); Address addr = Address.parse(params.getAddress()); - AMQAnyDestination temp = new AMQAnyDestination(params.getAddress()); - int type = ((AMQSession_0_10)session).resolveAddressType(temp); + AddressBasedDestination temp = new AddressBasedDestination(addr); + temp.resolveAddress((AMQSession_0_10)session); - if ( type == AMQDestination.TOPIC_TYPE) + if (temp.isTopic()) { addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions()); System.out.println("Setting subject : " + addr); @@ -171,11 +169,11 @@ public class PerfBase System.out.println("Setting name : " + addr); } - return new AMQAnyDestination(addr); + return new AddressBasedDestination(addr); } else { - return new AMQAnyDestination(params.getAddress()); + return new AddressBasedDestination(params.getAddress()); } } |