diff options
author | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-06 10:32:50 +0000 |
---|---|---|
committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-06 10:32:50 +0000 |
commit | d5865c18ddfa24f32ad47a6628f16e1cb5028f8f (patch) | |
tree | 556d8c2f728c4993a79c69730454ff4863d025c8 | |
parent | 8e159b2051510728f64f31c6b06e322cb2d7974d (diff) | |
download | qpid-python-d5865c18ddfa24f32ad47a6628f16e1cb5028f8f.tar.gz |
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@563097 13f79535-47bb-0310-9956-ffa450edef68
16 files changed, 1177 insertions, 162 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/Session.java index f94d26b854..46cc2f340c 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/Session.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/Session.java @@ -77,23 +77,24 @@ public interface Session //------------------------------------------------------ /** * Transfer the given message to a specified exchange. - * <p> Following are the valid options for messageTransfer - * <ul> - * <li> CONFIRM - * <li> PRE_ACCQUIRE - * </ul> - * <p> In the absence of a particular option, the defaul value is: - * <ul> - * <li> CONFIRM = false - * <li> NO-ACCQUIRE - * </ul> * - * @param exchange The exchange the message is being sent. - * @param msg The Message to be sent - * @param options A list of valid options + * @param confirmMode <ul> </li>off (0): confirmation is not required, once a message has been transferred in pre-acquire + * mode (or once acquire has been sent in no-acquire mode) the message is considered + * transferred + * <p/> + * <li> on (1): an acquired message (whether acquisition was implicit as in pre-acquire mode or + * explicit as in no-acquire mode) is not considered transferred until the original + * transfer is complete (signaled via execution.complete) + * </ul> + * @param acquireMode <ul> <li> no-acquire (0): the message must be explicitly acquired + * <p/> + * <li> pre-acquire (1): the message is acquired when the transfer starts + * </ul> + * @param exchange The exchange the message is being sent. + * @param msg The Message to be sent * @throws QpidException If the session fails to send the message due to some error */ - public void messageTransfer(String exchange, Message msg, Option... options) throws QpidException; + public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) throws QpidException; /** * Declare the beginning of a message transfer operation. This operation must @@ -103,31 +104,31 @@ public interface Session * <p> In the interval [messageTransfer endData] any attempt to call a method other than * {@link Session#addMessageHeaders}, {@link Session#endData} ore {@link Session#close} * will result in an exception being thrown. - * <p> Following are the valid options for messageTransfer - * <ul> - * <li> CONFIRM - * <li> PRE_ACCQUIRE - * </ul> - * <p> In the absence of a particular option, the defaul value is: - * <ul> - * <li> CONFIRM = false - * <li> NO-ACCQUIRE - * </ul> * - * @param exchange The exchange the message is being sent. - * @param options Set of options. + * @param confirmMode <ul> </li>off (0): confirmation is not required, once a message has been transferred in pre-acquire + * mode (or once acquire has been sent in no-acquire mode) the message is considered + * transferred + * <p/> + * <li> on (1): an acquired message (whether acquisition was implicit as in pre-acquire mode or + * explicit as in no-acquire mode) is not considered transferred until the original + * transfer is complete (signaled via execution.complete) + * </ul> + * @param acquireMode <ul> <li> no-acquire (0): the message must be explicitly acquired + * <p/> + * <li> pre-acquire (1): the message is acquired when the transfer starts + * </ul> + * @param exchange The exchange the message is being sent. * @throws QpidException If the session fails to send the message due to some error. */ - public void messageTransfer(String exchange, Option... options) throws QpidException; + public void messageTransfer(String exchange, short confirmMode, short acquireMode) throws QpidException; /** * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} - * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being sent. + * or to the message being sent. * * @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code> * @throws QpidException If the session fails to execute the method due to some error * @see org.apache.qpidity.DeliveryProperties - * @see org.apache.qpidity.ApplicationProperties */ public void addMessageHeaders(Header... headers) throws QpidException; @@ -371,7 +372,12 @@ public interface Session * <p>In the absence of a particular option, the defaul value is false for each option * * @param queueName The name of the delcared queue. - * @param alternateExchange Alternate excahnge. + * @param alternateExchange If a message is rejected by a queue, then it is sent to the alternate-exchange. A message + * may be rejected by a queue for the following reasons: + * <oL> <li> The queue is deleted when it is not empty; + * <<li> Immediate delivery of a message is requested, but there are no consumers connected to + * the queue. </ol> + * @param arguments Used for backward compatibility * @param options Set of Options. * @throws QpidException If the session fails to declare the queue due to some error. * @see Option @@ -385,6 +391,7 @@ public interface Session * @param queueName The queue to be bound. * @param exchangeName The exchange name. * @param routingKey The routing key. + * @param arguments Used for backward compatibility * @throws QpidException If the session fails to bind the queue due to some error. */ public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) @@ -396,6 +403,7 @@ public interface Session * @param queueName The queue to be unbound. * @param exchangeName The exchange name. * @param routingKey The routing key. + * @param arguments Used for backward compatibility * @throws QpidException If the session fails to unbind the queue due to some error. */ public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) @@ -448,9 +456,12 @@ public interface Session * <p/> * <p>In the absence of a particular option, the defaul value is false for each option</p> * * - * @param exchangeName The exchange name. - * @param exchangeClass The fully qualified name of the exchange class. - * @param options Set of options. + * @param exchangeName The exchange name. + * @param exchangeClass The fully qualified name of the exchange class. + * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which + * the message will be sent. + * @param options Set of options. + * @param arguments Used for backward compatibility * @throws QpidException If the session fails to declare the exchange due to some error. * @see Option */ diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java index bbcc17aca5..e3883f462e 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java @@ -13,162 +13,192 @@ import org.apache.qpidity.*; public class ClientSession implements org.apache.qpid.nclient.Session { - Map<String,MessagePartListener> messagListeners = new HashMap<String,MessagePartListener>(); - + Map<String,MessagePartListener> messagListeners = new HashMap<String,MessagePartListener>(); + + //------------------------------------------------------ // Session housekeeping methods //------------------------------------------------------ public void close() throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void suspend() throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void resume() throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + }//------------------------------------------------------ // Messaging methods // Producer //------------------------------------------------------ - public void messageTransfer(String exchange, Message msg, Option... options) throws QpidException + public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public void messageTransfer(String exchange, Option... options) throws QpidException + public void messageTransfer(String exchange, short confirmMode, short acquireMode) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void addMessageHeaders(Header... headers) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void addData(byte[] data, int off, int len) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void endData() throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public void messageSubscribe(String queue, String destination, MessagePartListener listener, Map<String, ?> filter, - Option... options) throws QpidException + public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, + MessagePartListener listener, Map<String, ?> filter, Option... options) + throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void messageCancel(String destination) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public void messageAcknowledge(Range... range) throws QpidException + public void setMessageListener(String destination, MessagePartListener listener) { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public void messageReject(Range... range) throws QpidException + public void messageFlowMode(String destination, short mode) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public Range[] messageAcquire(Range... range) throws QpidException + public void messageFlow(String destination, short unit, long value) throws QpidException { - return new Range[0]; //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public void messageRelease(Range... range) throws QpidException + public boolean messageFlush(String destination) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + return false; } + public void messageStop(String destination) throws QpidException + { + // TODO - public void messageFlowMode(String destination, short mode) + } + + public void messageAcknowledge(Range<Long>... range) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public void messageFlow(String destination, short unit, long value) + public void messageReject(Range<Long>... range) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public boolean messageFlush(String destination) + public Range<Long>[] messageAcquire(Range<Long>... range) throws QpidException { - return false; //To change body of implemented methods use File | Settings | File Templates. + // TODO + return null; } - public void messageStop(String destination) + public void messageRelease(Range<Long>... range) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + }// ----------------------------------------------- // Local transaction methods // ---------------------------------------------- public void txSelect() throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void txCommit() throws QpidException, IllegalStateException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void txRollback() throws QpidException, IllegalStateException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, - Option... options) throws QpidException + public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options) + throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) throws - QpidException + public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) + throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) throws - QpidException + public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) + throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void queuePurge(String queueName) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void queueDelete(String queueName, Option... options) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange, Map<String, ?> arguments, Option... options) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void exchangeDelete(String exchangeName, Option... options) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void setMessageListener(String destination,MessagePartListener listener) - { - messagListeners.put(destination, listener); + // TODO + } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java index be3c9de194..f880d71ea3 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java @@ -17,8 +17,11 @@ */ package org.apache.qpid.nclient.jms; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Option; +import org.apache.qpidity.url.BindingURL; + import javax.jms.Destination; -import javax.jms.JMSException; /** * Implementation of the JMS Destination interface @@ -35,24 +38,60 @@ public class DestinationImpl implements Destination */ protected SessionImpl _session; + /** + * The excahnge name + */ + protected String _exchangeName; + + /** + * The excahnge class + */ + protected String _exchangeClass; + + /** + * The queu name + */ + protected String _queueName; + //--- Constructor /** * Create a new DestinationImpl with a given name. * - * @param name The name of this destination. + * @param name The name of this destination. * @param session The session used to create this destination. - * @throws JMSException If the destiantion name is not valid + * @throws QpidException If the destiantion name is not valid */ - protected DestinationImpl(SessionImpl session, String name) throws JMSException + protected DestinationImpl(SessionImpl session, String name) throws QpidException { - // TODO validate that this destination name exists - //_session.getQpidSession() _session = session; _name = name; } + /** + * Create a destiantion from a binding URL + * + * @param session The session used to create this queue. + * @param binding The URL + * @throws QpidException If the URL is not valid + */ + protected DestinationImpl(SessionImpl session, BindingURL binding) throws QpidException + { + _session = session; + _exchangeName = binding.getExchangeName(); + _exchangeClass = binding.getExchangeClass(); + _name = binding.getDestinationName(); + // _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE)); + boolean isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE)); + boolean isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE)); + _queueName = binding.getQueueName(); + // create this exchange + _session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeClass, null, null, + isDurable ? Option.DURABLE : Option.NO_OPTION, + isAutoDelete ? Option.AUTO_DELETE : Option.NO_OPTION); + } + //---- Getters and Setters - + /** * Gets the name of this destination. * @@ -84,5 +123,20 @@ public class DestinationImpl implements Destination return _name; } + // getter methods + public String getQpidQueueName() + { + return _queueName; + } + + public String getExchangeName() + { + return _exchangeName; + } + + public String getExchangeClass() + { + return _exchangeClass; + } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java index 5641869e3e..973fb23332 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java @@ -17,8 +17,6 @@ */ package org.apache.qpid.nclient.jms; -//import org.apache.qpid.nclient.api.MessageReceiver; - import org.apache.qpid.nclient.jms.message.QpidMessage; import org.apache.qpid.nclient.jms.filter.JMSSelectorFilter; import org.apache.qpid.nclient.jms.filter.MessageFilter; @@ -27,6 +25,7 @@ import org.apache.qpid.nclient.MessagePartListener; import org.apache.qpidity.Range; import org.apache.qpidity.QpidException; import org.apache.qpidity.Option; +import org.apache.qpidity.exchange.ExchangeDefaults; import javax.jms.*; @@ -120,15 +119,16 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer _noLocal = noLocal; _subscriptionName = subscriptionName; _isStopped = getSession().isStopped(); + // let's create a message part assembler + /** + * A Qpid message listener that pushes messages to this consumer session when this consumer is + * asynchronous or directly to this consumer when it is synchronously accessed. + */ + MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidMessageListener(this)); + if (destination instanceof Queue) { // this is a queue we expect that this queue exists - // let's create a message part assembler - /** - * A Qpid message listener that pushes messages to this consumer session when this consumer is - * asynchronous or directly to this consumer when it is synchronously accessed. - */ - MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidMessageListener(this)); getSession().getQpidSession() .messageSubscribe(destination.getName(), getMessageActorID(), org.apache.qpid.nclient.Session.CONFIRM_MODE_NOT_REQUIRED, @@ -144,25 +144,44 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { // this is a topic we need to create a temporary queue for this consumer // unless this is a durable subscriber + String queueName; if (subscriptionName != null) { // this ia a durable subscriber // create a persistent queue for this subscriber - // getSession().getQpidSession().queueDeclare(destination.getName()); + queueName = "topic-" + subscriptionName; + getSession().getQpidSession() + .queueDeclare(queueName, null, null, Option.EXCLUSIVE, Option.DURABLE); } else { // this is a non durable subscriber // create a temporary queue - + queueName = "topic-" + getMessageActorID(); + getSession().getQpidSession() + .queueDeclare(queueName, null, null, Option.AUTO_DELETE, Option.EXCLUSIVE); } + // bind this queue with the topic exchange + getSession().getQpidSession() + .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getName(), null); + // subscribe to this topic + getSession().getQpidSession() + .messageSubscribe(queueName, getMessageActorID(), + org.apache.qpid.nclient.Session.CONFIRM_MODE_NOT_REQUIRED, + // We always acquire the messages + org.apache.qpid.nclient.Session.ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null, + _noLocal ? Option.NO_LOCAL : Option.NO_OPTION, + // Request exclusive subscription access, meaning only this subscription + // can access the queue. + Option.EXCLUSIVE); + } // set the flow mode getSession().getQpidSession() .messageFlowMode(getMessageActorID(), org.apache.qpid.nclient.Session.MESSAGE_FLOW_MODE_CREDIT); } - //----- Message consumer API + //----- Message consumer API /** * Gets this MessageConsumer's message selector. * @@ -426,7 +445,14 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { messageOk = _filter.matches(message.getJMSMessage()); } - // right now we need to acquire this message if needed + if (!messageOk && _preAcquire) + { + // this is the case for topics + // We need to ack this message + acknowledgeMessage(message); + } + // now we need to acquire this message if needed + // this is the case of queue with a message selector set if (!_preAcquire && messageOk) { messageOk = acquireMessage(message); @@ -569,4 +595,19 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer } return result; } + + /** + * Acknowledge a message + * + * @param message The message to be acknowledged + * @throws QpidException If the message cannot be acquired due to some internal error. + */ + private void acknowledgeMessage(QpidMessage message) throws QpidException + { + if (!_preAcquire) + { + Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + getSession().getQpidSession().messageAcknowledge(range); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java index 454762b3e3..9eeadcac68 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java @@ -17,96 +17,300 @@ */ package org.apache.qpid.nclient.jms; -import javax.jms.MessageProducer; -import javax.jms.JMSException; -import javax.jms.Destination; -import javax.jms.Message; +import javax.jms.*; /** - * Implements MessageProducer + * Implements MessageProducer */ public class MessageProducerImpl extends MessageActor implements MessageProducer { + /** + * If true, messages will not get a timestamp. + */ + private boolean _disableTimestamps = false; + /** + * Priority of messages created by this producer. + */ + private int _messagePriority = Message.DEFAULT_PRIORITY; + + /** + * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution. + */ + private long _timeToLive; + + /** + * Delivery mode used for this producer. + */ + private int _deliveryMode = DeliveryMode.PERSISTENT; + + /** + * Speicify whether the messageID is disable + */ + private boolean _disableMessageId = false; + + //-- constructors public MessageProducerImpl(SessionImpl session, DestinationImpl destination) { super(session, destination); } - // Interface javax.jms.MessageProducer - - public void setDisableMessageID(boolean b) throws JMSException + //--- Interface javax.jms.MessageProducer + /** + * Sets whether message IDs are disabled. + * + * @param value Specify whether the MessageID must be disabled + * @throws JMSException If disabling messageID fails due to some internal error. + */ + public void setDisableMessageID(boolean value) throws JMSException { - //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + _disableMessageId = value; } + /** + * Gets an indication of whether message IDs are disabled. + * + * @return true is messageID is disabled, false otherwise + * @throws JMSException If getting whether messagID is disabled fails due to some internal error. + */ public boolean getDisableMessageID() throws JMSException { - return false; //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + return _disableMessageId; } - public void setDisableMessageTimestamp(boolean b) throws JMSException + /** + * Sets whether message timestamps are disabled. + * <P> JMS spec says: + * <p> Since timestamps take some effort to create and increase a + * message's size, some JMS providers may be able to optimize message + * overhead if they are given a hint that the timestamp is not used by an + * application.... + * these messages must have the timestamp set to zero; if the provider + * ignores the hint, the timestamp must be set to its normal value. + * <p>Message timestamps are enabled by default. + * + * @param value Indicates if message timestamps are disabled + * @throws JMSException if disabling the timestamps fails due to some internal error. + */ + public void setDisableMessageTimestamp(boolean value) throws JMSException { - //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + _disableTimestamps = value; } + /** + * Gets an indication of whether message timestamps are disabled. + * + * @return an indication of whether message timestamps are disabled + * @throws JMSException if getting whether timestamps are disabled fails due to some internal error. + */ public boolean getDisableMessageTimestamp() throws JMSException { - return false; //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + return _disableTimestamps; } - public void setDeliveryMode(int i) throws JMSException + /** + * Sets the producer's default delivery mode. + * <p> JMS specification says: + * <p>Delivery mode is set to {@link DeliveryMode#PERSISTENT} by default. + * + * @param deliveryMode The message delivery mode for this message producer; legal + * values are {@link DeliveryMode#NON_PERSISTENT} + * and {@link DeliveryMode#PERSISTENT}. + * @throws JMSException if setting the delivery mode fails due to some internal error. + */ + public void setDeliveryMode(int deliveryMode) throws JMSException { - //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + if ((deliveryMode != DeliveryMode.NON_PERSISTENT) && (deliveryMode != DeliveryMode.PERSISTENT)) + { + throw new JMSException( + "DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + deliveryMode + " is illegal"); + } + _deliveryMode = deliveryMode; } + /** + * Gets the producer's delivery mode. + * + * @return The message delivery mode for this message producer + * @throws JMSException If getting the delivery mode fails due to some internal error. + */ public int getDeliveryMode() throws JMSException { - return 0; //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + return _deliveryMode; } - public void setPriority(int i) throws JMSException + /** + * Sets the producer's message priority. + * <p> The jms spec says: + * <p> The JMS API defines ten levels of priority value, with 0 as the + * lowest priority and 9 as the highest. Clients should consider priorities + * 0-4 as gradations of normal priority and priorities 5-9 as gradations + * of expedited priority. + * <p> Priority is set to 4 by default. + * + * @param priority The message priority for this message producer; must be a value between 0 and 9 + * @throws JMSException if setting this producer priority fails due to some internal error. + */ + public void setPriority(int priority) throws JMSException { - //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + if ((priority < 0) || (priority > 9)) + { + throw new IllegalArgumentException( + "Priority of " + priority + " is illegal. Value must be in range 0 to 9"); + } + _messagePriority = priority; } + /** + * Gets the producer's message priority. + * + * @return The message priority for this message producer. + * @throws JMSException If getting this producer message priority fails due to some internal error. + */ public int getPriority() throws JMSException { - return 0; //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + return _messagePriority; } - public void setTimeToLive(long l) throws JMSException + /** + * Sets the default length of time in milliseconds from its dispatch time + * that a produced message should be retained by the message system. + * <p> The JMS spec says that time to live must be set to zero by default. + * + * @param timeToLive The message time to live in milliseconds; zero is unlimited + * @throws JMSException If setting the default time to live fails due to some internal error. + */ + public void setTimeToLive(long timeToLive) throws JMSException { - //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + if (timeToLive < 0) + { + throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive); + } + _timeToLive = timeToLive; } + /** + * Gets the default length of time in milliseconds from its dispatch time + * that a produced message should be retained by the message system. + * + * @return The default message time to live in milliseconds; zero is unlimited + * @throws JMSException if getting the default time to live fails due to some internal error. + * @see javax.jms.MessageProducer#setTimeToLive + */ public long getTimeToLive() throws JMSException { - return 0; //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + return _timeToLive; } + /** + * Gets the destination associated with this producer. + * + * @return This producer's destination. + * @throws JMSException If getting the destination for this producer fails + * due to some internal error. + */ public Destination getDestination() throws JMSException { - return null; //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + return _destination; } + /** + * Sends a message using the producer's default delivery mode, priority, destination + * and time to live. + * + * @param message the message to be sent + * @throws JMSException If sending the message fails due to some internal error. + * @throws MessageFormatException If an invalid message is specified. + * @throws InvalidDestinationException If this producer destination is invalid. + * @throws java.lang.UnsupportedOperationException + * If a client uses this method with a producer that did + * not specify a destination at creation time. + */ public void send(Message message) throws JMSException { - //To change body of implemented methods use File | Settings | File Templates. + send(message, _deliveryMode, _messagePriority, _timeToLive); } - public void send(Message message, int i, int i1, long l) throws JMSException + /** + * Sends a message to this producer default destination, specifying delivery mode, + * priority, and time to live. + * + * @param message The message to send + * @param deliveryMode The delivery mode to use + * @param priority The priority for this message + * @param timeToLive The message's lifetime (in milliseconds) + * @throws JMSException If sending the message fails due to some internal error. + * @throws MessageFormatException If an invalid message is specified. + * @throws InvalidDestinationException If this producer's destination is invalid. + * @throws java.lang.UnsupportedOperationException + * If a client uses this method with a producer that did + * not specify a destination at creation time. + */ + public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { - //To change body of implemented methods use File | Settings | File Templates. + send(_destination, message, deliveryMode, priority, timeToLive); } + /** + * Sends a message to a specified destination using this producer's default + * delivery mode, priority and time to live. + * <p/> + * <P>Typically, a message producer is assigned a destination at creation + * time; however, the JMS API also supports unidentified message producers, + * which require that the destination be supplied every time a message is + * sent. + * + * @param destination The destination to send this message to + * @param message The message to send + * @throws JMSException If sending the message fails due to some internal error. + * @throws MessageFormatException If an invalid message is specified. + * @throws InvalidDestinationException If an invalid destination is specified. + */ public void send(Destination destination, Message message) throws JMSException { - //To change body of implemented methods use File | Settings | File Templates. + send(destination, message, _deliveryMode, _messagePriority, _timeToLive); } - public void send(Destination destination, Message message, int i, int i1, long l) throws JMSException + /** + * Sends a message to a destination specifying delivery mode, priority and time to live. + * + * @param destination The destination to send this message to. + * @param message The message to be sent. + * @param deliveryMode The delivery mode to use. + * @param priority The priority for this message. + * @param timeToLive The message's lifetime (in milliseconds) + * @throws JMSException If sending the message fails due to some internal error. + * @throws MessageFormatException If an invalid message is specified. + * @throws InvalidDestinationException If an invalid destination is specified. + */ + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) + throws JMSException { - //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + getSession().checkDestination(destination); + // Do not allow negative timeToLive values + if (timeToLive < 0) + { + throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive); + } + // check that the message is not a foreign one + + // set the properties + + // + + // dispatch it + // todo getSession().getQpidSession().messageTransfer(((DestinationImpl) destination).getExchangeName(), message, Option); } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java index 9120173fd9..6dcdde1728 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java @@ -17,6 +17,11 @@ */ package org.apache.qpid.nclient.jms; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Option; +import org.apache.qpidity.url.BindingURL; +import org.apache.qpidity.exchange.ExchangeDefaults; + import javax.jms.Queue; import javax.jms.JMSException; @@ -32,15 +37,32 @@ public class QueueImpl extends DestinationImpl implements Queue * * @param name The name of this queue. * @param session The session used to create this queue. - * @throws JMSException If the queue name is not valid + * @throws QpidException If the queue name is not valid */ - protected QueueImpl(SessionImpl session, String name) throws JMSException + protected QueueImpl(SessionImpl session, String name) throws QpidException { super(session, name); + _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + _exchangeClass = ExchangeDefaults.FANOUT_EXCHANGE_CLASS; + _queueName = name; + // check that this queue exist on the server + // As pasive is set the server will not create the queue. + session.getQpidSession().queueDeclare(name, null, null, Option.PASSIVE); } - //---- Interface javax.jms.Queue + /** + * Create a destiantion from a binding URL + * + * @param session The session used to create this queue. + * @param binding The URL + * @throws QpidException If the URL is not valid + */ + protected QueueImpl(SessionImpl session, BindingURL binding) throws QpidException + { + super(session, binding); + } + //---- Interface javax.jms.Queue /** * Gets the name of this queue. * diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java index 22136814a1..dc9ad144a7 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java @@ -569,7 +569,7 @@ public class SessionImpl implements Session { checkNotClosed(); checkDestination(destination); - MessageConsumerImpl consumer = null; + MessageConsumerImpl consumer; try { consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null); @@ -602,7 +602,16 @@ public class SessionImpl implements Session public Queue createQueue(String queueName) throws JMSException { checkNotClosed(); - return new QueueImpl(this, queueName); + Queue result; + try + { + result = new QueueImpl(this, queueName); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + return result; } /** @@ -624,7 +633,16 @@ public class SessionImpl implements Session public Topic createTopic(String topicName) throws JMSException { checkNotClosed(); - return new TopicImpl(this, topicName); + Topic result; + try + { + result = new TopicImpl(this, topicName); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + return result; } /** @@ -713,25 +731,43 @@ public class SessionImpl implements Session } /** - * Create a TemporaryQueue. Its lifetime will be tha of the Connection unless it is deleted earlier. + * Create a TemporaryQueue. Its lifetime will be the Connection unless it is deleted earlier. * * @return A temporary queue. * @throws JMSException If creating the temporary queue fails due to some internal error. */ public TemporaryQueue createTemporaryQueue() throws JMSException { - return new TemporaryQueueImpl(this); + TemporaryQueue result; + try + { + result = new TemporaryQueueImpl(this); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + return result; } /** - * Create a TemporaryTopic. Its lifetime will be tha of the Connection unless it is deleted earlier. + * Create a TemporaryTopic. Its lifetime will be the Connection unless it is deleted earlier. * * @return A temporary topic. * @throws JMSException If creating the temporary topic fails due to some internal error. */ public TemporaryTopic createTemporaryTopic() throws JMSException { - return new TemporaryTopicImpl(this); + TemporaryTopic result; + try + { + result = new TemporaryTopicImpl(this); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + return result; } /** diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java index b33ab0d990..50130cee55 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java @@ -17,13 +17,18 @@ */ package org.apache.qpid.nclient.jms; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Option; +import org.apache.qpidity.exchange.ExchangeDefaults; + import javax.jms.TemporaryQueue; import javax.jms.JMSException; +import java.util.UUID; /** * Implements TemporaryQueue */ -public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue, TemporaryDestination +public class TemporaryQueueImpl extends DestinationImpl implements TemporaryQueue, TemporaryDestination { /** * Indicates whether this temporary queue is deleted. @@ -32,16 +37,23 @@ public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue, Tem //--- constructor - /** + /** * Create a new TemporaryQueueImpl with a given name. * * @param session The session used to create this TemporaryQueueImpl. - * @throws JMSException If creating the TemporaryQueueImpl fails due to some error. + * @throws QpidException If creating the TemporaryQueueImpl fails due to some error. */ - public TemporaryQueueImpl(SessionImpl session) throws JMSException + protected TemporaryQueueImpl(SessionImpl session) throws QpidException { - // temporary destinations do not have names and are not registered in the JNDI namespace. + // temporary destinations do not have names super(session, "NAME_NOT_SET"); + _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + _exchangeClass = ExchangeDefaults.FANOUT_EXCHANGE_CLASS; + _queueName = "TempQueue-" + UUID.randomUUID(); + // check that this queue exist on the server + // As pasive is set the server will not create the queue. + session.getQpidSession().queueDeclare(_queueName, null, null, Option.AUTO_DELETE); + session.getQpidSession().queueBind(_queueName, _exchangeName, _queueName, null); } //-- TemporaryDestination Interface @@ -59,11 +71,22 @@ public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue, Tem /** * Delete this temporary destinaiton * - * @throws JMSException If deleting this temporary queue fails due to some error. + * @throws JMSException If deleting this temporary queue fails due to some error. */ public void delete() throws JMSException { // todo delete this temporary queue _isDeleted = true; } + + //---- Interface javax.jms.Queue + /** + * Gets the name of this queue. + * + * @return This queue's name. + */ + public String getQueueName() throws JMSException + { + return super.getName(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java index 11deba8361..1dfb082557 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java @@ -17,6 +17,8 @@ */ package org.apache.qpid.nclient.jms; +import org.apache.qpidity.QpidException; + import javax.jms.TemporaryTopic; import javax.jms.JMSException; @@ -36,9 +38,9 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic, Tem * Create a new TemporaryTopicImpl with a given name. * * @param session The session used to create this TemporaryTopicImpl. - * @throws JMSException If creating the TemporaryTopicImpl fails due to some error. + * @throws QpidException If creating the TemporaryTopicImpl fails due to some error. */ - public TemporaryTopicImpl(SessionImpl session) throws JMSException + protected TemporaryTopicImpl(SessionImpl session) throws QpidException { // temporary destinations do not have names. super(session, "NAME_NOT_SET"); diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java index 25c2afa4e7..52875ab0d5 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java @@ -17,8 +17,11 @@ */ package org.apache.qpid.nclient.jms; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.exchange.ExchangeDefaults; +import org.apache.qpidity.url.BindingURL; + import javax.jms.Topic; -import javax.jms.JMSException; /** * Implementation of the javax.jms.Topic interface. @@ -29,16 +32,30 @@ public class TopicImpl extends DestinationImpl implements Topic /** * Create a new TopicImpl with a given name. * - * @param name The name of this topic + * @param name The name of this topic * @param session The session used to create this queue. - * @throws JMSException If the topic name is not valid + * @throws QpidException If the topic name is not valid */ - public TopicImpl(SessionImpl session, String name) throws JMSException + public TopicImpl(SessionImpl session, String name) throws QpidException { super(session, name); + _exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; + _exchangeClass = ExchangeDefaults.TOPIC_EXCHANGE_CLASS; + } + + /** + * Create a TopicImpl from a binding URL + * + * @param session The session used to create this Topic. + * @param binding The URL + * @throws QpidException If the URL is not valid + */ + protected TopicImpl(SessionImpl session, BindingURL binding) throws QpidException + { + super(session, binding); } - //--- javax.jsm.Topic Interface + //--- javax.jsm.Topic Interface /** * Gets the name of this topic. * diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java index 5ea830b2cc..03caf16520 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java @@ -33,17 +33,14 @@ import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLSyntaxException; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageNotReadableException; -import javax.jms.MessageNotWriteableException; +import javax.jms.*; import java.util.Collections; import java.util.Enumeration; import java.util.Map; import java.util.UUID; -public abstract class AbstractJMSMessage extends QpidMessage implements org.apache.qpid.jms.Message +public abstract class AbstractJMSMessage extends QpidMessage implements Message { private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap()); diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java index 02a43b0414..9b8222cf7d 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java @@ -145,6 +145,7 @@ public class QpidMessage //todo return new Long(1); } + } diff --git a/java/common/src/main/java/org/apache/qpidity/url/AMQBindingURL.java b/java/common/src/main/java/org/apache/qpidity/url/AMQBindingURL.java new file mode 100644 index 0000000000..0edf9ac21b --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/url/AMQBindingURL.java @@ -0,0 +1,261 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.url; + +import org.apache.qpidity.exchange.ExchangeDefaults; +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +import java.util.HashMap; +import java.net.URI; +import java.net.URISyntaxException; + +public class AMQBindingURL implements BindingURL +{ + private static final Logger _logger = LoggerFactory.getLogger(AMQBindingURL.class); + + String _url; + String _exchangeClass; + String _exchangeName; + String _destinationName; + String _queueName; + private HashMap<String, String> _options; + + public AMQBindingURL(String url) throws URLSyntaxException + { + // format: + // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* + if (_logger.isDebugEnabled()) + { + _logger.debug("Parsing URL: " + url); + } + _url = url; + _options = new HashMap<String, String>(); + parseBindingURL(); + } + + private void parseBindingURL() throws URLSyntaxException + { + try + { + URI connection = new URI(_url); + String exchangeClass = connection.getScheme(); + if (exchangeClass == null) + { + _url = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + "://" + "" + "//" + _url; + // URLHelper.parseError(-1, "Exchange Class not specified.", _url); + parseBindingURL(); + return; + } + else + { + setExchangeClass(exchangeClass); + } + String exchangeName = connection.getHost(); + if (exchangeName == null) + { + if (getExchangeClass().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) + { + setExchangeName(""); + } + else + { + throw URLHelper.parseError(-1, "Exchange Name not specified.", _url); + } + } + else + { + setExchangeName(exchangeName); + } + String queueName; + if ((connection.getPath() == null) || connection.getPath().equals("")) + { + throw URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(), + "Destination or Queue requried", _url); + } + else + { + int slash = connection.getPath().indexOf("/", 1); + if (slash == -1) + { + throw URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(), + "Destination requried", _url); + } + else + { + String path = connection.getPath(); + setDestinationName(path.substring(1, slash)); + + // We don't set queueName yet as the actual value we use depends on options set + // when we are dealing with durable subscriptions + + queueName = path.substring(slash + 1); + + } + } + + URLHelper.parseOptions(_options, connection.getQuery()); + processOptions(); + // We can now call setQueueName as the URL is full parsed. + setQueueName(queueName); + // Fragment is #string (not used) + if (_logger.isDebugEnabled()) + { + _logger.debug("URL Parsed: " + this); + } + } + catch (URISyntaxException uris) + { + throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); + } + } + + + private void processOptions() + { + // this is where we would parse any options that needed more than just storage. + } + + public String getURL() + { + return _url; + } + + public String getExchangeClass() + { + return _exchangeClass; + } + + private void setExchangeClass(String exchangeClass) + { + + _exchangeClass = exchangeClass; + if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) + { + setOption(BindingURL.OPTION_EXCLUSIVE, "true"); + } + + } + + public String getExchangeName() + { + return _exchangeName; + } + + private void setExchangeName(String name) + { + _exchangeName = name; + } + + public String getDestinationName() + { + return _destinationName; + } + + private void setDestinationName(String name) + { + _destinationName = name; + } + + public String getQueueName() + { + return _queueName; + } + + public void setQueueName(String name) throws URLSyntaxException + { + if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) + { + if (Boolean.parseBoolean(getOption(OPTION_DURABLE))) + { + if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION)) + { + _queueName = getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION); + } + else + { + throw URLHelper.parseError(-1, + "Durable subscription must have values for " + BindingURL.OPTION_CLIENTID + " and " + BindingURL.OPTION_SUBSCRIPTION + ".", + _url); + + } + } + else + { + _queueName = null; + } + } + else + { + _queueName = name; + } + + } + + public String getOption(String key) + { + return _options.get(key); + } + + public void setOption(String key, String value) + { + _options.put(key, value); + } + + public boolean containsOption(String key) + { + return _options.containsKey(key); + } + + public String getRoutingKey() + { + if (_exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) + { + return getQueueName(); + } + + if (containsOption(BindingURL.OPTION_ROUTING_KEY)) + { + return getOption(OPTION_ROUTING_KEY); + } + + return getDestinationName(); + } + + public void setRoutingKey(String key) + { + setOption(OPTION_ROUTING_KEY, key); + } + + public String toString() + { + StringBuffer sb = new StringBuffer(); + + sb.append(_exchangeClass); + sb.append("://"); + sb.append(_exchangeName); + sb.append('/'); + sb.append(_destinationName); + sb.append('/'); + sb.append(_queueName); + + sb.append(URLHelper.printOptions(_options)); + + return sb.toString(); + } +} diff --git a/java/common/src/main/java/org/apache/qpidity/url/BindingURL.java b/java/common/src/main/java/org/apache/qpidity/url/BindingURL.java new file mode 100644 index 0000000000..a795b05b22 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/url/BindingURL.java @@ -0,0 +1,53 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.url; + +import org.apache.qpid.framing.AMQShortString; + +/* + Binding URL format: + <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* +*/ +public interface BindingURL +{ + public static final String OPTION_EXCLUSIVE = "exclusive"; + public static final String OPTION_AUTODELETE = "autodelete"; + public static final String OPTION_DURABLE = "durable"; + public static final String OPTION_CLIENTID = "clientid"; + public static final String OPTION_SUBSCRIPTION = "subscription"; + public static final String OPTION_ROUTING_KEY = "routingkey"; + + + String getURL(); + + String getExchangeClass(); + + String getExchangeName(); + + String getDestinationName(); + + String getQueueName(); + + String getOption(String key); + + boolean containsOption(String key); + + String getRoutingKey(); + + String toString(); +} diff --git a/java/common/src/main/java/org/apache/qpidity/url/URLHelper.java b/java/common/src/main/java/org/apache/qpidity/url/URLHelper.java new file mode 100644 index 0000000000..eba1b0bbeb --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/url/URLHelper.java @@ -0,0 +1,169 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.url; + +import java.util.HashMap; + +public class URLHelper +{ + public static char DEFAULT_OPTION_SEPERATOR = '&'; + public static char ALTERNATIVE_OPTION_SEPARATOR = ','; + public static char BROKER_SEPARATOR = ';'; + + public static void parseOptions(HashMap<String, String> optionMap, String options) throws URLSyntaxException + { + // options looks like this + // brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value'' + + if ((options == null) || (options.indexOf('=') == -1)) + { + return; + } + + int optionIndex = options.indexOf('='); + + String option = options.substring(0, optionIndex); + + int length = options.length(); + + int nestedQuotes = 0; + + // to store index of final "'" + int valueIndex = optionIndex; + + // Walk remainder of url. + while ((nestedQuotes > 0) || (valueIndex < length)) + { + valueIndex++; + + if (valueIndex >= length) + { + break; + } + + if (options.charAt(valueIndex) == '\'') + { + if ((valueIndex + 1) < options.length()) + { + if ((options.charAt(valueIndex + 1) == DEFAULT_OPTION_SEPERATOR) + || (options.charAt(valueIndex + 1) == ALTERNATIVE_OPTION_SEPARATOR) + || (options.charAt(valueIndex + 1) == BROKER_SEPARATOR) + || (options.charAt(valueIndex + 1) == '\'')) + { + nestedQuotes--; + + if (nestedQuotes == 0) + { + // We've found the value of an option + break; + } + } + else + { + nestedQuotes++; + } + } + else + { + // We are at the end of the string + // Check to see if we are corectly closing quotes + if (options.charAt(valueIndex) == '\'') + { + nestedQuotes--; + } + + break; + } + } + } + + if ((nestedQuotes != 0) || (valueIndex < (optionIndex + 2))) + { + int sepIndex = 0; + + // Try and identify illegal separator character + if (nestedQuotes > 1) + { + for (int i = 0; i < nestedQuotes; i++) + { + sepIndex = options.indexOf('\'', sepIndex); + sepIndex++; + } + } + + if ((sepIndex >= options.length()) || (sepIndex == 0)) + { + throw parseError(valueIndex, "Unterminated option", options); + } + else + { + throw parseError(sepIndex, "Unterminated option. Possible illegal option separator:'" + + options.charAt(sepIndex) + "'", options); + } + } + + // optionIndex +2 to skip "='" + String value = options.substring(optionIndex + 2, valueIndex); + + optionMap.put(option, value); + + if (valueIndex < (options.length() - 1)) + { + // Recurse to get remaining options + parseOptions(optionMap, options.substring(valueIndex + 2)); + } + } + + public static URLSyntaxException parseError(int index, String error, String url) + { + return parseError(index, 1, error, url); + } + + public static URLSyntaxException parseError(int index, int length, String error, String url) + { + return new URLSyntaxException(url, error, index, length); + } + + public static String printOptions(HashMap<String, String> options) + { + if (options.isEmpty()) + { + return ""; + } + else + { + StringBuffer sb = new StringBuffer(); + sb.append('?'); + for (String key : options.keySet()) + { + sb.append(key); + + sb.append("='"); + + sb.append(options.get(key)); + + sb.append("'"); + sb.append(DEFAULT_OPTION_SEPERATOR); + } + + sb.deleteCharAt(sb.length() - 1); + + return sb.toString(); + } + } +} diff --git a/java/common/src/main/java/org/apache/qpidity/url/URLSyntaxException.java b/java/common/src/main/java/org/apache/qpidity/url/URLSyntaxException.java new file mode 100644 index 0000000000..bc65b70d14 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/url/URLSyntaxException.java @@ -0,0 +1,94 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.url; + +import java.net.URISyntaxException; + +public class URLSyntaxException extends URISyntaxException +{ + private int _length; + + public URLSyntaxException(String url, String error, int index, int length) + { + super(url, error, index); + + _length = length; + } + + private static String getPositionString(int index, int length) + { + StringBuffer sb = new StringBuffer(index + 1); + + for (int i = 0; i < index; i++) + { + sb.append(" "); + } + + if (length > -1) + { + for (int i = 0; i < length; i++) + { + sb.append('^'); + } + } + + return sb.toString(); + } + + + public String toString() + { + StringBuffer sb = new StringBuffer(); + + sb.append(getReason()); + + if (getIndex() > -1) + { + if (_length != -1) + { + sb.append(" between indicies "); + sb.append(getIndex()); + sb.append(" and "); + sb.append(_length); + } + else + { + sb.append(" at index "); + sb.append(getIndex()); + } + } + + sb.append(" "); + if (getIndex() != -1) + { + sb.append("\n"); + } + + sb.append(getInput()); + + if (getIndex() != -1) + { + sb.append("\n"); + sb.append(getPositionString(getIndex(), _length)); + } + + return sb.toString(); + } + + +} |