From 4e2c545f9570051b5ba00b3a7b666a23892fa1b4 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sun, 28 Aug 2011 22:34:40 +0000 Subject: NO-JIRA: AMQP 1-0 Sandbox - fix multiple sessions, add preliminary onMessage support, example git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1162604 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/amqp_1_0/jms/example/Hello.java | 141 +++++++++++++++++++++ .../qpid/amqp_1_0/jms/example/hello.properties | 28 ++++ .../amqp_1_0/jms/impl/MessageConsumerImpl.java | 26 +++- .../apache/qpid/amqp_1_0/jms/impl/SessionImpl.java | 139 +++++++++++++++++++- .../org/apache/qpid/amqp_1_0/client/Receiver.java | 26 +++- .../amqp_1_0/transport/ConnectionEndpoint.java | 1 + 6 files changed, 352 insertions(+), 9 deletions(-) create mode 100644 qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java create mode 100644 qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/hello.properties diff --git a/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java b/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java new file mode 100644 index 0000000000..5963068a69 --- /dev/null +++ b/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.jms.example; + +import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Hashtable; +import java.util.Map; +import java.util.Properties; + + +public class Hello +{ + + public Hello() + { + } + + public static void main(String[] args) + { + try + { + Class.forName("org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory"); + + Hashtable env = new Hashtable(); + env.put("java.naming.provider.url", "hello.properties"); + env.put("java.naming.factory.initial", "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory"); + + Context context = new InitialContext(env); + + ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("localhost"); + Connection connection = connectionFactory.createConnection(); + + Session producersession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = (Queue) context.lookup("queue"); + + + Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer messageConsumer = consumerSession.createConsumer(queue); + + messageConsumer.setMessageListener(new MessageListener() + { + public void onMessage(final Message message) + { + try + { + + if(message instanceof TextMessage) + { + System.out.println("Received text Message:"); + System.out.println("======================"); + System.out.println(((TextMessage) message).getText()); + } + else if(message instanceof MapMessage) + { + System.out.println("Received Map Message:"); + System.out.println("====================="); + + + MapMessage mapmessage = (MapMessage) message; + + Enumeration names = mapmessage.getMapNames(); + + while(names.hasMoreElements()) + { + String name = (String) names.nextElement(); + System.out.println(name + " -> " + mapmessage.getObject(name)); + } + + } + else if(message instanceof BytesMessage) + { + System.out.println("Received Bytes Message:"); + System.out.println("======================="); + System.out.println(((BytesMessage) message).readUTF()); + } + } + catch (JMSException e) + { + e.printStackTrace(); //TODO + } + + } + }); + + connection.start(); + + + MessageProducer messageProducer = producersession.createProducer(queue); + TextMessage message = producersession.createTextMessage("Hello world!"); + messageProducer.send(message); + + MapMessage mapmessage = producersession.createMapMessage(); + mapmessage.setBoolean("mybool", true); + mapmessage.setString("mystring", "hello"); + mapmessage.setLong("mylong", -25L); + + + messageProducer.send(mapmessage); + + BytesMessage bytesMessage = producersession.createBytesMessage(); + bytesMessage.writeUTF("This is a bytes message"); + + messageProducer.send(bytesMessage); + + + + Thread.sleep(50000L); + + connection.close(); + context.close(); + } + catch (Exception exp) + { + exp.printStackTrace(); + } + } +} diff --git a/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/hello.properties b/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/hello.properties new file mode 100644 index 0000000000..930c2c15cc --- /dev/null +++ b/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/hello.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +java.naming.factory.initial = org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.localhost = http://guest:guest@localhost/test?cliendId='test-client' + + +# Register an AMQP destination in JNDI +# destination.[jniName] = [Address Format] +queue.queue = queue diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java index cf1c363a3c..f214b43997 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java @@ -18,6 +18,7 @@ */ package org.apache.qpid.amqp_1_0.jms.impl; +import org.apache.qpid.amqp_1_0.client.Message; import org.apache.qpid.amqp_1_0.client.Receiver; import org.apache.qpid.amqp_1_0.jms.MessageConsumer; import org.apache.qpid.amqp_1_0.type.Binary; @@ -36,6 +37,7 @@ public class MessageConsumerImpl implements MessageConsumer private SessionImpl _session; private Receiver _receiver; private Binary _lastUnackedMessage; + private MessageListener _messageListener; MessageConsumerImpl(final Destination destination, final SessionImpl session, @@ -68,14 +70,23 @@ public class MessageConsumerImpl implements MessageConsumer return _selector; } - public MessageListener getMessageListener() throws JMSException + public MessageListener getMessageListener() { - return null; //TODO + return _messageListener; } public void setMessageListener(final MessageListener messageListener) throws JMSException { - //TODO + _messageListener = messageListener; + _session.messageListenerSet( this ); + _receiver.setMessageArrivalListener(new Receiver.MessageArrivalListener() + { + + public void messageArrived(final Receiver receiver) + { + _session.messageArrived(MessageConsumerImpl.this); + } + }); } public MessageImpl receive() throws JMSException @@ -97,7 +108,7 @@ public class MessageConsumerImpl implements MessageConsumer private MessageImpl receiveImpl(long timeout) { - org.apache.qpid.amqp_1_0.client.Message msg = _receiver.receive(timeout); + org.apache.qpid.amqp_1_0.client.Message msg = receive0(timeout); if(msg != null) { preReceiveAction(msg); @@ -105,13 +116,18 @@ public class MessageConsumerImpl implements MessageConsumer return createJMSMessage(msg); } + Message receive0(final long timeout) + { + return _receiver.receive(timeout); + } + void acknowledge(final org.apache.qpid.amqp_1_0.client.Message msg) { _receiver.acknowledge(msg.getDeliveryTag()); } - private MessageImpl createJMSMessage(final org.apache.qpid.amqp_1_0.client.Message msg) + MessageImpl createJMSMessage(final org.apache.qpid.amqp_1_0.client.Message msg) { if(msg != null) { diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index ba44aafcee..01594a5a1c 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -19,6 +19,7 @@ package org.apache.qpid.amqp_1_0.jms.impl; import org.apache.qpid.amqp_1_0.client.Connection; +import org.apache.qpid.amqp_1_0.client.Message; import org.apache.qpid.amqp_1_0.jms.Session; import javax.jms.Destination; @@ -37,6 +38,10 @@ public class SessionImpl implements Session private org.apache.qpid.amqp_1_0.client.Session _session; private MessageFactory _messageFactory; private List _consumers = new ArrayList(); + private MessageListener _messageListener; + private Dispatcher _dispatcher = new Dispatcher(); + private Thread _dispatcherThread; + protected SessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode) { @@ -45,6 +50,9 @@ public class SessionImpl implements Session Connection clientConn = _connection.getClientConnection(); _session = clientConn.createSession(); _messageFactory = new MessageFactory(this); + + _dispatcherThread = new Thread(_dispatcher); + _dispatcherThread.start(); } public BytesMessageImpl createBytesMessage() throws JMSException @@ -119,7 +127,9 @@ public class SessionImpl implements Session public void close() throws JMSException { + _dispatcher.close(); _session.close(); + } public void recover() throws JMSException @@ -129,12 +139,19 @@ public class SessionImpl implements Session public MessageListener getMessageListener() throws JMSException { - return null; //TODO + return _messageListener; } public void setMessageListener(final MessageListener messageListener) throws JMSException { - //TODO + if(_messageListener != null) + { + + } + else + { + _messageListener = messageListener; + } } public void run() @@ -231,7 +248,11 @@ public class SessionImpl implements Session void start() { - //TODO + _dispatcher.start(); + for(MessageConsumerImpl consumer : _consumers) + { + consumer.start(); + } } org.apache.qpid.amqp_1_0.client.Session getClientSession() @@ -254,4 +275,116 @@ public class SessionImpl implements Session } } } + + void messageListenerSet(final MessageConsumerImpl messageConsumer) + { + _dispatcher.updateMessageListener(messageConsumer); + } + + public void messageArrived(final MessageConsumerImpl messageConsumer) + { + _dispatcher.messageArrivedAtConsumer(messageConsumer); + } + + + private class Dispatcher implements Runnable + { + + private final List _messageConsumerList = new ArrayList(); + + private boolean _closed; + private boolean _started; + + public void run() + { + synchronized(getLock()) + { + while(!_closed) + { + while(!_started || _messageConsumerList.isEmpty()) + { + try + { + getLock().wait(); + } + catch (InterruptedException e) + { + return; + } + } + while(_started && !_messageConsumerList.isEmpty()) + { + MessageConsumerImpl consumer = _messageConsumerList.remove(0); + MessageListener listener = consumer.getMessageListener(); + Message msg = consumer.receive0(0L); + + MessageImpl message = consumer.createJMSMessage(msg); + + if(message != null) + { + listener.onMessage(message); + if(_acknowledgeMode == AcknowledgeMode.AUTO_ACKNOWLEDGE + || _acknowledgeMode == AcknowledgeMode.DUPS_OK_ACKNOWLEDGE) + { + consumer.acknowledge(msg); + } + } + + } + } + + + } + + } + + private Object getLock() + { + return _session.getEndpoint().getLock(); + } + + public void messageArrivedAtConsumer(MessageConsumerImpl impl) + { + synchronized (getLock()) + { + _messageConsumerList.add(impl); + getLock().notifyAll(); + } + } + + public void close() + { + synchronized (getLock()) + { + _closed = true; + getLock().notifyAll(); + } + } + + public void updateMessageListener(final MessageConsumerImpl messageConsumer) + { + synchronized (getLock()) + { + getLock().notifyAll(); + } + } + + public void start() + { + synchronized (getLock()) + { + _started = true; + getLock().notifyAll(); + } + } + + public void stop() + { + synchronized (getLock()) + { + _started = false; + getLock().notifyAll(); + } + } + } } diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java index f60e066c20..202c1a97d2 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java @@ -48,6 +48,7 @@ public class Receiver implements DeliveryStateHandler private Queue _prefetchQueue = new ConcurrentLinkedQueue(); private Map _unsettledMap = new HashMap(); + private MessageArrivalListener _messageArrivalListener; public Receiver(final Session session, final String linkName, @@ -110,6 +111,7 @@ public class Receiver implements DeliveryStateHandler @Override public void messageTransfer(final Transfer xfr) { _prefetchQueue.add(xfr); + postPrefetchAction(); } }); @@ -133,6 +135,14 @@ public class Receiver implements DeliveryStateHandler } } + private void postPrefetchAction() + { + if(_messageArrivalListener != null) + { + _messageArrivalListener.messageArrived(this); + } + } + public void setCredit(UnsignedInteger credit, boolean window) { _endpoint.setLinkCredit(credit); @@ -262,7 +272,7 @@ public class Receiver implements DeliveryStateHandler } catch (InterruptedException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + return null; } if(wait > 0L) { @@ -393,8 +403,22 @@ public class Receiver implements DeliveryStateHandler } + public void setMessageArrivalListener(final MessageArrivalListener messageArrivalListener) + { + synchronized(_endpoint.getLock()) + { + _messageArrivalListener = messageArrivalListener; + } + } + public static interface SettledAction { public void onSettled(Binary deliveryTag); } + + + public interface MessageArrivalListener + { + void messageArrived(Receiver receiver); + } } \ No newline at end of file diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java index 85bcf2febf..05fbd74bde 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java @@ -439,6 +439,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour SessionEndpoint endpoint = new SessionEndpoint(this,begin); _receivingSessions[channel] = endpoint; + _sendingSessions[myChannelId] = endpoint; Begin beginToSend = new Begin(); -- cgit v1.2.1