summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2011-08-28 22:34:40 +0000
committerRobert Godfrey <rgodfrey@apache.org>2011-08-28 22:34:40 +0000
commit4e2c545f9570051b5ba00b3a7b666a23892fa1b4 (patch)
tree0a636ea33423abcd4f645ef33302b49457f73f79
parent80f7040fb49757f80b9450bf497cbb1ff4f4f154 (diff)
downloadqpid-python-4e2c545f9570051b5ba00b3a7b666a23892fa1b4.tar.gz
NO-JIRA: AMQP 1-0 Sandbox - fix multiple sessions, add preliminary onMessage support, example
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1162604 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java141
-rw-r--r--qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/hello.properties28
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java26
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java139
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java26
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java1
6 files changed, 352 insertions, 9 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java b/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java
new file mode 100644
index 0000000000..5963068a69
--- /dev/null
+++ b/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms.example;
+
+import javax.jms.*;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Properties;
+
+
+public class Hello
+{
+
+ public Hello()
+ {
+ }
+
+ public static void main(String[] args)
+ {
+ try
+ {
+ Class.forName("org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory");
+
+ Hashtable env = new Hashtable();
+ env.put("java.naming.provider.url", "hello.properties");
+ env.put("java.naming.factory.initial", "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory");
+
+ Context context = new InitialContext(env);
+
+ ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("localhost");
+ Connection connection = connectionFactory.createConnection();
+
+ Session producersession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = (Queue) context.lookup("queue");
+
+
+ Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer messageConsumer = consumerSession.createConsumer(queue);
+
+ messageConsumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(final Message message)
+ {
+ try
+ {
+
+ if(message instanceof TextMessage)
+ {
+ System.out.println("Received text Message:");
+ System.out.println("======================");
+ System.out.println(((TextMessage) message).getText());
+ }
+ else if(message instanceof MapMessage)
+ {
+ System.out.println("Received Map Message:");
+ System.out.println("=====================");
+
+
+ MapMessage mapmessage = (MapMessage) message;
+
+ Enumeration names = mapmessage.getMapNames();
+
+ while(names.hasMoreElements())
+ {
+ String name = (String) names.nextElement();
+ System.out.println(name + " -> " + mapmessage.getObject(name));
+ }
+
+ }
+ else if(message instanceof BytesMessage)
+ {
+ System.out.println("Received Bytes Message:");
+ System.out.println("=======================");
+ System.out.println(((BytesMessage) message).readUTF());
+ }
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(); //TODO
+ }
+
+ }
+ });
+
+ connection.start();
+
+
+ MessageProducer messageProducer = producersession.createProducer(queue);
+ TextMessage message = producersession.createTextMessage("Hello world!");
+ messageProducer.send(message);
+
+ MapMessage mapmessage = producersession.createMapMessage();
+ mapmessage.setBoolean("mybool", true);
+ mapmessage.setString("mystring", "hello");
+ mapmessage.setLong("mylong", -25L);
+
+
+ messageProducer.send(mapmessage);
+
+ BytesMessage bytesMessage = producersession.createBytesMessage();
+ bytesMessage.writeUTF("This is a bytes message");
+
+ messageProducer.send(bytesMessage);
+
+
+
+ Thread.sleep(50000L);
+
+ connection.close();
+ context.close();
+ }
+ catch (Exception exp)
+ {
+ exp.printStackTrace();
+ }
+ }
+}
diff --git a/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/hello.properties b/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/hello.properties
new file mode 100644
index 0000000000..930c2c15cc
--- /dev/null
+++ b/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/hello.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+java.naming.factory.initial = org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory
+
+# register some connection factories
+# connectionfactory.[jndiname] = [ConnectionURL]
+connectionfactory.localhost = http://guest:guest@localhost/test?cliendId='test-client'
+
+
+# Register an AMQP destination in JNDI
+# destination.[jniName] = [Address Format]
+queue.queue = queue
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
index cf1c363a3c..f214b43997 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.qpid.amqp_1_0.jms.impl;
+import org.apache.qpid.amqp_1_0.client.Message;
import org.apache.qpid.amqp_1_0.client.Receiver;
import org.apache.qpid.amqp_1_0.jms.MessageConsumer;
import org.apache.qpid.amqp_1_0.type.Binary;
@@ -36,6 +37,7 @@ public class MessageConsumerImpl implements MessageConsumer
private SessionImpl _session;
private Receiver _receiver;
private Binary _lastUnackedMessage;
+ private MessageListener _messageListener;
MessageConsumerImpl(final Destination destination,
final SessionImpl session,
@@ -68,14 +70,23 @@ public class MessageConsumerImpl implements MessageConsumer
return _selector;
}
- public MessageListener getMessageListener() throws JMSException
+ public MessageListener getMessageListener()
{
- return null; //TODO
+ return _messageListener;
}
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
- //TODO
+ _messageListener = messageListener;
+ _session.messageListenerSet( this );
+ _receiver.setMessageArrivalListener(new Receiver.MessageArrivalListener()
+ {
+
+ public void messageArrived(final Receiver receiver)
+ {
+ _session.messageArrived(MessageConsumerImpl.this);
+ }
+ });
}
public MessageImpl receive() throws JMSException
@@ -97,7 +108,7 @@ public class MessageConsumerImpl implements MessageConsumer
private MessageImpl receiveImpl(long timeout)
{
- org.apache.qpid.amqp_1_0.client.Message msg = _receiver.receive(timeout);
+ org.apache.qpid.amqp_1_0.client.Message msg = receive0(timeout);
if(msg != null)
{
preReceiveAction(msg);
@@ -105,13 +116,18 @@ public class MessageConsumerImpl implements MessageConsumer
return createJMSMessage(msg);
}
+ Message receive0(final long timeout)
+ {
+ return _receiver.receive(timeout);
+ }
+
void acknowledge(final org.apache.qpid.amqp_1_0.client.Message msg)
{
_receiver.acknowledge(msg.getDeliveryTag());
}
- private MessageImpl createJMSMessage(final org.apache.qpid.amqp_1_0.client.Message msg)
+ MessageImpl createJMSMessage(final org.apache.qpid.amqp_1_0.client.Message msg)
{
if(msg != null)
{
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
index ba44aafcee..01594a5a1c 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
@@ -19,6 +19,7 @@
package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.client.Connection;
+import org.apache.qpid.amqp_1_0.client.Message;
import org.apache.qpid.amqp_1_0.jms.Session;
import javax.jms.Destination;
@@ -37,6 +38,10 @@ public class SessionImpl implements Session
private org.apache.qpid.amqp_1_0.client.Session _session;
private MessageFactory _messageFactory;
private List<MessageConsumerImpl> _consumers = new ArrayList<MessageConsumerImpl>();
+ private MessageListener _messageListener;
+ private Dispatcher _dispatcher = new Dispatcher();
+ private Thread _dispatcherThread;
+
protected SessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
{
@@ -45,6 +50,9 @@ public class SessionImpl implements Session
Connection clientConn = _connection.getClientConnection();
_session = clientConn.createSession();
_messageFactory = new MessageFactory(this);
+
+ _dispatcherThread = new Thread(_dispatcher);
+ _dispatcherThread.start();
}
public BytesMessageImpl createBytesMessage() throws JMSException
@@ -119,7 +127,9 @@ public class SessionImpl implements Session
public void close() throws JMSException
{
+ _dispatcher.close();
_session.close();
+
}
public void recover() throws JMSException
@@ -129,12 +139,19 @@ public class SessionImpl implements Session
public MessageListener getMessageListener() throws JMSException
{
- return null; //TODO
+ return _messageListener;
}
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
- //TODO
+ if(_messageListener != null)
+ {
+
+ }
+ else
+ {
+ _messageListener = messageListener;
+ }
}
public void run()
@@ -231,7 +248,11 @@ public class SessionImpl implements Session
void start()
{
- //TODO
+ _dispatcher.start();
+ for(MessageConsumerImpl consumer : _consumers)
+ {
+ consumer.start();
+ }
}
org.apache.qpid.amqp_1_0.client.Session getClientSession()
@@ -254,4 +275,116 @@ public class SessionImpl implements Session
}
}
}
+
+ void messageListenerSet(final MessageConsumerImpl messageConsumer)
+ {
+ _dispatcher.updateMessageListener(messageConsumer);
+ }
+
+ public void messageArrived(final MessageConsumerImpl messageConsumer)
+ {
+ _dispatcher.messageArrivedAtConsumer(messageConsumer);
+ }
+
+
+ private class Dispatcher implements Runnable
+ {
+
+ private final List<MessageConsumerImpl> _messageConsumerList = new ArrayList<MessageConsumerImpl>();
+
+ private boolean _closed;
+ private boolean _started;
+
+ public void run()
+ {
+ synchronized(getLock())
+ {
+ while(!_closed)
+ {
+ while(!_started || _messageConsumerList.isEmpty())
+ {
+ try
+ {
+ getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ return;
+ }
+ }
+ while(_started && !_messageConsumerList.isEmpty())
+ {
+ MessageConsumerImpl consumer = _messageConsumerList.remove(0);
+ MessageListener listener = consumer.getMessageListener();
+ Message msg = consumer.receive0(0L);
+
+ MessageImpl message = consumer.createJMSMessage(msg);
+
+ if(message != null)
+ {
+ listener.onMessage(message);
+ if(_acknowledgeMode == AcknowledgeMode.AUTO_ACKNOWLEDGE
+ || _acknowledgeMode == AcknowledgeMode.DUPS_OK_ACKNOWLEDGE)
+ {
+ consumer.acknowledge(msg);
+ }
+ }
+
+ }
+ }
+
+
+ }
+
+ }
+
+ private Object getLock()
+ {
+ return _session.getEndpoint().getLock();
+ }
+
+ public void messageArrivedAtConsumer(MessageConsumerImpl impl)
+ {
+ synchronized (getLock())
+ {
+ _messageConsumerList.add(impl);
+ getLock().notifyAll();
+ }
+ }
+
+ public void close()
+ {
+ synchronized (getLock())
+ {
+ _closed = true;
+ getLock().notifyAll();
+ }
+ }
+
+ public void updateMessageListener(final MessageConsumerImpl messageConsumer)
+ {
+ synchronized (getLock())
+ {
+ getLock().notifyAll();
+ }
+ }
+
+ public void start()
+ {
+ synchronized (getLock())
+ {
+ _started = true;
+ getLock().notifyAll();
+ }
+ }
+
+ public void stop()
+ {
+ synchronized (getLock())
+ {
+ _started = false;
+ getLock().notifyAll();
+ }
+ }
+ }
}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
index f60e066c20..202c1a97d2 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
@@ -48,6 +48,7 @@ public class Receiver implements DeliveryStateHandler
private Queue<Transfer> _prefetchQueue = new ConcurrentLinkedQueue<Transfer>();
private Map<Binary, SettledAction> _unsettledMap = new HashMap<Binary, SettledAction>();
+ private MessageArrivalListener _messageArrivalListener;
public Receiver(final Session session,
final String linkName,
@@ -110,6 +111,7 @@ public class Receiver implements DeliveryStateHandler
@Override public void messageTransfer(final Transfer xfr)
{
_prefetchQueue.add(xfr);
+ postPrefetchAction();
}
});
@@ -133,6 +135,14 @@ public class Receiver implements DeliveryStateHandler
}
}
+ private void postPrefetchAction()
+ {
+ if(_messageArrivalListener != null)
+ {
+ _messageArrivalListener.messageArrived(this);
+ }
+ }
+
public void setCredit(UnsignedInteger credit, boolean window)
{
_endpoint.setLinkCredit(credit);
@@ -262,7 +272,7 @@ public class Receiver implements DeliveryStateHandler
}
catch (InterruptedException e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ return null;
}
if(wait > 0L)
{
@@ -393,8 +403,22 @@ public class Receiver implements DeliveryStateHandler
}
+ public void setMessageArrivalListener(final MessageArrivalListener messageArrivalListener)
+ {
+ synchronized(_endpoint.getLock())
+ {
+ _messageArrivalListener = messageArrivalListener;
+ }
+ }
+
public static interface SettledAction
{
public void onSettled(Binary deliveryTag);
}
+
+
+ public interface MessageArrivalListener
+ {
+ void messageArrived(Receiver receiver);
+ }
} \ No newline at end of file
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
index 85bcf2febf..05fbd74bde 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
@@ -439,6 +439,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
SessionEndpoint endpoint = new SessionEndpoint(this,begin);
_receivingSessions[channel] = endpoint;
+ _sendingSessions[myChannelId] = endpoint;
Begin beginToSend = new Begin();