diff options
author | Robert Greig <rgreig@apache.org> | 2006-12-16 16:03:42 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-12-16 16:03:42 +0000 |
commit | fc20045970761b1057a7f8fe5ef9f40bfc9d0240 (patch) | |
tree | 99716c2c31850d4c4d16b4a737c760e1f410d3af | |
parent | 159db77661f43cbe78bec888019002c9632ad256 (diff) | |
download | qpid-python-fc20045970761b1057a7f8fe5ef9f40bfc9d0240.tar.gz |
QPID-205 : Do not allow subscription to temporary topics created on a different session.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487821 13f79535-47bb-0310-9956-ffa450edef68
6 files changed, 182 insertions, 74 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 80502cccf1..4dc6b5f914 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -885,6 +885,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi final String selector, final FieldTable rawSelector) throws JMSException { + checkTemporaryDestination(destination); + return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport() { public Object operation() throws JMSException @@ -929,6 +931,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi }.execute(_connection); } + private void checkTemporaryDestination(Destination destination) + throws JMSException + { + if((destination instanceof TemporaryDestination)) + { + _logger.debug("destination is temporary"); + final TemporaryDestination tempDest = (TemporaryDestination) destination; + if(tempDest.getSession() != this) + { + _logger.debug("destination is on different session"); + throw new JMSException("Cannot consume from a temporary destination created onanother session"); + } + if(tempDest.isDeleted()) + { + _logger.debug("destination is deleted"); + throw new JMSException("Cannot consume from a deleted destination"); + } + } + } + public boolean hasConsumer(Destination destination) { @@ -1497,12 +1519,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /* * I could have combined the last 3 methods, but this way it improves readability */ - private void checkValidTopic(Topic topic) throws InvalidDestinationException + private void checkValidTopic(Topic topic) throws JMSException { if (topic == null) { throw new javax.jms.InvalidDestinationException("Invalid Topic"); } + if((topic instanceof TemporaryDestination) && ((TemporaryDestination)topic).getSession() != this) + { + throw new JMSException("Cannot create a subscription on a temporary topic created in another session"); + } } private void checkValidQueue(Queue queue) throws InvalidDestinationException diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java index 05e3165886..81fee69f90 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java @@ -26,10 +26,12 @@ import javax.jms.TemporaryQueue; /** * AMQ implementation of a TemporaryQueue. */ -final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue +final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, TemporaryDestination { + private final AMQSession _session; + private boolean _deleted; /** * Create a new instance of an AMQTemporaryQueue @@ -49,10 +51,20 @@ final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue { throw new JMSException("Temporary Queue has consumers so cannot be deleted"); } + _deleted = true; // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted // by the server when there are no more subscriptions to that queue. This is probably not // quite right for JMSCompliance. } + public AMQSession getSession() + { + return _session; + } + + public boolean isDeleted() + { + return _deleted; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java index 122b13cf3b..241a9abc9b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java @@ -26,10 +26,11 @@ import javax.jms.TemporaryTopic; /** * AMQ implementation of TemporaryTopic. */ -class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic +class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic, TemporaryDestination { private final AMQSession _session; + private boolean _deleted; /** * Create new temporary topic. */ @@ -49,9 +50,20 @@ class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic throw new JMSException("Temporary Topic has consumers so cannot be deleted"); } + _deleted = true; // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted // by the server when there are no more subscriptions to that queue. This is probably not // quite right for JMSCompliance. } + public AMQSession getSession() + { + return _session; + } + + public boolean isDeleted() + { + return _deleted; + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 705501363c..e11d70cf41 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -142,7 +142,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void setDisableMessageID(boolean b) throws JMSException { - checkPreConditions(); + checkPreConditions(); checkNotClosed(); // IGNORED } @@ -156,7 +156,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void setDisableMessageTimestamp(boolean b) throws JMSException { - checkPreConditions(); + checkPreConditions(); _disableTimestamps = b; } @@ -168,11 +168,11 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void setDeliveryMode(int i) throws JMSException { - checkPreConditions(); + checkPreConditions(); if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT) { throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i + - " is illegal"); + " is illegal"); } _deliveryMode = i; } @@ -185,7 +185,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void setPriority(int i) throws JMSException { - checkPreConditions(); + checkPreConditions(); if (i < 0 || i > 9) { throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9"); @@ -201,7 +201,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void setTimeToLive(long l) throws JMSException { - checkPreConditions(); + checkPreConditions(); if (l < 0) { throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l); @@ -229,8 +229,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Message message) throws JMSException { - checkPreConditions(); - checkInitialDestination(); + checkPreConditions(); + checkInitialDestination(); synchronized (_connection.getFailoverMutex()) @@ -242,8 +242,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Message message, int deliveryMode) throws JMSException { - checkPreConditions(); - checkInitialDestination(); + checkPreConditions(); + checkInitialDestination(); synchronized (_connection.getFailoverMutex()) { @@ -254,8 +254,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Message message, int deliveryMode, boolean immediate) throws JMSException { - checkPreConditions(); - checkInitialDestination(); + checkPreConditions(); + checkInitialDestination(); synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, @@ -266,8 +266,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { - checkPreConditions(); - checkInitialDestination(); + checkPreConditions(); + checkInitialDestination(); synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, @@ -277,8 +277,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Destination destination, Message message) throws JMSException { - checkPreConditions(); - checkDestination(destination); + checkPreConditions(); + checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -291,8 +291,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j int priority, long timeToLive) throws JMSException { - checkPreConditions(); - checkDestination(destination); + checkPreConditions(); + checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -319,8 +319,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j int priority, long timeToLive, boolean mandatory, boolean immediate) throws JMSException { - checkPreConditions(); - checkDestination(destination); + checkPreConditions(); + checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -334,8 +334,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j boolean immediate, boolean waitUntilSent) throws JMSException { - checkPreConditions(); - checkDestination(destination); + checkPreConditions(); + checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -347,7 +347,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException { - if(message instanceof AbstractJMSMessage) + if (message instanceof AbstractJMSMessage) { return (AbstractJMSMessage) message; } @@ -355,7 +355,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { AbstractJMSMessage newMessage; - if(message instanceof BytesMessage) + if (message instanceof BytesMessage) { BytesMessage bytesMessage = (BytesMessage) message; bytesMessage.reset(); @@ -363,41 +363,40 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j JMSBytesMessage nativeMsg = (JMSBytesMessage) _session.createBytesMessage(); - byte[] buf = new byte[1024]; int len; - while((len = bytesMessage.readBytes(buf)) != -1) + while ((len = bytesMessage.readBytes(buf)) != -1) { - nativeMsg.writeBytes(buf,0,len); + nativeMsg.writeBytes(buf, 0, len); } newMessage = nativeMsg; } - else if(message instanceof MapMessage) + else if (message instanceof MapMessage) { MapMessage origMessage = (MapMessage) message; MapMessage nativeMessage = _session.createMapMessage(); Enumeration mapNames = origMessage.getMapNames(); - while(mapNames.hasMoreElements()) + while (mapNames.hasMoreElements()) { String name = (String) mapNames.nextElement(); nativeMessage.setObject(name, origMessage.getObject(name)); } newMessage = (AbstractJMSMessage) nativeMessage; } - else if(message instanceof ObjectMessage) + else if (message instanceof ObjectMessage) { ObjectMessage origMessage = (ObjectMessage) message; ObjectMessage nativeMessage = _session.createObjectMessage(); nativeMessage.setObject(origMessage.getObject()); - + newMessage = (AbstractJMSMessage) nativeMessage; } - else if(message instanceof TextMessage) + else if (message instanceof TextMessage) { TextMessage origMessage = (TextMessage) message; TextMessage nativeMessage = _session.createTextMessage(); @@ -406,7 +405,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j newMessage = (AbstractJMSMessage) nativeMessage; } - else if(message instanceof StreamMessage) + else if (message instanceof StreamMessage) { StreamMessage origMessage = (StreamMessage) message; StreamMessage nativeMessage = _session.createStreamMessage(); @@ -415,7 +414,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j try { origMessage.reset(); - while(true) + while (true) { nativeMessage.writeObject(origMessage.readObject()); } @@ -433,10 +432,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } Enumeration propertyNames = message.getPropertyNames(); - while(propertyNames.hasMoreElements()) + while (propertyNames.hasMoreElements()) { String propertyName = String.valueOf(propertyNames.nextElement()); - if(!propertyName.startsWith("JMSX_")) + if (!propertyName.startsWith("JMSX_")) { Object value = message.getObjectProperty(propertyName); newMessage.setObjectProperty(propertyName, value); @@ -445,28 +444,26 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j newMessage.setJMSDeliveryMode(message.getJMSDeliveryMode()); - + int priority = message.getJMSPriority(); - if(priority < 0) + if (priority < 0) { priority = 0; } - else if(priority > 9) + else if (priority > 9) { priority = 9; } newMessage.setJMSPriority(priority); - if(message.getJMSReplyTo() != null) + if (message.getJMSReplyTo() != null) { newMessage.setJMSReplyTo(message.getJMSReplyTo()); } newMessage.setJMSType(message.getJMSType()); - - - if(newMessage != null) + if (newMessage != null) { return newMessage; } @@ -478,15 +475,14 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } - private void validateDestination(Destination destination) throws JMSException { if (!(destination instanceof AMQDestination)) { throw new JMSException("Unsupported destination class: " + - (destination != null ? destination.getClass() : null)); + (destination != null ? destination.getClass() : null)); } - declareDestination((AMQDestination)destination); + declareDestination((AMQDestination) destination); } protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, @@ -497,6 +493,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j /** * The caller of this method must hold the failover mutex. + * * @param destination * @param origMessage * @param deliveryMode @@ -509,6 +506,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException { + checkTemporaryDestination(destination); AbstractJMSMessage message = convertToNativeMessage(origMessage); AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(), @@ -568,7 +566,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j _protocolHandler.writeFrame(compositeFrame, wait); - if(message != origMessage) + if (message != origMessage) { _logger.warn("Updating original message"); origMessage.setJMSPriority(message.getJMSPriority()); @@ -579,9 +577,29 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } } + private void checkTemporaryDestination(AMQDestination destination) throws JMSException + { + if(destination instanceof TemporaryDestination) + { + _logger.debug("destination is temporary destination"); + TemporaryDestination tempDest = (TemporaryDestination) destination; + if(tempDest.getSession().isClosed()) + { + _logger.debug("session is closed"); + throw new JMSException("Session for temporary destination has been closed"); + } + if(tempDest.isDeleted()) + { + _logger.debug("destination is deleted"); + throw new JMSException("Cannot send to a deleted temporary destination"); + } + } + } + /** * Create content bodies. This will split a large message into numerous bodies depending on the negotiated * maximum frame size. + * * @param payload * @return the array of content bodies */ @@ -611,8 +629,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j for (int i = 0; i < bodies.length; i++) { bodies[i] = new ContentBody(); - payload.position((int)framePayloadMax * i); - int length = (remaining >= framePayloadMax) ? (int)framePayloadMax : (int)remaining; + payload.position((int) framePayloadMax * i); + int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining; payload.limit(payload.position() + length); bodies[i].payload = payload.slice(); remaining -= length; @@ -633,32 +651,42 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j _encoding = encoding; } - private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException { - checkNotClosed(); + private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException + { + checkNotClosed(); - if(_session == null || _session.isClosed()){ - throw new javax.jms.IllegalStateException("Invalid Session"); - } - } + if (_session == null || _session.isClosed()) + { + throw new javax.jms.IllegalStateException("Invalid Session"); + } + } - private void checkInitialDestination(){ - if(_destination == null){ - throw new UnsupportedOperationException("Destination is null"); - } - } + private void checkInitialDestination() + { + if (_destination == null) + { + throw new UnsupportedOperationException("Destination is null"); + } + } + + private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException + { + if (_destination != null && suppliedDestination != null) + { + throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); + } + + if (suppliedDestination == null) + { + throw new InvalidDestinationException("Supplied Destination was invalid"); + } - private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException{ - if (_destination != null && suppliedDestination != null){ - throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); - } - if (suppliedDestination == null){ - throw new InvalidDestinationException("Supplied Destination was invalid"); - } - } + } - public AMQSession getSession() { - return _session; - } + public AMQSession getSession() + { + return _session; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java b/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java new file mode 100644 index 0000000000..8c11672a65 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java @@ -0,0 +1,17 @@ +package org.apache.qpid.client;
+
+import javax.jms.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Provides support for covenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue
+ * so that operations related to their "temporary-ness" can be abstracted out.
+ */
+interface TemporaryDestination extends Destination
+{
+
+ public void delete() throws JMSException;
+ public AMQSession getSession();
+ public boolean isDeleted();
+
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 80de66735c..026ef2e614 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -235,6 +235,19 @@ public class TopicSessionTest extends TestCase fail("Unexpected Exception: " + je.getMessage()); } + TopicSession session2 = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + try + { + MessageConsumer consumer2 = session2.createConsumer(topic); + fail("Expected a JMSException when subscribing to a temporary topic created on adifferent session"); + } + catch (JMSException je) + { + ; // pass + } + + + conn.close(); } |