From be7491cfc7b25c78687198dd85ad9dc9451a2f2d Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 21 Nov 2013 16:18:12 +0000 Subject: Merged r1544129 from trunk git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.26@1544240 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/amqp_1_0/jms/impl/ConnectionImpl.java | 145 +++++++++++---------- .../org/apache/qpid/amqp_1_0/client/Session.java | 61 ++++----- 2 files changed, 109 insertions(+), 97 deletions(-) diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java index 20454ace65..976ae10c56 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java @@ -175,43 +175,48 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect connect(); started = true; } + } - try + try + { + SessionImpl session = new SessionImpl(this, acknowledgeMode); + session.setQueueSession(_isQueueConnection); + session.setTopicSession(_isTopicConnection); + + boolean connectionStarted = false; + synchronized(_lock) { - SessionImpl session = new SessionImpl(this, acknowledgeMode); - session.setQueueSession(_isQueueConnection); - session.setTopicSession(_isTopicConnection); + checkClosed(); _sessions.add(session); - - if(_state == State.STARTED) - { - session.start(); - } - - return session; + connectionStarted = _state == State.STARTED; } - catch(JMSException e) + + if(connectionStarted) { - Error remoteError; - if(started - && e.getLinkedException() instanceof ConnectionErrorException - && (remoteError = ((ConnectionErrorException)e.getLinkedException()).getRemoteError()).getCondition() == ConnectionError.REDIRECT) - { - String networkHost = (String) remoteError.getInfo().get(Symbol.valueOf("network-host")); - int port = (Integer) remoteError.getInfo().get(Symbol.valueOf("port")); - String hostName = (String) remoteError.getInfo().get(Symbol.valueOf("hostname")); - reconnect(networkHost,port,hostName); - return createSession(acknowledgeMode); - - } - else - { - throw e; - } + session.start(); } + + return session; } + catch(JMSException e) + { + Error remoteError; + if(started + && e.getLinkedException() instanceof ConnectionErrorException + && (remoteError = ((ConnectionErrorException)e.getLinkedException()).getRemoteError()).getCondition() == ConnectionError.REDIRECT) + { + String networkHost = (String) remoteError.getInfo().get(Symbol.valueOf("network-host")); + int port = (Integer) remoteError.getInfo().get(Symbol.valueOf("port")); + String hostName = (String) remoteError.getInfo().get(Symbol.valueOf("hostname")); + reconnect(networkHost,port,hostName); + return createSession(acknowledgeMode); - + } + else + { + throw e; + } + } } void removeSession(SessionImpl session) @@ -272,6 +277,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect public void start() throws JMSException { + List stoppedSessions = null; synchronized(_lock) { checkClosed(); @@ -281,30 +287,30 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect // TODO _state = State.STARTED; - - for(SessionImpl session : _sessions) - { - session.start(); - } - + stoppedSessions = new ArrayList(_sessions); } _lock.notifyAll(); } + if (stoppedSessions != null) + { + for(SessionImpl session : stoppedSessions) + { + session.start(); + } + } } public void stop() throws JMSException { + List startedSessions = null; synchronized(_lock) { switch(_state) { case STARTED: - for(SessionImpl session : _sessions) - { - session.stop(); - } + startedSessions = new ArrayList(_sessions); case UNCONNECTED: _state = State.STOPPED; break; @@ -314,6 +320,14 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect _lock.notifyAll(); } + + if (startedSessions != null) + { + for(SessionImpl session : startedSessions) + { + session.stop(); + } + } } @@ -341,39 +355,34 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect public void close() throws JMSException { - Object outerLock; - if(_conn != null) + List sessions = null; + List closeTasks = null; + boolean closeConnection = false; + synchronized(_lock) { - outerLock = _conn.getEndpoint().getLock(); + if(_state != State.CLOSED) + { + _state = State.CLOSED; + sessions = new ArrayList(_sessions); + closeTasks = new ArrayList(_closeTasks); + closeConnection = _conn != null && _state != State.UNCONNECTED; + } + + _lock.notifyAll(); } - else + + if (sessions != null) { - outerLock = _lock; - } - - synchronized (outerLock) - { - synchronized(_lock) + for(SessionImpl session : sessions) { - if(_state != State.CLOSED) - { - stop(); - List sessions = new ArrayList(_sessions); - for(SessionImpl session : sessions) - { - session.close(); - } - for(CloseTask task : _closeTasks) - { - task.onClose(); - } - if(_conn != null && _state != State.UNCONNECTED ) { - _conn.close(); - } - _state = State.CLOSED; - } - - _lock.notifyAll(); + session.close(); + } + for(CloseTask task : closeTasks) + { + task.onClose(); + } + if(closeConnection) { + _conn.close(); } } } diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java index 5b9a67503b..ce1ce512a2 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java @@ -114,35 +114,38 @@ public class Session } - public synchronized SendingLinkEndpoint createSendingLinkEndpoint(final String linkName, - final Target target, - final Source source, - AcknowledgeMode mode, - Map unsettled, - final DeliveryStateHandler deliveryStateHandler) - { - SendingLinkEndpoint link = this.getEndpoint().createSendingLinkEndpoint(linkName, source, target, - unsettled, deliveryStateHandler); - - switch(mode) - { - case ALO: - link.setSendingSettlementMode(SenderSettleMode.UNSETTLED); - link.setReceivingSettlementMode(ReceiverSettleMode.FIRST); - break; - case AMO: - link.setSendingSettlementMode(SenderSettleMode.SETTLED); - break; - case EO: - link.setSendingSettlementMode(SenderSettleMode.UNSETTLED); - link.setReceivingSettlementMode(ReceiverSettleMode.SECOND); - break; - - } - - link.attach(); - - return link; + public SendingLinkEndpoint createSendingLinkEndpoint(final String linkName, + final Target target, + final Source source, + AcknowledgeMode mode, + Map unsettled, + final DeliveryStateHandler deliveryStateHandler) + { + SessionEndpoint endpoint = this.getEndpoint(); + synchronized(endpoint.getLock()) + { + SendingLinkEndpoint link = endpoint.createSendingLinkEndpoint(linkName, source, target, + unsettled, deliveryStateHandler); + + switch(mode) + { + case ALO: + link.setSendingSettlementMode(SenderSettleMode.UNSETTLED); + link.setReceivingSettlementMode(ReceiverSettleMode.FIRST); + break; + case AMO: + link.setSendingSettlementMode(SenderSettleMode.SETTLED); + break; + case EO: + link.setSendingSettlementMode(SenderSettleMode.UNSETTLED); + link.setReceivingSettlementMode(ReceiverSettleMode.SECOND); + break; + + } + + link.attach(); + return link; + } } public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException -- cgit v1.2.1