summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-11-21 16:18:12 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-11-21 16:18:12 +0000
commitbe7491cfc7b25c78687198dd85ad9dc9451a2f2d (patch)
tree78ac9e3a6c7ff27033a09305edf131424bbe2848
parentbb26f591df0cb17a0f7238c76f7e39b504e080bd (diff)
downloadqpid-python-be7491cfc7b25c78687198dd85ad9dc9451a2f2d.tar.gz
Merged r1544129 from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.26@1544240 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java145
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java61
2 files changed, 109 insertions, 97 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
index 20454ace65..976ae10c56 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
@@ -175,43 +175,48 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
connect();
started = true;
}
+ }
- try
+ try
+ {
+ SessionImpl session = new SessionImpl(this, acknowledgeMode);
+ session.setQueueSession(_isQueueConnection);
+ session.setTopicSession(_isTopicConnection);
+
+ boolean connectionStarted = false;
+ synchronized(_lock)
{
- SessionImpl session = new SessionImpl(this, acknowledgeMode);
- session.setQueueSession(_isQueueConnection);
- session.setTopicSession(_isTopicConnection);
+ checkClosed();
_sessions.add(session);
-
- if(_state == State.STARTED)
- {
- session.start();
- }
-
- return session;
+ connectionStarted = _state == State.STARTED;
}
- catch(JMSException e)
+
+ if(connectionStarted)
{
- Error remoteError;
- if(started
- && e.getLinkedException() instanceof ConnectionErrorException
- && (remoteError = ((ConnectionErrorException)e.getLinkedException()).getRemoteError()).getCondition() == ConnectionError.REDIRECT)
- {
- String networkHost = (String) remoteError.getInfo().get(Symbol.valueOf("network-host"));
- int port = (Integer) remoteError.getInfo().get(Symbol.valueOf("port"));
- String hostName = (String) remoteError.getInfo().get(Symbol.valueOf("hostname"));
- reconnect(networkHost,port,hostName);
- return createSession(acknowledgeMode);
-
- }
- else
- {
- throw e;
- }
+ session.start();
}
+
+ return session;
}
+ catch(JMSException e)
+ {
+ Error remoteError;
+ if(started
+ && e.getLinkedException() instanceof ConnectionErrorException
+ && (remoteError = ((ConnectionErrorException)e.getLinkedException()).getRemoteError()).getCondition() == ConnectionError.REDIRECT)
+ {
+ String networkHost = (String) remoteError.getInfo().get(Symbol.valueOf("network-host"));
+ int port = (Integer) remoteError.getInfo().get(Symbol.valueOf("port"));
+ String hostName = (String) remoteError.getInfo().get(Symbol.valueOf("hostname"));
+ reconnect(networkHost,port,hostName);
+ return createSession(acknowledgeMode);
-
+ }
+ else
+ {
+ throw e;
+ }
+ }
}
void removeSession(SessionImpl session)
@@ -272,6 +277,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
public void start() throws JMSException
{
+ List<SessionImpl> stoppedSessions = null;
synchronized(_lock)
{
checkClosed();
@@ -281,30 +287,30 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
// TODO
_state = State.STARTED;
-
- for(SessionImpl session : _sessions)
- {
- session.start();
- }
-
+ stoppedSessions = new ArrayList<SessionImpl>(_sessions);
}
_lock.notifyAll();
}
+ if (stoppedSessions != null)
+ {
+ for(SessionImpl session : stoppedSessions)
+ {
+ session.start();
+ }
+ }
}
public void stop() throws JMSException
{
+ List<SessionImpl> startedSessions = null;
synchronized(_lock)
{
switch(_state)
{
case STARTED:
- for(SessionImpl session : _sessions)
- {
- session.stop();
- }
+ startedSessions = new ArrayList<SessionImpl>(_sessions);
case UNCONNECTED:
_state = State.STOPPED;
break;
@@ -314,6 +320,14 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
_lock.notifyAll();
}
+
+ if (startedSessions != null)
+ {
+ for(SessionImpl session : startedSessions)
+ {
+ session.stop();
+ }
+ }
}
@@ -341,39 +355,34 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
public void close() throws JMSException
{
- Object outerLock;
- if(_conn != null)
+ List<SessionImpl> sessions = null;
+ List<CloseTask> closeTasks = null;
+ boolean closeConnection = false;
+ synchronized(_lock)
{
- outerLock = _conn.getEndpoint().getLock();
+ if(_state != State.CLOSED)
+ {
+ _state = State.CLOSED;
+ sessions = new ArrayList<SessionImpl>(_sessions);
+ closeTasks = new ArrayList<CloseTask>(_closeTasks);
+ closeConnection = _conn != null && _state != State.UNCONNECTED;
+ }
+
+ _lock.notifyAll();
}
- else
+
+ if (sessions != null)
{
- outerLock = _lock;
- }
-
- synchronized (outerLock)
- {
- synchronized(_lock)
+ for(SessionImpl session : sessions)
{
- if(_state != State.CLOSED)
- {
- stop();
- List<SessionImpl> sessions = new ArrayList<SessionImpl>(_sessions);
- for(SessionImpl session : sessions)
- {
- session.close();
- }
- for(CloseTask task : _closeTasks)
- {
- task.onClose();
- }
- if(_conn != null && _state != State.UNCONNECTED ) {
- _conn.close();
- }
- _state = State.CLOSED;
- }
-
- _lock.notifyAll();
+ session.close();
+ }
+ for(CloseTask task : closeTasks)
+ {
+ task.onClose();
+ }
+ if(closeConnection) {
+ _conn.close();
}
}
}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
index 5b9a67503b..ce1ce512a2 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
@@ -114,35 +114,38 @@ public class Session
}
- public synchronized SendingLinkEndpoint createSendingLinkEndpoint(final String linkName,
- final Target target,
- final Source source,
- AcknowledgeMode mode,
- Map<Binary, Outcome> unsettled,
- final DeliveryStateHandler deliveryStateHandler)
- {
- SendingLinkEndpoint link = this.getEndpoint().createSendingLinkEndpoint(linkName, source, target,
- unsettled, deliveryStateHandler);
-
- switch(mode)
- {
- case ALO:
- link.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- link.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
- break;
- case AMO:
- link.setSendingSettlementMode(SenderSettleMode.SETTLED);
- break;
- case EO:
- link.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- link.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
- break;
-
- }
-
- link.attach();
-
- return link;
+ public SendingLinkEndpoint createSendingLinkEndpoint(final String linkName,
+ final Target target,
+ final Source source,
+ AcknowledgeMode mode,
+ Map<Binary, Outcome> unsettled,
+ final DeliveryStateHandler deliveryStateHandler)
+ {
+ SessionEndpoint endpoint = this.getEndpoint();
+ synchronized(endpoint.getLock())
+ {
+ SendingLinkEndpoint link = endpoint.createSendingLinkEndpoint(linkName, source, target,
+ unsettled, deliveryStateHandler);
+
+ switch(mode)
+ {
+ case ALO:
+ link.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+ link.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+ break;
+ case AMO:
+ link.setSendingSettlementMode(SenderSettleMode.SETTLED);
+ break;
+ case EO:
+ link.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+ link.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
+ break;
+
+ }
+
+ link.attach();
+ return link;
+ }
}
public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException