summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2011-08-22 09:50:18 +0000
committerRobert Godfrey <rgodfrey@apache.org>2011-08-22 09:50:18 +0000
commit9d88761d6711f7f8722091fcb8849c0e89a8b8c9 (patch)
tree42fbc35147da861f1fa7387e8dc2bee6245f173f
parenta514d8f302b09452b902839028bddaaec0844fb2 (diff)
downloadqpid-python-9d88761d6711f7f8722091fcb8849c0e89a8b8c9.tar.gz
RG-AMQP : Add jndi support and trivial work on JMS
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1160167 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java9
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java6
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java7
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java6
4 files changed, 26 insertions, 2 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
index ed7bc02cbf..363a56c2a0 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
@@ -219,4 +219,13 @@ public class ConnectionImpl implements Connection
{
return _conn;
}
+
+ public boolean isStarted()
+ {
+ synchronized (_lock)
+ {
+ return _state == State.STARTED;
+ }
+ }
+
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
index 28985c52cd..cf1c363a3c 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
@@ -21,6 +21,7 @@ package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.client.Receiver;
import org.apache.qpid.amqp_1_0.jms.MessageConsumer;
import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -171,4 +172,9 @@ public class MessageConsumerImpl implements MessageConsumer
{
return _noLocal;
}
+
+ public void start()
+ {
+ _receiver.setCredit(UnsignedInteger.valueOf(100), true);
+ }
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
index f2881404af..84cf1bb4c6 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
@@ -83,11 +83,16 @@ class MessageFactory
{
message = new MapMessageImpl(header, properties, appProperties, (Map) ((AmqpValue)bodySection).getValue(), footer, _session);
}
- else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof Map)
+ else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof List)
{
message = new StreamMessageImpl(header, properties, appProperties,
(List) ((AmqpValue)bodySection).getValue(), footer, _session);
}
+ else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof String)
+ {
+ message = new TextMessageImpl(header, properties, appProperties,
+ (String) ((AmqpValue)bodySection).getValue(), footer, _session);
+ }
else if(bodySection instanceof Data)
{
message = new BytesMessageImpl(header, properties, appProperties, (Data) bodySection, footer, _session);
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
index 9eb52dd4ed..ba44aafcee 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
@@ -119,7 +119,7 @@ public class SessionImpl implements Session
public void close() throws JMSException
{
- //TODO
+ _session.close();
}
public void recover() throws JMSException
@@ -165,6 +165,10 @@ public class SessionImpl implements Session
{
messageConsumer = new MessageConsumerImpl(destination, this, selector, noLocal);
addConsumer(messageConsumer);
+ if(_connection.isStarted())
+ {
+ messageConsumer.start();
+ }
}
return messageConsumer;
}