From 9d88761d6711f7f8722091fcb8849c0e89a8b8c9 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 22 Aug 2011 09:50:18 +0000 Subject: RG-AMQP : Add jndi support and trivial work on JMS git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1160167 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java | 9 +++++++++ .../org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java | 6 ++++++ .../java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java | 7 ++++++- .../main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java | 6 +++++- 4 files changed, 26 insertions(+), 2 deletions(-) diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java index ed7bc02cbf..363a56c2a0 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java @@ -219,4 +219,13 @@ public class ConnectionImpl implements Connection { return _conn; } + + public boolean isStarted() + { + synchronized (_lock) + { + return _state == State.STARTED; + } + } + } diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java index 28985c52cd..cf1c363a3c 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java @@ -21,6 +21,7 @@ package org.apache.qpid.amqp_1_0.jms.impl; import org.apache.qpid.amqp_1_0.client.Receiver; import org.apache.qpid.amqp_1_0.jms.MessageConsumer; import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; import javax.jms.Destination; import javax.jms.JMSException; @@ -171,4 +172,9 @@ public class MessageConsumerImpl implements MessageConsumer { return _noLocal; } + + public void start() + { + _receiver.setCredit(UnsignedInteger.valueOf(100), true); + } } diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java index f2881404af..84cf1bb4c6 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java @@ -83,11 +83,16 @@ class MessageFactory { message = new MapMessageImpl(header, properties, appProperties, (Map) ((AmqpValue)bodySection).getValue(), footer, _session); } - else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof Map) + else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof List) { message = new StreamMessageImpl(header, properties, appProperties, (List) ((AmqpValue)bodySection).getValue(), footer, _session); } + else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof String) + { + message = new TextMessageImpl(header, properties, appProperties, + (String) ((AmqpValue)bodySection).getValue(), footer, _session); + } else if(bodySection instanceof Data) { message = new BytesMessageImpl(header, properties, appProperties, (Data) bodySection, footer, _session); diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index 9eb52dd4ed..ba44aafcee 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -119,7 +119,7 @@ public class SessionImpl implements Session public void close() throws JMSException { - //TODO + _session.close(); } public void recover() throws JMSException @@ -165,6 +165,10 @@ public class SessionImpl implements Session { messageConsumer = new MessageConsumerImpl(destination, this, selector, noLocal); addConsumer(messageConsumer); + if(_connection.isStarted()) + { + messageConsumer.start(); + } } return messageConsumer; } -- cgit v1.2.1