summaryrefslogtreecommitdiff
path: root/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java')
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java139
1 files changed, 136 insertions, 3 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
index ba44aafcee..01594a5a1c 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
@@ -19,6 +19,7 @@
package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.client.Connection;
+import org.apache.qpid.amqp_1_0.client.Message;
import org.apache.qpid.amqp_1_0.jms.Session;
import javax.jms.Destination;
@@ -37,6 +38,10 @@ public class SessionImpl implements Session
private org.apache.qpid.amqp_1_0.client.Session _session;
private MessageFactory _messageFactory;
private List<MessageConsumerImpl> _consumers = new ArrayList<MessageConsumerImpl>();
+ private MessageListener _messageListener;
+ private Dispatcher _dispatcher = new Dispatcher();
+ private Thread _dispatcherThread;
+
protected SessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
{
@@ -45,6 +50,9 @@ public class SessionImpl implements Session
Connection clientConn = _connection.getClientConnection();
_session = clientConn.createSession();
_messageFactory = new MessageFactory(this);
+
+ _dispatcherThread = new Thread(_dispatcher);
+ _dispatcherThread.start();
}
public BytesMessageImpl createBytesMessage() throws JMSException
@@ -119,7 +127,9 @@ public class SessionImpl implements Session
public void close() throws JMSException
{
+ _dispatcher.close();
_session.close();
+
}
public void recover() throws JMSException
@@ -129,12 +139,19 @@ public class SessionImpl implements Session
public MessageListener getMessageListener() throws JMSException
{
- return null; //TODO
+ return _messageListener;
}
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
- //TODO
+ if(_messageListener != null)
+ {
+
+ }
+ else
+ {
+ _messageListener = messageListener;
+ }
}
public void run()
@@ -231,7 +248,11 @@ public class SessionImpl implements Session
void start()
{
- //TODO
+ _dispatcher.start();
+ for(MessageConsumerImpl consumer : _consumers)
+ {
+ consumer.start();
+ }
}
org.apache.qpid.amqp_1_0.client.Session getClientSession()
@@ -254,4 +275,116 @@ public class SessionImpl implements Session
}
}
}
+
+ void messageListenerSet(final MessageConsumerImpl messageConsumer)
+ {
+ _dispatcher.updateMessageListener(messageConsumer);
+ }
+
+ public void messageArrived(final MessageConsumerImpl messageConsumer)
+ {
+ _dispatcher.messageArrivedAtConsumer(messageConsumer);
+ }
+
+
+ private class Dispatcher implements Runnable
+ {
+
+ private final List<MessageConsumerImpl> _messageConsumerList = new ArrayList<MessageConsumerImpl>();
+
+ private boolean _closed;
+ private boolean _started;
+
+ public void run()
+ {
+ synchronized(getLock())
+ {
+ while(!_closed)
+ {
+ while(!_started || _messageConsumerList.isEmpty())
+ {
+ try
+ {
+ getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ return;
+ }
+ }
+ while(_started && !_messageConsumerList.isEmpty())
+ {
+ MessageConsumerImpl consumer = _messageConsumerList.remove(0);
+ MessageListener listener = consumer.getMessageListener();
+ Message msg = consumer.receive0(0L);
+
+ MessageImpl message = consumer.createJMSMessage(msg);
+
+ if(message != null)
+ {
+ listener.onMessage(message);
+ if(_acknowledgeMode == AcknowledgeMode.AUTO_ACKNOWLEDGE
+ || _acknowledgeMode == AcknowledgeMode.DUPS_OK_ACKNOWLEDGE)
+ {
+ consumer.acknowledge(msg);
+ }
+ }
+
+ }
+ }
+
+
+ }
+
+ }
+
+ private Object getLock()
+ {
+ return _session.getEndpoint().getLock();
+ }
+
+ public void messageArrivedAtConsumer(MessageConsumerImpl impl)
+ {
+ synchronized (getLock())
+ {
+ _messageConsumerList.add(impl);
+ getLock().notifyAll();
+ }
+ }
+
+ public void close()
+ {
+ synchronized (getLock())
+ {
+ _closed = true;
+ getLock().notifyAll();
+ }
+ }
+
+ public void updateMessageListener(final MessageConsumerImpl messageConsumer)
+ {
+ synchronized (getLock())
+ {
+ getLock().notifyAll();
+ }
+ }
+
+ public void start()
+ {
+ synchronized (getLock())
+ {
+ _started = true;
+ getLock().notifyAll();
+ }
+ }
+
+ public void stop()
+ {
+ synchronized (getLock())
+ {
+ _started = false;
+ getLock().notifyAll();
+ }
+ }
+ }
}