diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2011-08-28 22:34:40 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2011-08-28 22:34:40 +0000 |
commit | 4e2c545f9570051b5ba00b3a7b666a23892fa1b4 (patch) | |
tree | 0a636ea33423abcd4f645ef33302b49457f73f79 | |
parent | 80f7040fb49757f80b9450bf497cbb1ff4f4f154 (diff) | |
download | qpid-python-4e2c545f9570051b5ba00b3a7b666a23892fa1b4.tar.gz |
NO-JIRA: AMQP 1-0 Sandbox - fix multiple sessions, add preliminary onMessage support, example
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1162604 13f79535-47bb-0310-9956-ffa450edef68
6 files changed, 352 insertions, 9 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java b/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java new file mode 100644 index 0000000000..5963068a69 --- /dev/null +++ b/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.jms.example; + +import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Hashtable; +import java.util.Map; +import java.util.Properties; + + +public class Hello +{ + + public Hello() + { + } + + public static void main(String[] args) + { + try + { + Class.forName("org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory"); + + Hashtable env = new Hashtable(); + env.put("java.naming.provider.url", "hello.properties"); + env.put("java.naming.factory.initial", "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory"); + + Context context = new InitialContext(env); + + ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("localhost"); + Connection connection = connectionFactory.createConnection(); + + Session producersession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = (Queue) context.lookup("queue"); + + + Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer messageConsumer = consumerSession.createConsumer(queue); + + messageConsumer.setMessageListener(new MessageListener() + { + public void onMessage(final Message message) + { + try + { + + if(message instanceof TextMessage) + { + System.out.println("Received text Message:"); + System.out.println("======================"); + System.out.println(((TextMessage) message).getText()); + } + else if(message instanceof MapMessage) + { + System.out.println("Received Map Message:"); + System.out.println("====================="); + + + MapMessage mapmessage = (MapMessage) message; + + Enumeration names = mapmessage.getMapNames(); + + while(names.hasMoreElements()) + { + String name = (String) names.nextElement(); + System.out.println(name + " -> " + mapmessage.getObject(name)); + } + + } + else if(message instanceof BytesMessage) + { + System.out.println("Received Bytes Message:"); + System.out.println("======================="); + System.out.println(((BytesMessage) message).readUTF()); + } + } + catch (JMSException e) + { + e.printStackTrace(); //TODO + } + + } + }); + + connection.start(); + + + MessageProducer messageProducer = producersession.createProducer(queue); + TextMessage message = producersession.createTextMessage("Hello world!"); + messageProducer.send(message); + + MapMessage mapmessage = producersession.createMapMessage(); + mapmessage.setBoolean("mybool", true); + mapmessage.setString("mystring", "hello"); + mapmessage.setLong("mylong", -25L); + + + messageProducer.send(mapmessage); + + BytesMessage bytesMessage = producersession.createBytesMessage(); + bytesMessage.writeUTF("This is a bytes message"); + + messageProducer.send(bytesMessage); + + + + Thread.sleep(50000L); + + connection.close(); + context.close(); + } + catch (Exception exp) + { + exp.printStackTrace(); + } + } +} diff --git a/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/hello.properties b/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/hello.properties new file mode 100644 index 0000000000..930c2c15cc --- /dev/null +++ b/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/hello.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +java.naming.factory.initial = org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.localhost = http://guest:guest@localhost/test?cliendId='test-client' + + +# Register an AMQP destination in JNDI +# destination.[jniName] = [Address Format] +queue.queue = queue diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java index cf1c363a3c..f214b43997 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java @@ -18,6 +18,7 @@ */
package org.apache.qpid.amqp_1_0.jms.impl;
+import org.apache.qpid.amqp_1_0.client.Message;
import org.apache.qpid.amqp_1_0.client.Receiver;
import org.apache.qpid.amqp_1_0.jms.MessageConsumer;
import org.apache.qpid.amqp_1_0.type.Binary;
@@ -36,6 +37,7 @@ public class MessageConsumerImpl implements MessageConsumer private SessionImpl _session;
private Receiver _receiver;
private Binary _lastUnackedMessage;
+ private MessageListener _messageListener;
MessageConsumerImpl(final Destination destination,
final SessionImpl session,
@@ -68,14 +70,23 @@ public class MessageConsumerImpl implements MessageConsumer return _selector;
}
- public MessageListener getMessageListener() throws JMSException
+ public MessageListener getMessageListener()
{
- return null; //TODO
+ return _messageListener;
}
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
- //TODO
+ _messageListener = messageListener;
+ _session.messageListenerSet( this );
+ _receiver.setMessageArrivalListener(new Receiver.MessageArrivalListener()
+ {
+
+ public void messageArrived(final Receiver receiver)
+ {
+ _session.messageArrived(MessageConsumerImpl.this);
+ }
+ });
}
public MessageImpl receive() throws JMSException
@@ -97,7 +108,7 @@ public class MessageConsumerImpl implements MessageConsumer private MessageImpl receiveImpl(long timeout)
{
- org.apache.qpid.amqp_1_0.client.Message msg = _receiver.receive(timeout);
+ org.apache.qpid.amqp_1_0.client.Message msg = receive0(timeout);
if(msg != null)
{
preReceiveAction(msg);
@@ -105,13 +116,18 @@ public class MessageConsumerImpl implements MessageConsumer return createJMSMessage(msg);
}
+ Message receive0(final long timeout)
+ {
+ return _receiver.receive(timeout);
+ }
+
void acknowledge(final org.apache.qpid.amqp_1_0.client.Message msg)
{
_receiver.acknowledge(msg.getDeliveryTag());
}
- private MessageImpl createJMSMessage(final org.apache.qpid.amqp_1_0.client.Message msg)
+ MessageImpl createJMSMessage(final org.apache.qpid.amqp_1_0.client.Message msg)
{
if(msg != null)
{
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index ba44aafcee..01594a5a1c 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -19,6 +19,7 @@ package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.client.Connection;
+import org.apache.qpid.amqp_1_0.client.Message;
import org.apache.qpid.amqp_1_0.jms.Session;
import javax.jms.Destination;
@@ -37,6 +38,10 @@ public class SessionImpl implements Session private org.apache.qpid.amqp_1_0.client.Session _session;
private MessageFactory _messageFactory;
private List<MessageConsumerImpl> _consumers = new ArrayList<MessageConsumerImpl>();
+ private MessageListener _messageListener;
+ private Dispatcher _dispatcher = new Dispatcher();
+ private Thread _dispatcherThread;
+
protected SessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
{
@@ -45,6 +50,9 @@ public class SessionImpl implements Session Connection clientConn = _connection.getClientConnection();
_session = clientConn.createSession();
_messageFactory = new MessageFactory(this);
+
+ _dispatcherThread = new Thread(_dispatcher);
+ _dispatcherThread.start();
}
public BytesMessageImpl createBytesMessage() throws JMSException
@@ -119,7 +127,9 @@ public class SessionImpl implements Session public void close() throws JMSException
{
+ _dispatcher.close();
_session.close();
+
}
public void recover() throws JMSException
@@ -129,12 +139,19 @@ public class SessionImpl implements Session public MessageListener getMessageListener() throws JMSException
{
- return null; //TODO
+ return _messageListener;
}
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
- //TODO
+ if(_messageListener != null)
+ {
+
+ }
+ else
+ {
+ _messageListener = messageListener;
+ }
}
public void run()
@@ -231,7 +248,11 @@ public class SessionImpl implements Session void start()
{
- //TODO
+ _dispatcher.start();
+ for(MessageConsumerImpl consumer : _consumers)
+ {
+ consumer.start();
+ }
}
org.apache.qpid.amqp_1_0.client.Session getClientSession()
@@ -254,4 +275,116 @@ public class SessionImpl implements Session }
}
}
+
+ void messageListenerSet(final MessageConsumerImpl messageConsumer)
+ {
+ _dispatcher.updateMessageListener(messageConsumer);
+ }
+
+ public void messageArrived(final MessageConsumerImpl messageConsumer)
+ {
+ _dispatcher.messageArrivedAtConsumer(messageConsumer);
+ }
+
+
+ private class Dispatcher implements Runnable
+ {
+
+ private final List<MessageConsumerImpl> _messageConsumerList = new ArrayList<MessageConsumerImpl>();
+
+ private boolean _closed;
+ private boolean _started;
+
+ public void run()
+ {
+ synchronized(getLock())
+ {
+ while(!_closed)
+ {
+ while(!_started || _messageConsumerList.isEmpty())
+ {
+ try
+ {
+ getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ return;
+ }
+ }
+ while(_started && !_messageConsumerList.isEmpty())
+ {
+ MessageConsumerImpl consumer = _messageConsumerList.remove(0);
+ MessageListener listener = consumer.getMessageListener();
+ Message msg = consumer.receive0(0L);
+
+ MessageImpl message = consumer.createJMSMessage(msg);
+
+ if(message != null)
+ {
+ listener.onMessage(message);
+ if(_acknowledgeMode == AcknowledgeMode.AUTO_ACKNOWLEDGE
+ || _acknowledgeMode == AcknowledgeMode.DUPS_OK_ACKNOWLEDGE)
+ {
+ consumer.acknowledge(msg);
+ }
+ }
+
+ }
+ }
+
+
+ }
+
+ }
+
+ private Object getLock()
+ {
+ return _session.getEndpoint().getLock();
+ }
+
+ public void messageArrivedAtConsumer(MessageConsumerImpl impl)
+ {
+ synchronized (getLock())
+ {
+ _messageConsumerList.add(impl);
+ getLock().notifyAll();
+ }
+ }
+
+ public void close()
+ {
+ synchronized (getLock())
+ {
+ _closed = true;
+ getLock().notifyAll();
+ }
+ }
+
+ public void updateMessageListener(final MessageConsumerImpl messageConsumer)
+ {
+ synchronized (getLock())
+ {
+ getLock().notifyAll();
+ }
+ }
+
+ public void start()
+ {
+ synchronized (getLock())
+ {
+ _started = true;
+ getLock().notifyAll();
+ }
+ }
+
+ public void stop()
+ {
+ synchronized (getLock())
+ {
+ _started = false;
+ getLock().notifyAll();
+ }
+ }
+ }
}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java index f60e066c20..202c1a97d2 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java @@ -48,6 +48,7 @@ public class Receiver implements DeliveryStateHandler private Queue<Transfer> _prefetchQueue = new ConcurrentLinkedQueue<Transfer>();
private Map<Binary, SettledAction> _unsettledMap = new HashMap<Binary, SettledAction>();
+ private MessageArrivalListener _messageArrivalListener;
public Receiver(final Session session,
final String linkName,
@@ -110,6 +111,7 @@ public class Receiver implements DeliveryStateHandler @Override public void messageTransfer(final Transfer xfr)
{
_prefetchQueue.add(xfr);
+ postPrefetchAction();
}
});
@@ -133,6 +135,14 @@ public class Receiver implements DeliveryStateHandler }
}
+ private void postPrefetchAction()
+ {
+ if(_messageArrivalListener != null)
+ {
+ _messageArrivalListener.messageArrived(this);
+ }
+ }
+
public void setCredit(UnsignedInteger credit, boolean window)
{
_endpoint.setLinkCredit(credit);
@@ -262,7 +272,7 @@ public class Receiver implements DeliveryStateHandler }
catch (InterruptedException e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ return null;
}
if(wait > 0L)
{
@@ -393,8 +403,22 @@ public class Receiver implements DeliveryStateHandler }
+ public void setMessageArrivalListener(final MessageArrivalListener messageArrivalListener)
+ {
+ synchronized(_endpoint.getLock())
+ {
+ _messageArrivalListener = messageArrivalListener;
+ }
+ }
+
public static interface SettledAction
{
public void onSettled(Binary deliveryTag);
}
+
+
+ public interface MessageArrivalListener
+ {
+ void messageArrived(Receiver receiver);
+ }
}
\ No newline at end of file diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java index 85bcf2febf..05fbd74bde 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java @@ -439,6 +439,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour SessionEndpoint endpoint = new SessionEndpoint(this,begin);
_receivingSessions[channel] = endpoint;
+ _sendingSessions[myChannelId] = endpoint;
Begin beginToSend = new Begin();
|