summaryrefslogtreecommitdiff
path: root/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java')
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java172
1 files changed, 150 insertions, 22 deletions
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
index 587b12b51a..be1c2d6514 100644
--- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
@@ -25,9 +25,8 @@ import org.apache.qpid.amqp_1_0.transport.Container;
import javax.jms.*;
import javax.jms.IllegalStateException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import javax.jms.Queue;
+import java.util.*;
public class ConnectionImpl implements Connection, QueueConnection, TopicConnection
{
@@ -43,16 +42,26 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
private boolean _isQueueConnection;
private boolean _isTopicConnection;
private final Collection<CloseTask> _closeTasks = new ArrayList<CloseTask>();
+ private final String _host;
+ private final int _port;
+ private final String _username;
+ private final String _password;
+ private final String _remoteHost;
+ private final boolean _ssl;
+ private String _clientId;
+ private String _queuePrefix;
+ private String _topicPrefix;
private static enum State
{
+ UNCONNECTED,
STOPPED,
STARTED,
CLOSED
}
- private volatile State _state = State.STOPPED;
+ private volatile State _state = State.UNCONNECTED;
public ConnectionImpl(String host, int port, String username, String password, String clientId) throws JMSException
{
@@ -66,20 +75,52 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
public ConnectionImpl(String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl) throws JMSException
{
- Container container = clientId == null ? new Container() : new Container(clientId);
- // TODO - authentication, containerId, clientId, ssl?, etc
- try
+ _host = host;
+ _port = port;
+ _username = username;
+ _password = password;
+ _clientId = clientId;
+ _remoteHost = remoteHost;
+ _ssl = ssl;
+ }
+
+ private void connect() throws JMSException
+ {
+ synchronized(_lock)
{
- _conn = new org.apache.qpid.amqp_1_0.client.Connection(host, port, username, password, container, remoteHost, ssl);
- // TODO - retrieve negotiated AMQP version
- _connectionMetaData = new ConnectionMetaDataImpl(1,0,0);
+ // already connected?
+ if( _state == State.UNCONNECTED )
+ {
+ _state = State.STOPPED;
+
+ Container container = _clientId == null ? new Container() : new Container(_clientId);
+ // TODO - authentication, containerId, clientId, ssl?, etc
+ try
+ {
+ _conn = new org.apache.qpid.amqp_1_0.client.Connection(_host,
+ _port, _username, _password, container, _remoteHost, _ssl);
+ // TODO - retrieve negotiated AMQP version
+ _connectionMetaData = new ConnectionMetaDataImpl(1,0,0);
+ }
+ catch (org.apache.qpid.amqp_1_0.client.Connection.ConnectionException e)
+ {
+ JMSException jmsEx = new JMSException(e.getMessage());
+ jmsEx.setLinkedException(e);
+ jmsEx.initCause(e);
+ throw jmsEx;
+ }
+ }
}
- catch (org.apache.qpid.amqp_1_0.client.Connection.ConnectionException e)
+ }
+
+ private void checkNotConnected(String msg) throws IllegalStateException
+ {
+ synchronized(_lock)
{
- JMSException jmsEx = new JMSException(e.getMessage());
- jmsEx.setLinkedException(e);
- jmsEx.initCause(e);
- throw jmsEx;
+ if( _state != State.UNCONNECTED )
+ {
+ throw new IllegalStateException(msg);
+ }
}
}
@@ -111,7 +152,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
{
throw new IllegalStateException("Cannot create a session on a closed connection");
}
-
+ connect();
SessionImpl session = new SessionImpl(this, acknowledgeMode);
session.setQueueSession(_isQueueConnection);
session.setTopicSession(_isTopicConnection);
@@ -125,14 +166,19 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
public String getClientID() throws JMSException
{
checkClosed();
- return _conn.getEndpoint().getContainer().getId();
+ return _clientId;
}
- public void setClientID(final String s) throws JMSException
+ public void setClientID(final String value) throws JMSException
{
- throw new IllegalStateException("Cannot set client-id to \""
- + s
- + "\"; client-id must be set on connection creation");
+ checkNotConnected("Cannot set client-id to \""
+ + value
+ + "\"; client-id must be set before the connection is used");
+ if( _clientId !=null )
+ {
+ throw new IllegalStateException("client-id has already been set");
+ }
+ _clientId = value;
}
public ConnectionMetaData getMetaData() throws JMSException
@@ -158,6 +204,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
synchronized(_lock)
{
checkClosed();
+ connect();
if(_state == State.STOPPED)
{
// TODO
@@ -187,6 +234,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
{
session.stop();
}
+ case UNCONNECTED:
_state = State.STOPPED;
break;
case CLOSED:
@@ -235,7 +283,9 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
{
task.onClose();
}
- _conn.close();
+ if(_conn != null && _state != State.UNCONNECTED ) {
+ _conn.close();
+ }
_state = State.CLOSED;
}
@@ -282,6 +332,10 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
final int i) throws JMSException
{
checkClosed();
+ if (_isQueueConnection)
+ {
+ throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources.");
+ }
return null; //TODO
}
@@ -326,4 +380,78 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
{
_isTopicConnection = topicConnection;
}
+
+ public String getTopicPrefix()
+ {
+ return _topicPrefix;
+ }
+
+ public void setTopicPrefix(String topicPrefix)
+ {
+ _topicPrefix = topicPrefix;
+ }
+
+ public String getQueuePrefix()
+ {
+ return _queuePrefix;
+ }
+
+ public void setQueuePrefix(String queueprefix)
+ {
+ _queuePrefix = queueprefix;
+ }
+
+ DecodedDestination toDecodedDestination(DestinationImpl dest)
+ {
+ String address = dest.getAddress();
+ Set<String> kind = null;
+ Class clazz = dest.getClass();
+ if( clazz==QueueImpl.class )
+ {
+ kind = MessageImpl.JMS_QUEUE_ATTRIBUTES;
+ if( _queuePrefix!=null )
+ {
+ // Avoid double prefixing..
+ if( !address.startsWith(_queuePrefix) )
+ {
+ address = _queuePrefix+address;
+ }
+ }
+ }
+ else if( clazz==TopicImpl.class )
+ {
+ kind = MessageImpl.JMS_TOPIC_ATTRIBUTES;
+ if( _topicPrefix!=null )
+ {
+ // Avoid double prefixing..
+ if( !address.startsWith(_topicPrefix) )
+ {
+ address = _topicPrefix+address;
+ }
+ }
+ }
+ else if( clazz==TemporaryQueueImpl.class )
+ {
+ kind = MessageImpl.JMS_TEMP_QUEUE_ATTRIBUTES;
+ }
+ else if( clazz==TemporaryTopicImpl.class )
+ {
+ kind = MessageImpl.JMS_TEMP_TOPIC_ATTRIBUTES;
+ }
+ return new DecodedDestination(address, kind);
+ }
+
+ DecodedDestination toDecodedDestination(String address, Set<String> kind)
+ {
+ if( (kind == null || kind.equals(MessageImpl.JMS_QUEUE_ATTRIBUTES)) && _queuePrefix!=null && address.startsWith(_queuePrefix))
+ {
+ return new DecodedDestination(address.substring(_queuePrefix.length()), MessageImpl.JMS_QUEUE_ATTRIBUTES);
+ }
+ if( (kind == null || kind.equals(MessageImpl.JMS_TOPIC_ATTRIBUTES)) && _topicPrefix!=null && address.startsWith(_topicPrefix))
+ {
+ return new DecodedDestination(address.substring(_topicPrefix.length()), MessageImpl.JMS_TOPIC_ATTRIBUTES);
+ }
+ return new DecodedDestination(address, kind);
+ }
+
}