summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-16 16:03:42 +0000
committerRobert Greig <rgreig@apache.org>2006-12-16 16:03:42 +0000
commitfc20045970761b1057a7f8fe5ef9f40bfc9d0240 (patch)
tree99716c2c31850d4c4d16b4a737c760e1f410d3af
parent159db77661f43cbe78bec888019002c9632ad256 (diff)
downloadqpid-python-fc20045970761b1057a7f8fe5ef9f40bfc9d0240.tar.gz
QPID-205 : Do not allow subscription to temporary topics created on a different session.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487821 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java28
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java170
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java17
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java13
6 files changed, 182 insertions, 74 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 80502cccf1..4dc6b5f914 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -885,6 +885,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
final String selector,
final FieldTable rawSelector) throws JMSException
{
+ checkTemporaryDestination(destination);
+
return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport()
{
public Object operation() throws JMSException
@@ -929,6 +931,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}.execute(_connection);
}
+ private void checkTemporaryDestination(Destination destination)
+ throws JMSException
+ {
+ if((destination instanceof TemporaryDestination))
+ {
+ _logger.debug("destination is temporary");
+ final TemporaryDestination tempDest = (TemporaryDestination) destination;
+ if(tempDest.getSession() != this)
+ {
+ _logger.debug("destination is on different session");
+ throw new JMSException("Cannot consume from a temporary destination created onanother session");
+ }
+ if(tempDest.isDeleted())
+ {
+ _logger.debug("destination is deleted");
+ throw new JMSException("Cannot consume from a deleted destination");
+ }
+ }
+ }
+
public boolean hasConsumer(Destination destination)
{
@@ -1497,12 +1519,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/*
* I could have combined the last 3 methods, but this way it improves readability
*/
- private void checkValidTopic(Topic topic) throws InvalidDestinationException
+ private void checkValidTopic(Topic topic) throws JMSException
{
if (topic == null)
{
throw new javax.jms.InvalidDestinationException("Invalid Topic");
}
+ if((topic instanceof TemporaryDestination) && ((TemporaryDestination)topic).getSession() != this)
+ {
+ throw new JMSException("Cannot create a subscription on a temporary topic created in another session");
+ }
}
private void checkValidQueue(Queue queue) throws InvalidDestinationException
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
index 05e3165886..81fee69f90 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
@@ -26,10 +26,12 @@ import javax.jms.TemporaryQueue;
/**
* AMQ implementation of a TemporaryQueue.
*/
-final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue
+final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, TemporaryDestination
{
+
private final AMQSession _session;
+ private boolean _deleted;
/**
* Create a new instance of an AMQTemporaryQueue
@@ -49,10 +51,20 @@ final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue
{
throw new JMSException("Temporary Queue has consumers so cannot be deleted");
}
+ _deleted = true;
// Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
// by the server when there are no more subscriptions to that queue. This is probably not
// quite right for JMSCompliance.
}
+ public AMQSession getSession()
+ {
+ return _session;
+ }
+
+ public boolean isDeleted()
+ {
+ return _deleted;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
index 122b13cf3b..241a9abc9b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
@@ -26,10 +26,11 @@ import javax.jms.TemporaryTopic;
/**
* AMQ implementation of TemporaryTopic.
*/
-class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic
+class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic, TemporaryDestination
{
private final AMQSession _session;
+ private boolean _deleted;
/**
* Create new temporary topic.
*/
@@ -49,9 +50,20 @@ class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic
throw new JMSException("Temporary Topic has consumers so cannot be deleted");
}
+ _deleted = true;
// Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
// by the server when there are no more subscriptions to that queue. This is probably not
// quite right for JMSCompliance.
}
+ public AMQSession getSession()
+ {
+ return _session;
+ }
+
+ public boolean isDeleted()
+ {
+ return _deleted;
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 705501363c..e11d70cf41 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -142,7 +142,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void setDisableMessageID(boolean b) throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
checkNotClosed();
// IGNORED
}
@@ -156,7 +156,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void setDisableMessageTimestamp(boolean b) throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
_disableTimestamps = b;
}
@@ -168,11 +168,11 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void setDeliveryMode(int i) throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT)
{
throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i +
- " is illegal");
+ " is illegal");
}
_deliveryMode = i;
}
@@ -185,7 +185,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void setPriority(int i) throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
if (i < 0 || i > 9)
{
throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9");
@@ -201,7 +201,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void setTimeToLive(long l) throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
if (l < 0)
{
throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l);
@@ -229,8 +229,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Message message) throws JMSException
{
- checkPreConditions();
- checkInitialDestination();
+ checkPreConditions();
+ checkInitialDestination();
synchronized (_connection.getFailoverMutex())
@@ -242,8 +242,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Message message, int deliveryMode) throws JMSException
{
- checkPreConditions();
- checkInitialDestination();
+ checkPreConditions();
+ checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
@@ -254,8 +254,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Message message, int deliveryMode, boolean immediate) throws JMSException
{
- checkPreConditions();
- checkInitialDestination();
+ checkPreConditions();
+ checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive,
@@ -266,8 +266,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Message message, int deliveryMode, int priority,
long timeToLive) throws JMSException
{
- checkPreConditions();
- checkInitialDestination();
+ checkPreConditions();
+ checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory,
@@ -277,8 +277,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Destination destination, Message message) throws JMSException
{
- checkPreConditions();
- checkDestination(destination);
+ checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -291,8 +291,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
int priority, long timeToLive)
throws JMSException
{
- checkPreConditions();
- checkDestination(destination);
+ checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -319,8 +319,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
int priority, long timeToLive, boolean mandatory, boolean immediate)
throws JMSException
{
- checkPreConditions();
- checkDestination(destination);
+ checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -334,8 +334,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
boolean immediate, boolean waitUntilSent)
throws JMSException
{
- checkPreConditions();
- checkDestination(destination);
+ checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -347,7 +347,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
{
- if(message instanceof AbstractJMSMessage)
+ if (message instanceof AbstractJMSMessage)
{
return (AbstractJMSMessage) message;
}
@@ -355,7 +355,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
{
AbstractJMSMessage newMessage;
- if(message instanceof BytesMessage)
+ if (message instanceof BytesMessage)
{
BytesMessage bytesMessage = (BytesMessage) message;
bytesMessage.reset();
@@ -363,41 +363,40 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
JMSBytesMessage nativeMsg = (JMSBytesMessage) _session.createBytesMessage();
-
byte[] buf = new byte[1024];
int len;
- while((len = bytesMessage.readBytes(buf)) != -1)
+ while ((len = bytesMessage.readBytes(buf)) != -1)
{
- nativeMsg.writeBytes(buf,0,len);
+ nativeMsg.writeBytes(buf, 0, len);
}
newMessage = nativeMsg;
}
- else if(message instanceof MapMessage)
+ else if (message instanceof MapMessage)
{
MapMessage origMessage = (MapMessage) message;
MapMessage nativeMessage = _session.createMapMessage();
Enumeration mapNames = origMessage.getMapNames();
- while(mapNames.hasMoreElements())
+ while (mapNames.hasMoreElements())
{
String name = (String) mapNames.nextElement();
nativeMessage.setObject(name, origMessage.getObject(name));
}
newMessage = (AbstractJMSMessage) nativeMessage;
}
- else if(message instanceof ObjectMessage)
+ else if (message instanceof ObjectMessage)
{
ObjectMessage origMessage = (ObjectMessage) message;
ObjectMessage nativeMessage = _session.createObjectMessage();
nativeMessage.setObject(origMessage.getObject());
-
+
newMessage = (AbstractJMSMessage) nativeMessage;
}
- else if(message instanceof TextMessage)
+ else if (message instanceof TextMessage)
{
TextMessage origMessage = (TextMessage) message;
TextMessage nativeMessage = _session.createTextMessage();
@@ -406,7 +405,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
newMessage = (AbstractJMSMessage) nativeMessage;
}
- else if(message instanceof StreamMessage)
+ else if (message instanceof StreamMessage)
{
StreamMessage origMessage = (StreamMessage) message;
StreamMessage nativeMessage = _session.createStreamMessage();
@@ -415,7 +414,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
try
{
origMessage.reset();
- while(true)
+ while (true)
{
nativeMessage.writeObject(origMessage.readObject());
}
@@ -433,10 +432,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
Enumeration propertyNames = message.getPropertyNames();
- while(propertyNames.hasMoreElements())
+ while (propertyNames.hasMoreElements())
{
String propertyName = String.valueOf(propertyNames.nextElement());
- if(!propertyName.startsWith("JMSX_"))
+ if (!propertyName.startsWith("JMSX_"))
{
Object value = message.getObjectProperty(propertyName);
newMessage.setObjectProperty(propertyName, value);
@@ -445,28 +444,26 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
newMessage.setJMSDeliveryMode(message.getJMSDeliveryMode());
-
+
int priority = message.getJMSPriority();
- if(priority < 0)
+ if (priority < 0)
{
priority = 0;
}
- else if(priority > 9)
+ else if (priority > 9)
{
priority = 9;
}
newMessage.setJMSPriority(priority);
- if(message.getJMSReplyTo() != null)
+ if (message.getJMSReplyTo() != null)
{
newMessage.setJMSReplyTo(message.getJMSReplyTo());
}
newMessage.setJMSType(message.getJMSType());
-
-
- if(newMessage != null)
+ if (newMessage != null)
{
return newMessage;
}
@@ -478,15 +475,14 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
-
private void validateDestination(Destination destination) throws JMSException
{
if (!(destination instanceof AMQDestination))
{
throw new JMSException("Unsupported destination class: " +
- (destination != null ? destination.getClass() : null));
+ (destination != null ? destination.getClass() : null));
}
- declareDestination((AMQDestination)destination);
+ declareDestination((AMQDestination) destination);
}
protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority,
@@ -497,6 +493,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
/**
* The caller of this method must hold the failover mutex.
+ *
* @param destination
* @param origMessage
* @param deliveryMode
@@ -509,6 +506,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority,
long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
{
+ checkTemporaryDestination(destination);
AbstractJMSMessage message = convertToNativeMessage(origMessage);
AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
@@ -568,7 +566,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
_protocolHandler.writeFrame(compositeFrame, wait);
- if(message != origMessage)
+ if (message != origMessage)
{
_logger.warn("Updating original message");
origMessage.setJMSPriority(message.getJMSPriority());
@@ -579,9 +577,29 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
}
+ private void checkTemporaryDestination(AMQDestination destination) throws JMSException
+ {
+ if(destination instanceof TemporaryDestination)
+ {
+ _logger.debug("destination is temporary destination");
+ TemporaryDestination tempDest = (TemporaryDestination) destination;
+ if(tempDest.getSession().isClosed())
+ {
+ _logger.debug("session is closed");
+ throw new JMSException("Session for temporary destination has been closed");
+ }
+ if(tempDest.isDeleted())
+ {
+ _logger.debug("destination is deleted");
+ throw new JMSException("Cannot send to a deleted temporary destination");
+ }
+ }
+ }
+
/**
* Create content bodies. This will split a large message into numerous bodies depending on the negotiated
* maximum frame size.
+ *
* @param payload
* @return the array of content bodies
*/
@@ -611,8 +629,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
for (int i = 0; i < bodies.length; i++)
{
bodies[i] = new ContentBody();
- payload.position((int)framePayloadMax * i);
- int length = (remaining >= framePayloadMax) ? (int)framePayloadMax : (int)remaining;
+ payload.position((int) framePayloadMax * i);
+ int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
payload.limit(payload.position() + length);
bodies[i].payload = payload.slice();
remaining -= length;
@@ -633,32 +651,42 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
_encoding = encoding;
}
- private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException {
- checkNotClosed();
+ private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException
+ {
+ checkNotClosed();
- if(_session == null || _session.isClosed()){
- throw new javax.jms.IllegalStateException("Invalid Session");
- }
- }
+ if (_session == null || _session.isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Invalid Session");
+ }
+ }
- private void checkInitialDestination(){
- if(_destination == null){
- throw new UnsupportedOperationException("Destination is null");
- }
- }
+ private void checkInitialDestination()
+ {
+ if (_destination == null)
+ {
+ throw new UnsupportedOperationException("Destination is null");
+ }
+ }
+
+ private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException
+ {
+ if (_destination != null && suppliedDestination != null)
+ {
+ throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
+ }
+
+ if (suppliedDestination == null)
+ {
+ throw new InvalidDestinationException("Supplied Destination was invalid");
+ }
- private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException{
- if (_destination != null && suppliedDestination != null){
- throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
- }
- if (suppliedDestination == null){
- throw new InvalidDestinationException("Supplied Destination was invalid");
- }
- }
+ }
- public AMQSession getSession() {
- return _session;
- }
+ public AMQSession getSession()
+ {
+ return _session;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java b/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java
new file mode 100644
index 0000000000..8c11672a65
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java
@@ -0,0 +1,17 @@
+package org.apache.qpid.client;
+
+import javax.jms.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Provides support for covenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue
+ * so that operations related to their "temporary-ness" can be abstracted out.
+ */
+interface TemporaryDestination extends Destination
+{
+
+ public void delete() throws JMSException;
+ public AMQSession getSession();
+ public boolean isDeleted();
+
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index 80de66735c..026ef2e614 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -235,6 +235,19 @@ public class TopicSessionTest extends TestCase
fail("Unexpected Exception: " + je.getMessage());
}
+ TopicSession session2 = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ try
+ {
+ MessageConsumer consumer2 = session2.createConsumer(topic);
+ fail("Expected a JMSException when subscribing to a temporary topic created on adifferent session");
+ }
+ catch (JMSException je)
+ {
+ ; // pass
+ }
+
+
+
conn.close();
}