diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-03-30 15:53:18 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-03-30 15:53:18 +0000 |
commit | f50a093a9423f12b69d82996ec432b9198f90f27 (patch) | |
tree | a6f8530babee103ac60a42bb6a39e7f1fae87e66 | |
parent | 85c15e0200a785a7879f10f12aaa6f96d964553e (diff) | |
download | qpid-python-f50a093a9423f12b69d82996ec432b9198f90f27.tar.gz |
added state support to the new client and modified the example to illustrate it
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/client_restructure@524144 13f79535-47bb-0310-9956-ffa450edef68
20 files changed, 1303 insertions, 1149 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 5e566a5fe8..ec52c252b0 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -24,10 +24,6 @@ import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoSession; import org.apache.mina.filter.codec.ProtocolDecoderOutput; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; - -import java.util.HashMap; -import java.util.Map; public class AMQDataBlockDecoder { @@ -68,13 +64,11 @@ public class AMQDataBlockDecoder BodyFactory bodyFactory; if (type == AMQRequestBody.TYPE) { - AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); - bodyFactory = new AMQRequestBodyFactory(protocolSession); + bodyFactory = new AMQRequestBodyFactory(); } else if (type == AMQResponseBody.TYPE) { - AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); - bodyFactory = new AMQResponseBodyFactory(protocolSession); + bodyFactory = new AMQResponseBodyFactory(); } else { diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java index 6067e2fce5..e6395e5a28 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -22,21 +22,19 @@ package org.apache.qpid.framing; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -public class AMQMethodBodyFactory implements BodyFactory +public class AMQMethodBodyFactory implements BodyFactory, ProtocolVersionList { private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class); - private final AMQVersionAwareProtocolSession _protocolSession; + VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]); - public AMQMethodBodyFactory(AMQVersionAwareProtocolSession protocolSession) + public AMQMethodBodyFactory() { - _protocolSession = protocolSession; } public AMQMethodBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { - return _protocolSession.getRegistry().get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), in, bodySize); + return _registry.get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), in, bodySize); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java index 3bc16601b6..04d8c99af3 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java @@ -21,7 +21,6 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; public class AMQRequestBody extends AMQBody { @@ -31,12 +30,10 @@ public class AMQRequestBody extends AMQBody protected long requestId; protected long responseMark; protected AMQMethodBody methodPayload; - protected AMQVersionAwareProtocolSession protocolSession; - + // Constructor - public AMQRequestBody(AMQVersionAwareProtocolSession protocolSession) + public AMQRequestBody() { - this.protocolSession = protocolSession; } public AMQRequestBody(long requestId, long responseMark, @@ -45,7 +42,6 @@ public class AMQRequestBody extends AMQBody this.requestId = requestId; this.responseMark = responseMark; this.methodPayload = methodPayload; - protocolSession = null; } @@ -75,17 +71,12 @@ public class AMQRequestBody extends AMQBody protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException - { - if (protocolSession == null) - { - throw new AMQFrameDecodingException("Cannot call populateFromBuffer() without using correct constructor."); - } - + { requestId = EncodingUtils.readLong(buffer); responseMark = EncodingUtils.readLong(buffer); int reserved = EncodingUtils.readInteger(buffer); // reserved, throw away - AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory(protocolSession); + AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory(); methodPayload = methodBodyFactory.createBody(buffer, size); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java index 9d47ccd68e..a11e8587e8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java @@ -21,20 +21,16 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; public class AMQRequestBodyFactory implements BodyFactory -{ - private final AMQVersionAwareProtocolSession protocolSession; - - public AMQRequestBodyFactory(AMQVersionAwareProtocolSession protocolSession) +{ + public AMQRequestBodyFactory() { - this.protocolSession = protocolSession; } public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { - AMQRequestBody rb = new AMQRequestBody(protocolSession); + AMQRequestBody rb = new AMQRequestBody(); rb.populateFromBuffer(in, bodySize); return rb; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java index 7b35aaeb86..c27a76d922 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java @@ -21,7 +21,6 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; public class AMQResponseBody extends AMQBody { @@ -32,12 +31,10 @@ public class AMQResponseBody extends AMQBody protected long requestId; protected int batchOffset; protected AMQMethodBody methodPayload; - protected AMQVersionAwareProtocolSession protocolSession; // Constructor - public AMQResponseBody(AMQVersionAwareProtocolSession protocolSession) + public AMQResponseBody() { - this.protocolSession = protocolSession; } public AMQResponseBody(long responseId, long requestId, @@ -47,7 +44,6 @@ public class AMQResponseBody extends AMQBody this.requestId = requestId; this.batchOffset = batchOffset; this.methodPayload = methodPayload; - protocolSession = null; } // Field methods @@ -77,16 +73,13 @@ public class AMQResponseBody extends AMQBody protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException - { - if (protocolSession == null) - throw new AMQFrameDecodingException("Cannot call populateFromBuffer() without using correct constructor."); - + { responseId = EncodingUtils.readLong(buffer); requestId = EncodingUtils.readLong(buffer); // XXX batchOffset = EncodingUtils.readInteger(buffer); - AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory(protocolSession); + AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory(); methodPayload = methodBodyFactory.createBody(buffer, size); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java index 4209aad11f..bf94afbe17 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java @@ -25,16 +25,13 @@ import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; public class AMQResponseBodyFactory implements BodyFactory { - private final AMQVersionAwareProtocolSession protocolSession; - - public AMQResponseBodyFactory(AMQVersionAwareProtocolSession protocolSession) + public AMQResponseBodyFactory() { - this.protocolSession = protocolSession; } public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { - AMQResponseBody rb = new AMQResponseBody(protocolSession); + AMQResponseBody rb = new AMQResponseBody(); rb.populateFromBuffer(in, bodySize); return rb; } diff --git a/java/newclient/.project b/java/newclient/.project index f1c93ad7a9..4d42311982 100644 --- a/java/newclient/.project +++ b/java/newclient/.project @@ -1,7 +1,10 @@ <projectDescription> <name>qpid-newclient</name> <comment/> - <projects/> + <projects> + <project>qpid-broker</project> + <project>qpid-common</project> + </projects> <buildSpec> <buildCommand> <name>org.eclipse.jdt.core.javabuilder</name> diff --git a/java/newclient/pom.xml b/java/newclient/pom.xml index d2d27f3fd7..5a82feb1d2 100644 --- a/java/newclient/pom.xml +++ b/java/newclient/pom.xml @@ -58,6 +58,12 @@ </dependency> <dependency> + <groupId>commons-configuration</groupId> + <artifactId>commons-configuration</artifactId> + <version>1.3</version> + </dependency> + + <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> </dependency> diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java index 7ba9120415..e480f38b93 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java @@ -40,7 +40,10 @@ import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; import org.apache.qpid.nclient.amqp.state.AMQPState; +import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; +import org.apache.qpid.nclient.amqp.state.AMQPStateManager; +import org.apache.qpid.nclient.amqp.state.AMQPStateType; import org.apache.qpid.nclient.config.ClientConfiguration; import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.Phase; @@ -58,295 +61,309 @@ import org.apache.qpid.nclient.util.AMQPValidator; public class AMQPChannel extends AMQPStateMachine implements AMQPMethodListener { - private static final Logger _logger = Logger.getLogger(AMQPChannel.class); + private static final Logger _logger = Logger.getLogger(AMQPChannel.class); - // the channelId assigned for this channel - private int _channelId; + // the channelId assigned for this channel + private int _channelId; - private Phase _phase; + private Phase _phase; - private AMQPState _currentState; + private AMQPState _currentState; - private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND }; + private AMQPStateManager _stateManager; - private final AMQPState[] _validResumeStates = new AMQPState[] { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED }; + private final AMQPState[] _validCloseStates = new AMQPState[] + { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND }; - // The wait period until a server sends a respond - private long _serverTimeOut = 1000; + private final AMQPState[] _validResumeStates = new AMQPState[] + { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED }; - private final Lock _lock = new ReentrantLock(); + // The wait period until a server sends a respond + private long _serverTimeOut = 1000; - private final Condition _channelNotOpend = _lock.newCondition(); + private final Lock _lock = new ReentrantLock(); - private final Condition _channelNotClosed = _lock.newCondition(); + private final Condition _channelNotOpend = _lock.newCondition(); - private final Condition _channelFlowNotResponded = _lock.newCondition(); + private final Condition _channelNotClosed = _lock.newCondition(); - private final Condition _channelNotResumed = _lock.newCondition(); + private final Condition _channelFlowNotResponded = _lock.newCondition(); - private ChannelOpenOkBody _channelOpenOkBody; + private final Condition _channelNotResumed = _lock.newCondition(); - private ChannelCloseOkBody _channelCloseOkBody; + private ChannelOpenOkBody _channelOpenOkBody; - private ChannelFlowOkBody _channelFlowOkBody; + private ChannelCloseOkBody _channelCloseOkBody; - private ChannelOkBody _channelOkBody; - - private ChannelCloseBody _channelCloseBody; + private ChannelFlowOkBody _channelFlowOkBody; - protected AMQPChannel(int channelId, Phase phase) - { - _channelId = channelId; - _phase = phase; - _currentState = AMQPState.CHANNEL_NOT_OPENED; - _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); - } + private ChannelOkBody _channelOkBody; - /** - * ------------------------------------------- - * API Methods - * -------------------------------------------- - */ + private ChannelCloseBody _channelCloseBody; - /** - * Opens the channel - */ - public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException - { - _lock.lock(); - try + protected AMQPChannel(int channelId, Phase phase, AMQPStateManager stateManager) { - _channelOpenOkBody = null; - checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - - //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _channelNotOpend.await(); - checkIfConnectionClosed(); - AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time"); - _currentState = AMQPState.CHANNEL_OPENED; - return _channelOpenOkBody; + _channelId = channelId; + _phase = phase; + _stateManager = stateManager; + _currentState = AMQPState.CHANNEL_NOT_OPENED; + _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); } - catch (Exception e) - { - throw new AMQPException("Error in channel.open", e); - } - finally - { - _lock.unlock(); - } - } - - /** - * Close the channel - */ - public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException - { - _lock.lock(); - try - { - _channelCloseOkBody = null; - checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - - //_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _channelNotClosed.await(); - AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time"); - _currentState = AMQPState.CHANNEL_CLOSED; - return _channelCloseOkBody; - } - catch (Exception e) - { - throw new AMQPException("Error in channel.close", e); - } - finally - { - _lock.unlock(); - } - } - - /** - * Channel Flow - */ - public ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException - { - _lock.lock(); - try - { - _channelFlowOkBody = null; - if (channelFlowBody.active) - { - checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND, _currentState, AMQPState.CHANNEL_OPENED); - } - else - { - checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND); - } - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - - //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _channelFlowNotResponded.await(); - checkIfConnectionClosed(); - AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time"); - handleChannelFlowState(_channelFlowOkBody.active); - return _channelFlowOkBody; - } - catch (Exception e) - { - throw new AMQPException("Error in channel.flow", e); - } - finally - { - _lock.unlock(); - } - } - - /** - * Close the channel - */ - public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException - { - _lock.lock(); - try + + /** + * ------------------------------------------- + * API Methods + * -------------------------------------------- + */ + + /** + * Opens the channel + */ + public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException { - _channelOkBody = null; - checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - - //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _channelNotResumed.await(); - checkIfConnectionClosed(); - AMQPValidator.throwExceptionOnNull(_channelOkBody, - "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time"); - _currentState = AMQPState.CHANNEL_OPENED; - return _channelOkBody; + _lock.lock(); + try + { + _channelOpenOkBody = null; + checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _channelNotOpend.await(); + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time"); + notifyState(AMQPState.CHANNEL_OPENED); + _currentState = AMQPState.CHANNEL_OPENED; + return _channelOpenOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in channel.open", e); + } + finally + { + _lock.unlock(); + } } - catch (Exception e) + + /** + * Close the channel + */ + public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException { - throw new AMQPException("Error in channel.resume", e); + _lock.lock(); + try + { + _channelCloseOkBody = null; + checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _channelNotClosed.await(); + AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time"); + notifyState(AMQPState.CHANNEL_CLOSED); + _currentState = AMQPState.CHANNEL_CLOSED; + return _channelCloseOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in channel.close", e); + } + finally + { + _lock.unlock(); + } } - finally + + /** + * Channel Flow + */ + public ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException { - _lock.unlock(); + _lock.lock(); + try + { + _channelFlowOkBody = null; + if (channelFlowBody.active) + { + checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND, _currentState, AMQPState.CHANNEL_OPENED); + } + else + { + checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND); + } + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _channelFlowNotResponded.await(); + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time"); + handleChannelFlowState(_channelFlowOkBody.active); + return _channelFlowOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in channel.flow", e); + } + finally + { + _lock.unlock(); + } } - } - - /** - * ------------------------------------------- - * AMQPMethodListener methods - * -------------------------------------------- - */ - public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException - { - _lock.lock(); - try + + /** + * Close the channel + */ + public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException { - if (evt.getMethod() instanceof ChannelOpenOkBody) - { - _channelOpenOkBody = (ChannelOpenOkBody) evt.getMethod(); - _channelNotOpend.signal(); - return true; - } - else if (evt.getMethod() instanceof ChannelCloseOkBody) - { - _channelCloseOkBody = (ChannelCloseOkBody) evt.getMethod(); - _channelNotClosed.signal(); - return true; - } - else if (evt.getMethod() instanceof ChannelCloseBody) - { - _channelCloseBody = (ChannelCloseBody)evt.getMethod(); - // release the correct lock as u may have some conditions waiting. - // while an error occured and the broker has sent a close. - releaseLocks(); - handleChannelClose(_channelCloseBody); - return true; - } - else if (evt.getMethod() instanceof ChannelFlowOkBody) - { - _channelFlowOkBody = (ChannelFlowOkBody) evt.getMethod(); - _channelFlowNotResponded.signal(); - return true; - } - else if (evt.getMethod() instanceof ChannelFlowBody) - { - handleChannelFlow((ChannelFlowBody) evt.getMethod()); - return true; - } - else if (evt.getMethod() instanceof ChannelOkBody) - { - _channelOkBody = (ChannelOkBody) evt.getMethod(); - // In this case the only method expecting channel-ok is channel-resume - // haven't implemented ping and pong. - _channelNotResumed.signal(); - return true; - } - else - { - return false; - } + _lock.lock(); + try + { + _channelOkBody = null; + checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _channelNotResumed.await(); + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_channelOkBody, + "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time"); + notifyState(AMQPState.CHANNEL_OPENED); + _currentState = AMQPState.CHANNEL_OPENED; + return _channelOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in channel.resume", e); + } + finally + { + _lock.unlock(); + } } - finally + + /** + * ------------------------------------------- + * AMQPMethodListener methods + * -------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException { - _lock.unlock(); + _lock.lock(); + try + { + if (evt.getMethod() instanceof ChannelOpenOkBody) + { + _channelOpenOkBody = (ChannelOpenOkBody) evt.getMethod(); + _channelNotOpend.signal(); + return true; + } + else if (evt.getMethod() instanceof ChannelCloseOkBody) + { + _channelCloseOkBody = (ChannelCloseOkBody) evt.getMethod(); + _channelNotClosed.signal(); + return true; + } + else if (evt.getMethod() instanceof ChannelCloseBody) + { + _channelCloseBody = (ChannelCloseBody) evt.getMethod(); + // release the correct lock as u may have some conditions waiting. + // while an error occured and the broker has sent a close. + releaseLocks(); + handleChannelClose(_channelCloseBody); + return true; + } + else if (evt.getMethod() instanceof ChannelFlowOkBody) + { + _channelFlowOkBody = (ChannelFlowOkBody) evt.getMethod(); + _channelFlowNotResponded.signal(); + return true; + } + else if (evt.getMethod() instanceof ChannelFlowBody) + { + handleChannelFlow((ChannelFlowBody) evt.getMethod()); + return true; + } + else if (evt.getMethod() instanceof ChannelOkBody) + { + _channelOkBody = (ChannelOkBody) evt.getMethod(); + // In this case the only method expecting channel-ok is channel-resume + // haven't implemented ping and pong. + _channelNotResumed.signal(); + return true; + } + else + { + return false; + } + } + finally + { + _lock.unlock(); + } } - } - - private void handleChannelClose(ChannelCloseBody channelCloseBody) - { - _currentState = AMQPState.CHANNEL_CLOSED; - // handle channel related cleanup - } - - private void releaseLocks() - { - if(_currentState == AMQPState.CHANNEL_NOT_OPENED) + + private void handleChannelClose(ChannelCloseBody channelCloseBody) throws AMQPException { - _channelNotOpend.signal(); - _channelNotResumed.signal(); // It could be a channel.resume call + notifyState(AMQPState.CHANNEL_CLOSED); + _currentState = AMQPState.CHANNEL_CLOSED; + // handle channel related cleanup } - else if(_currentState == AMQPState.CHANNEL_OPENED || _currentState == AMQPState.CHANNEL_SUSPEND) + + private void releaseLocks() { - _channelFlowNotResponded.signal(); + if (_currentState == AMQPState.CHANNEL_NOT_OPENED) + { + _channelNotOpend.signal(); + _channelNotResumed.signal(); // It could be a channel.resume call + } + else if (_currentState == AMQPState.CHANNEL_OPENED || _currentState == AMQPState.CHANNEL_SUSPEND) + { + _channelFlowNotResponded.signal(); + } + else if (_currentState == AMQPState.CHANNEL_CLOSED) + { + _channelNotResumed.signal(); + } } - else if(_currentState == AMQPState.CHANNEL_CLOSED) + + private void checkIfConnectionClosed() throws AMQPException { - _channelNotResumed.signal(); + if (_channelCloseBody != null) + { + String error = "Broker has closed channel due to : " + _channelCloseBody.getReplyText() + " with reply code (" + + _channelCloseBody.getReplyCode() + ") " + "caused by class " + _channelCloseBody.getClassId() + " and method " + + _channelCloseBody.getMethod(); + + throw new AMQPException(error); + } } - } - - private void checkIfConnectionClosed()throws AMQPException - { - if (_channelCloseBody != null) + + private void handleChannelFlow(ChannelFlowBody channelFlowBody)throws AMQPException { - String error = "Broker has closed channel due to : " + _channelCloseBody.getReplyText() + - " with reply code (" + _channelCloseBody.getReplyCode() + ") " + - "caused by class " + _channelCloseBody.getClassId() + - " and method " + _channelCloseBody.getMethod(); - - throw new AMQPException(error); + _lock.lock(); + try + { + handleChannelFlowState(channelFlowBody.active); + } + finally + { + _lock.unlock(); + } } - } - private void handleChannelFlow(ChannelFlowBody channelFlowBody) - { - _lock.lock(); - try + private void handleChannelFlowState(boolean flow)throws AMQPException { - handleChannelFlowState(channelFlowBody.active); + notifyState((flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND); + _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND; } - finally + + private void notifyState(AMQPState newState) throws AMQPException { - _lock.unlock(); + _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE)); } - } - - private void handleChannelFlowState(boolean flow) - { - _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND; - } } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java index a3f9b2c24d..f90a5231a2 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java @@ -78,178 +78,178 @@ import org.apache.qpid.url.URLSyntaxException; */ public class AMQPClassFactory { - //Need an event manager per connection - private AMQPEventManager _eventManager = new QpidEventManager(); - - // Need a state manager per connection - private AMQPStateManager _stateManager = new QpidStateManager(); - - //Need a phase pipe per connection - private Phase _phase; - - //One instance per connection - private AMQPConnection _amqpConnection; - - public AMQPClassFactory() - { - - } - - public AMQPConnection createConnection(String urlStr,ConnectionType type)throws AMQPException, URLSyntaxException - { - AMQPConnectionURL url = new AMQPConnectionURL(urlStr); - return createConnectionClass(url,type); - } - - public AMQPConnection createConnectionClass(ConnectionURL url,ConnectionType type)throws AMQPException - { - if (_amqpConnection == null) + //Need an event manager per connection + private AMQPEventManager _eventManager = new QpidEventManager(); + + // Need a state manager per connection + private AMQPStateManager _stateManager = new QpidStateManager(); + + //Need a phase pipe per connection + private Phase _phase; + + //One instance per connection + private AMQPConnection _amqpConnection; + + public AMQPClassFactory() + { + + } + + public AMQPConnection createConnection(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException + { + AMQPConnectionURL url = new AMQPConnectionURL(urlStr); + return createConnectionClass(url, type); + } + + public AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException { - PhaseContext ctx = new DefaultPhaseContext(); - ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager); - - TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type,ctx); - _amqpConnection = new AMQPConnection(conn); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class,_amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class,_amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class,_amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class,_amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class,_amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class,_amqpConnection); + if (_amqpConnection == null) + { + PhaseContext ctx = new DefaultPhaseContext(); + ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager); + + TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type, ctx); + _amqpConnection = new AMQPConnection(conn, _stateManager); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class, _amqpConnection); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class, _amqpConnection); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class, _amqpConnection); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class, _amqpConnection); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class, _amqpConnection); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class, _amqpConnection); + } + return _amqpConnection; } - return _amqpConnection; - } - - public AMQPChannel createChannelClass(int channel)throws AMQPException - { - checkIfConnectionStarted(); - AMQPChannel amqpChannel = new AMQPChannel(channel,_phase); - _eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class,amqpChannel); - _eventManager.addMethodEventListener(channel, ChannelCloseBody.class,amqpChannel); - _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class,amqpChannel); - _eventManager.addMethodEventListener(channel, ChannelFlowBody.class,amqpChannel); - _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class,amqpChannel); - _eventManager.addMethodEventListener(channel, ChannelOkBody.class,amqpChannel); - return amqpChannel; - } - - public void destroyChannelClass(int channel,AMQPChannel amqpChannel)throws AMQPException - { - _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class,amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class,amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class,amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class,amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class,amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelOkBody.class,amqpChannel); - } - - public AMQPExchange createExchangeClass(int channel)throws AMQPException - { - checkIfConnectionStarted(); - AMQPExchange amqpExchange = new AMQPExchange(channel,_phase); - _eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class,amqpExchange); - _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class,amqpExchange); - return amqpExchange; - } - - public void destoryExchangeClass(int channel, AMQPExchange amqpExchange)throws AMQPException - { - _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class,amqpExchange); - _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class,amqpExchange); - } - - public AMQPQueue createQueueClass(int channel)throws AMQPException - { - checkIfConnectionStarted(); - AMQPQueue amqpQueue = new AMQPQueue(channel,_phase); - _eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class,amqpQueue); - _eventManager.addMethodEventListener(channel, QueueBindOkBody.class,amqpQueue); - _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class,amqpQueue); - _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class,amqpQueue); - _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class,amqpQueue); - return amqpQueue; - } - - public void destroyQueueClass(int channel,AMQPQueue amqpQueue)throws AMQPException - { - _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class,amqpQueue); - _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class,amqpQueue); - _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class,amqpQueue); - _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class,amqpQueue); - _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class,amqpQueue); - } - - public AMQPMessage createMessageClass(int channel,AMQPMessageCallBack messageCb)throws AMQPException - { - checkIfConnectionStarted(); - AMQPMessage amqpMessage = new AMQPMessage(channel,_phase,messageCb); - _eventManager.addMethodEventListener(channel, MessageAppendBody.class,amqpMessage); - _eventManager.addMethodEventListener(channel, MessageCancelBody.class,amqpMessage); - _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class,amqpMessage); - _eventManager.addMethodEventListener(channel, MessageCloseBody.class,amqpMessage); - _eventManager.addMethodEventListener(channel, MessageGetBody.class,amqpMessage); - _eventManager.addMethodEventListener(channel, MessageOffsetBody.class,amqpMessage); - _eventManager.addMethodEventListener(channel, MessageOkBody.class,amqpMessage); - _eventManager.addMethodEventListener(channel, MessageOpenBody.class,amqpMessage); - _eventManager.addMethodEventListener(channel, MessageRecoverBody.class,amqpMessage); - _eventManager.addMethodEventListener(channel, MessageRejectBody.class,amqpMessage); - _eventManager.addMethodEventListener(channel, MessageResumeBody.class,amqpMessage); - _eventManager.addMethodEventListener(channel, MessageQosBody.class,amqpMessage); - _eventManager.addMethodEventListener(channel, MessageTransferBody.class,amqpMessage); - - return amqpMessage; - } - - public void destoryMessageClass(int channel,AMQPMessage amqpMessage)throws AMQPException - { - _eventManager.removeMethodEventListener(channel, MessageAppendBody.class,amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageCancelBody.class,amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class,amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageCloseBody.class,amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageGetBody.class,amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class,amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageOkBody.class,amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageOpenBody.class,amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class,amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageRejectBody.class,amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageResumeBody.class,amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageQosBody.class,amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageTransferBody.class,amqpMessage); - } - - //This class should register as a state listener for AMQPConnection - private void checkIfConnectionStarted() throws AMQPException - { - if (_phase == null) + + public AMQPChannel createChannelClass(int channel) throws AMQPException + { + checkIfConnectionStarted(); + AMQPChannel amqpChannel = new AMQPChannel(channel, _phase,_stateManager); + _eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel); + _eventManager.addMethodEventListener(channel, ChannelCloseBody.class, amqpChannel); + _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel); + _eventManager.addMethodEventListener(channel, ChannelFlowBody.class, amqpChannel); + _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel); + _eventManager.addMethodEventListener(channel, ChannelOkBody.class, amqpChannel); + return amqpChannel; + } + + public void destroyChannelClass(int channel, AMQPChannel amqpChannel) throws AMQPException + { + _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel); + _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class, amqpChannel); + _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel); + _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class, amqpChannel); + _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel); + _eventManager.removeMethodEventListener(channel, ChannelOkBody.class, amqpChannel); + } + + public AMQPExchange createExchangeClass(int channel) throws AMQPException + { + checkIfConnectionStarted(); + AMQPExchange amqpExchange = new AMQPExchange(channel, _phase); + _eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange); + _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange); + return amqpExchange; + } + + public void destoryExchangeClass(int channel, AMQPExchange amqpExchange) throws AMQPException + { + _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange); + _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange); + } + + public AMQPQueue createQueueClass(int channel) throws AMQPException + { + checkIfConnectionStarted(); + AMQPQueue amqpQueue = new AMQPQueue(channel, _phase); + _eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue); + _eventManager.addMethodEventListener(channel, QueueBindOkBody.class, amqpQueue); + _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue); + _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue); + _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue); + return amqpQueue; + } + + public void destroyQueueClass(int channel, AMQPQueue amqpQueue) throws AMQPException + { + _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue); + _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class, amqpQueue); + _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue); + _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue); + _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue); + } + + public AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException + { + checkIfConnectionStarted(); + AMQPMessage amqpMessage = new AMQPMessage(channel, _phase, messageCb); + _eventManager.addMethodEventListener(channel, MessageAppendBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageCancelBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageCloseBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageGetBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageOffsetBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageOkBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageOpenBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageRecoverBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageRejectBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageResumeBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageQosBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageTransferBody.class, amqpMessage); + + return amqpMessage; + } + + public void destoryMessageClass(int channel, AMQPMessage amqpMessage) throws AMQPException + { + _eventManager.removeMethodEventListener(channel, MessageAppendBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageCancelBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageCloseBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageGetBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageOkBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageOpenBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageRejectBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageResumeBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageQosBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageTransferBody.class, amqpMessage); + } + + //This class should register as a state listener for AMQPConnection + private void checkIfConnectionStarted() throws AMQPException + { + if (_phase == null) + { + _phase = _amqpConnection.getPhasePipe(); + + if (_phase == null) + { + throw new AMQPException("Cannot create a channel until connection is ready"); + } + } + } + + /** + * Extention point + * Other interested parties can obtain a reference to the event manager + * and add listeners to get notified of events + * + */ + public AMQPEventManager getEventManager() + { + return _eventManager; + } + + /** + * Extention point + * Other interested parties can obtain a reference to the state manager + * and add listeners to get notified of state changes + * + */ + public AMQPStateManager getStateManager() { - _phase = _amqpConnection.getPhasePipe(); - - if (_phase == null) - { - throw new AMQPException("Cannot create a channel until connection is ready"); - } + return _stateManager; } - } - - /** - * Extention point - * Other interested parties can obtain a reference to the event manager - * and add listeners to get notified of events - * - */ - public AMQPEventManager getEventManager() - { - return _eventManager; - } - - /** - * Extention point - * Other interested parties can obtain a reference to the state manager - * and add listeners to get notified of state changes - * - */ - public AMQPStateManager getStateManager() - { - return null; - } } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java index aea25a403b..1d6541ec3e 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java @@ -40,7 +40,10 @@ import org.apache.qpid.framing.ConnectionTuneOkBody; import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; import org.apache.qpid.nclient.amqp.state.AMQPState; +import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; +import org.apache.qpid.nclient.amqp.state.AMQPStateManager; +import org.apache.qpid.nclient.amqp.state.AMQPStateType; import org.apache.qpid.nclient.config.ClientConfiguration; import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.Phase; @@ -55,366 +58,383 @@ import org.apache.qpid.nclient.util.AMQPValidator; */ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListener { - private static final Logger _logger = Logger.getLogger(AMQPConnection.class); + private static final Logger _logger = Logger.getLogger(AMQPConnection.class); - private Phase _phase; + private Phase _phase; - private TransportConnection _connection; + private TransportConnection _connection; - private long _correlationId; + private long _correlationId; - private AMQPState _currentState; + private AMQPState _currentState; - private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE, - AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, AMQPState.CONNECTION_OPEN, }; + private AMQPStateManager _stateManager; - // The wait period until a server sends a respond - private long _serverTimeOut = 1000; + private final AMQPState[] _validCloseStates = new AMQPState[] + { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, + AMQPState.CONNECTION_OPEN, }; - private final Lock _lock = new ReentrantLock(); + // The wait period until a server sends a respond + private long _serverTimeOut = 1000; - private final Condition _connectionNotStarted = _lock.newCondition(); + private final Lock _lock = new ReentrantLock(); - private final Condition _connectionNotSecure = _lock.newCondition(); + private final Condition _connectionNotStarted = _lock.newCondition(); - private final Condition _connectionNotTuned = _lock.newCondition(); + private final Condition _connectionNotSecure = _lock.newCondition(); - private final Condition _connectionNotOpened = _lock.newCondition(); + private final Condition _connectionNotTuned = _lock.newCondition(); - private final Condition _connectionNotClosed = _lock.newCondition(); + private final Condition _connectionNotOpened = _lock.newCondition(); - private ConnectionStartBody _connectionStartBody; + private final Condition _connectionNotClosed = _lock.newCondition(); - private ConnectionSecureBody _connectionSecureBody; + private ConnectionStartBody _connectionStartBody; - private ConnectionTuneBody _connectionTuneBody; + private ConnectionSecureBody _connectionSecureBody; - private ConnectionOpenOkBody _connectionOpenOkBody; + private ConnectionTuneBody _connectionTuneBody; - private ConnectionCloseOkBody _connectionCloseOkBody; - - private ConnectionCloseBody _connectionCloseBody; + private ConnectionOpenOkBody _connectionOpenOkBody; - protected AMQPConnection(TransportConnection connection) - { - _connection = connection; - _currentState = AMQPState.CONNECTION_UNDEFINED; - _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); - } + private ConnectionCloseOkBody _connectionCloseOkBody; - /** - * ------------------------------------------- - * API Methods - * -------------------------------------------- - */ + private ConnectionCloseBody _connectionCloseBody; - /** - * Opens the TCP connection and let the formalities begin. - */ - public ConnectionStartBody openTCPConnection() throws AMQPException - { - _lock.lock(); - // open the TCP connection - try + protected AMQPConnection(TransportConnection connection, AMQPStateManager stateManager) { - _connectionStartBody = null; - checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED); - _phase = _connection.connect(); - - // waiting for ConnectionStartBody or error in connection - //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _connectionNotStarted.await(); - - checkIfConnectionClosed(); - AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time"); - _currentState = AMQPState.CONNECTION_NOT_STARTED; - return _connectionStartBody; + _connection = connection; + _stateManager = stateManager; + _currentState = AMQPState.CONNECTION_UNDEFINED; + _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); } - catch (InterruptedException e) - { - throw new AMQPException("Error opening connection to broker", e); - } - finally - { - _lock.unlock(); - } - } - - /** - * The current java broker implementation can send a connection tune body - * as a response to the startOk. Not sure if that is the correct behaviour. - */ - public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException - { - _lock.lock(); - try - { - _connectionSecureBody = null; - checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE); - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId); - _phase.messageSent(msg); - //_connectionNotSecure.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _connectionNotSecure.await(); - //AMQPValidator.throwExceptionOnNull(_connectionSecureBody, "The broker didn't send the ConnectionSecureBody in time"); - //_currentState = AMQPState.CONNECTION_NOT_SECURE; - - checkIfConnectionClosed(); - if (_connectionTuneBody != null) - { - _currentState = AMQPState.CONNECTION_NOT_TUNED; - return _connectionTuneBody; - } - else if (_connectionSecureBody != null) - { // oops the server sent another challenge - _currentState = AMQPState.CONNECTION_NOT_SECURE; - return _connectionSecureBody; - } - else - { - throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); - } - } - catch (InterruptedException e) - { - throw new AMQPException("Error in connection.startOk", e); - } - finally - { - _lock.unlock(); - } - } - - /** - * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could - * issue a new challenge - */ - public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException - { - _lock.lock(); - try - { - _connectionTuneBody = null; - _connectionSecureBody = null; - checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED); - - _connectionSecureBody = null; // The server could send a fresh challenge - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId); - _phase.messageSent(msg); - - //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _connectionNotTuned.await(); - checkIfConnectionClosed(); - - if (_connectionTuneBody != null) - { - _currentState = AMQPState.CONNECTION_NOT_TUNED; - return _connectionTuneBody; - } - else if (_connectionSecureBody != null) - { // oops the server sent another challenge - _currentState = AMQPState.CONNECTION_NOT_SECURE; - return _connectionSecureBody; - } - else - { - throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); - } - } - catch (InterruptedException e) - { - throw new AMQPException("Error in connection.secureOk", e); - } - finally - { - _lock.unlock(); - } - } - public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException - { - _lock.lock(); - try - { - checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED); - _connectionSecureBody = null; - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId); - _phase.messageSent(msg); - _currentState = AMQPState.CONNECTION_NOT_OPENED; - } - finally - { - _lock.unlock(); - } - } + /** + * ------------------------------------------- + * API Methods + * -------------------------------------------- + */ - public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException - { - _lock.lock(); - try + /** + * Opens the TCP connection and let the formalities begin. + */ + public ConnectionStartBody openTCPConnection() throws AMQPException { - // If the broker sends a connection close due to an error with the - // Connection tune ok, then this call will verify that - checkIfConnectionClosed(); - - _connectionOpenOkBody = null; - checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN); - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - - //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _connectionNotOpened.await(); - - checkIfConnectionClosed(); - AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time"); - _currentState = AMQPState.CONNECTION_OPEN; - return _connectionOpenOkBody; + _lock.lock(); + // open the TCP connection + try + { + _connectionStartBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED); + _phase = _connection.connect(); + + // waiting for ConnectionStartBody or error in connection + //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _connectionNotStarted.await(); + + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time"); + notifyState(AMQPState.CONNECTION_NOT_STARTED); + _currentState = AMQPState.CONNECTION_NOT_STARTED; + return _connectionStartBody; + } + catch (InterruptedException e) + { + throw new AMQPException("Error opening connection to broker", e); + } + finally + { + _lock.unlock(); + } } - catch (InterruptedException e) - { - throw new AMQPException("Error in connection.open", e); - } - finally - { - _lock.unlock(); - } - } - public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException - { - _lock.lock(); - try + /** + * The current java broker implementation can send a connection tune body + * as a response to the startOk. Not sure if that is the correct behaviour. + */ + public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException { - _connectionCloseOkBody = null; - checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED); - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); - AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time"); - _currentState = AMQPState.CONNECTION_CLOSED; - return _connectionCloseOkBody; + _lock.lock(); + try + { + _connectionSecureBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId); + _phase.messageSent(msg); + // _connectionNotSecure.await(_serverTimeOut,TimeUnit.MILLISECONDS); + _connectionNotSecure.await(); + + checkIfConnectionClosed(); + if (_connectionTuneBody != null) + { + notifyState(AMQPState.CONNECTION_NOT_TUNED); + _currentState = AMQPState.CONNECTION_NOT_TUNED; + return _connectionTuneBody; + } + else if (_connectionSecureBody != null) + { // oops the server sent another challenge + notifyState(AMQPState.CONNECTION_NOT_SECURE); + _currentState = AMQPState.CONNECTION_NOT_SECURE; + return _connectionSecureBody; + } + else + { + throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); + } + } + catch (InterruptedException e) + { + throw new AMQPException("Error in connection.startOk", e); + } + finally + { + _lock.unlock(); + } } - catch (InterruptedException e) + + /** + * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could + * issue a new challenge + */ + public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException { - throw new AMQPException("Error in connection.close", e); + _lock.lock(); + try + { + _connectionTuneBody = null; + _connectionSecureBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED); + + _connectionSecureBody = null; // The server could send a fresh challenge + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId); + _phase.messageSent(msg); + + //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _connectionNotTuned.await(); + checkIfConnectionClosed(); + + if (_connectionTuneBody != null) + { + notifyState(AMQPState.CONNECTION_NOT_TUNED); + _currentState = AMQPState.CONNECTION_NOT_TUNED; + return _connectionTuneBody; + } + else if (_connectionSecureBody != null) + { // oops the server sent another challenge + notifyState(AMQPState.CONNECTION_NOT_SECURE); + _currentState = AMQPState.CONNECTION_NOT_SECURE; + return _connectionSecureBody; + } + else + { + throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); + } + } + catch (InterruptedException e) + { + throw new AMQPException("Error in connection.secureOk", e); + } + finally + { + _lock.unlock(); + } } - finally + + public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException { - _lock.unlock(); + _lock.lock(); + try + { + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED); + _connectionSecureBody = null; + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId); + _phase.messageSent(msg); + notifyState(AMQPState.CONNECTION_NOT_OPENED); + _currentState = AMQPState.CONNECTION_NOT_OPENED; + } + finally + { + _lock.unlock(); + } } - } - - /** - * ------------------------------------------- AMQMethodListener methods - * -------------------------------------------- - */ - public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException - { - _lock.lock(); - try + + public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException { - _correlationId = evt.getCorrelationId(); - - if (evt.getMethod() instanceof ConnectionStartBody) - { - _connectionStartBody = (ConnectionStartBody) evt.getMethod(); - _connectionNotStarted.signalAll(); - return true; - } - else if (evt.getMethod() instanceof ConnectionSecureBody) - { - _connectionSecureBody = (ConnectionSecureBody) evt.getMethod(); - _connectionNotSecure.signal(); - _connectionNotTuned.signal(); // in case the server has sent another chanllenge - return true; - } - else if (evt.getMethod() instanceof ConnectionTuneBody) - { - _connectionTuneBody = (ConnectionTuneBody) evt.getMethod(); - _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk - _connectionNotTuned.signal(); - return true; - } - else if (evt.getMethod() instanceof ConnectionOpenOkBody) - { - _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod(); - _connectionNotOpened.signal(); - return true; - } - else if (evt.getMethod() instanceof ConnectionCloseOkBody) - { - _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod(); - _connectionNotClosed.signal(); - return true; - } - else if (evt.getMethod() instanceof ConnectionCloseBody) - { - _connectionCloseBody = (ConnectionCloseBody)evt.getMethod(); - // release the correct lock as u may have some conditions waiting. - // while an error occured and the broker has sent a close. - releaseLocks(); - handleClose(); - return true; - } - else - { - return false; - } + _lock.lock(); + try + { + // If the broker sends a connection close due to an error with the + // Connection tune ok, then this call will verify that + checkIfConnectionClosed(); + + _connectionOpenOkBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _connectionNotOpened.await(); + + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time"); + notifyState(AMQPState.CONNECTION_OPEN); + _currentState = AMQPState.CONNECTION_OPEN; + return _connectionOpenOkBody; + } + catch (InterruptedException e) + { + throw new AMQPException("Error in connection.open", e); + } + finally + { + _lock.unlock(); + } } - finally + + public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException { - _lock.unlock(); - } - } - - private void handleClose() throws AMQPException - { - try - { - _currentState = AMQPState.CONNECTION_CLOSING; - // do the required cleanup and send a ConnectionCloseOkBody + _lock.lock(); + try + { + _connectionCloseOkBody = null; + checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); + AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time"); + notifyState(AMQPState.CONNECTION_CLOSED); + _currentState = AMQPState.CONNECTION_CLOSED; + return _connectionCloseOkBody; + } + catch (InterruptedException e) + { + throw new AMQPException("Error in connection.close", e); + } + finally + { + _lock.unlock(); + } } - catch (Exception e) + + /** + * ------------------------------------------- + * AMQMethodListener methods + * -------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException { - throw new AMQPException("Error handling connection.close from broker", e); + _lock.lock(); + try + { + _correlationId = evt.getCorrelationId(); + + if (evt.getMethod() instanceof ConnectionStartBody) + { + _connectionStartBody = (ConnectionStartBody) evt.getMethod(); + _connectionNotStarted.signalAll(); + return true; + } + else if (evt.getMethod() instanceof ConnectionSecureBody) + { + _connectionSecureBody = (ConnectionSecureBody) evt.getMethod(); + _connectionNotSecure.signal(); + _connectionNotTuned.signal(); // in case the server has sent another chanllenge + return true; + } + else if (evt.getMethod() instanceof ConnectionTuneBody) + { + _connectionTuneBody = (ConnectionTuneBody) evt.getMethod(); + _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk + _connectionNotTuned.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionOpenOkBody) + { + _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod(); + _connectionNotOpened.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionCloseOkBody) + { + _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod(); + _connectionNotClosed.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionCloseBody) + { + _connectionCloseBody = (ConnectionCloseBody) evt.getMethod(); + // release the correct lock as u may have some conditions waiting. + // while an error occured and the broker has sent a close. + releaseLocks(); + handleClose(); + return true; + } + else + { + return false; + } + } + finally + { + _lock.unlock(); + } } - } - - private void checkIfConnectionClosed()throws AMQPException - { - if (_connectionCloseBody != null) + + + public Phase getPhasePipe() { - String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() + - " with reply code (" + _connectionCloseBody.getReplyCode() + ") " + - "caused by class " + _connectionCloseBody.getClassId() + - " and method " + _connectionCloseBody.getMethod(); - - throw new AMQPException(error); + return _phase; } - } - - private void releaseLocks() - { - if(_currentState == AMQPState.CONNECTION_NOT_OPENED) + + private void handleClose() throws AMQPException { - _connectionNotOpened.signal(); + try + { + notifyState(AMQPState.CONNECTION_CLOSING); + _currentState = AMQPState.CONNECTION_CLOSING; + // do the required cleanup and send a ConnectionCloseOkBody + } + catch (Exception e) + { + throw new AMQPException("Error handling connection.close from broker", e); + } } - else if(_currentState == AMQPState.CONNECTION_UNDEFINED) + + private void checkIfConnectionClosed() throws AMQPException { - _connectionNotStarted.signal(); + if (_connectionCloseBody != null) + { + String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() + " with reply code (" + + _connectionCloseBody.getReplyCode() + ") " + "caused by class " + _connectionCloseBody.getClassId() + " and method " + + _connectionCloseBody.getMethod(); + + throw new AMQPException(error); + } } - else if(_currentState == AMQPState.CONNECTION_NOT_STARTED) + + private void releaseLocks() { - _connectionNotSecure.signal(); + if (_currentState == AMQPState.CONNECTION_NOT_OPENED) + { + _connectionNotOpened.signal(); + } + else if (_currentState == AMQPState.CONNECTION_UNDEFINED) + { + _connectionNotStarted.signal(); + } + else if (_currentState == AMQPState.CONNECTION_NOT_STARTED) + { + _connectionNotSecure.signal(); + } + else if (_currentState == AMQPState.CONNECTION_NOT_SECURE) + { + _connectionNotTuned.signal(); + } } - else if(_currentState == AMQPState.CONNECTION_NOT_SECURE) + + private void notifyState(AMQPState newState) throws AMQPException { - _connectionNotTuned.signal(); + _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CONNECTION_STATE)); } - } - - public Phase getPhasePipe() - { - return _phase; - } }
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java index e49524ee30..20c4921582 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java @@ -20,23 +20,65 @@ */ package org.apache.qpid.nclient.amqp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; import org.apache.qpid.nclient.amqp.state.AMQPStateListener; import org.apache.qpid.nclient.amqp.state.AMQPStateManager; +import org.apache.qpid.nclient.amqp.state.AMQPStateType; +import org.apache.qpid.nclient.core.AMQPException; public class QpidStateManager implements AMQPStateManager { - public void addListener(AMQPStateListener l) throws AMQException - { - // TODO Auto-generated method stub + private static final Logger _logger = Logger.getLogger(QpidStateManager.class); - } + private Map<AMQPStateType, List<AMQPStateListener>> _listernerMap = new ConcurrentHashMap<AMQPStateType, List<AMQPStateListener>>(); - public void removeListener(AMQPStateListener l) throws AMQException + public void addListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException { - // TODO Auto-generated method stub + List<AMQPStateListener> list; + if(_listernerMap.containsKey(stateType)) + { + list = _listernerMap.get(stateType); + } + else + { + list = new ArrayList<AMQPStateListener>(); + _listernerMap.put(stateType, list); + } + list.add(l); + } + public void removeListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException + { + if(_listernerMap.containsKey(stateType)) + { + List<AMQPStateListener> list = _listernerMap.get(stateType); + list.remove(l); + } + } + + public void notifyStateChanged(AMQPStateChangedEvent event) throws AMQPException + { + + if(_listernerMap.containsKey(event.getStateType())) + { + List<AMQPStateListener> list = _listernerMap.get(event.getStateType()); + for(AMQPStateListener l: list) + { + l.stateChanged(event); + } + } + else + { + _logger.warn("There are no registered listerners for state type" + event.getStateType()); + } } } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java new file mode 100644 index 0000000000..43f096a062 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java @@ -0,0 +1,19 @@ +package org.apache.qpid.nclient.amqp.sample; + +import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; +import org.apache.qpid.nclient.amqp.state.AMQPStateListener; +import org.apache.qpid.nclient.core.AMQPException; + +public class StateHelper implements AMQPStateListener +{ + + public void stateChanged(AMQPStateChangedEvent event) throws AMQPException + { + String s = event.getStateType() + " changed state from " + + event.getOldState() + " to " + event.getNewState(); + + System.out.println(s); + + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java index 166d565f81..f994a99ec3 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java @@ -65,6 +65,7 @@ import org.apache.qpid.nclient.amqp.AMQPConnection; import org.apache.qpid.nclient.amqp.AMQPExchange; import org.apache.qpid.nclient.amqp.AMQPMessage; import org.apache.qpid.nclient.amqp.AMQPQueue; +import org.apache.qpid.nclient.amqp.state.AMQPStateType; import org.apache.qpid.nclient.transport.AMQPConnectionURL; import org.apache.qpid.nclient.transport.ConnectionURL; import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType; @@ -80,371 +81,371 @@ import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionTy @SuppressWarnings("unused") public class TestClient { - private byte _major; - private byte _minor; - private ConnectionURL _url; - private static int _channel = 2; - // Need a Class factory per connection - private AMQPClassFactory _classFactory = new AMQPClassFactory(); - private int _ticket; - - public AMQPConnection openConnection() throws Exception - { - //_url = new AMQPConnectionURL("amqp://guest:guest@test/localhost?brokerlist='vm://:3'"); - - _url = new AMQPConnectionURL("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'"); - return _classFactory.createConnectionClass(_url,ConnectionType.TCP); - } - - public void handleConnectionNegotiation(AMQPConnection con) throws Exception - { - // ConnectionStartBody - ConnectionStartBody connectionStartBody = con.openTCPConnection(); - _major = connectionStartBody.getMajor(); - _minor = connectionStartBody.getMinor(); - - FieldTable clientProperties = FieldTableFactory.newFieldTable(); - clientProperties.put(new AMQShortString(ClientProperties.instance.toString()),"Test"); // setting only the client id - - final String locales = new String(connectionStartBody.getLocales(), "utf8"); - final StringTokenizer tokenizer = new StringTokenizer(locales, " "); - - final String mechanism = SecurityHelper.chooseMechanism(connectionStartBody.getMechanisms()); - - SaslClient sc = Sasl.createSaslClient(new String[]{mechanism}, - null, "AMQP", "localhost", - null, SecurityHelper.createCallbackHandler(mechanism,_url)); - - ConnectionStartOkBody connectionStartOkBody = - ConnectionStartOkBody.createMethodBody(_major, _minor, clientProperties, - new AMQShortString(tokenizer.nextToken()), - new AMQShortString(mechanism), - (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null)); - // ConnectionSecureBody - AMQMethodBody body = con.startOk(connectionStartOkBody); - ConnectionTuneBody connectionTuneBody; - - if (body instanceof ConnectionSecureBody) + private byte _major; + + private byte _minor; + + private ConnectionURL _url; + + private static int _channel = 2; + + // Need a Class factory per connection + private AMQPClassFactory _classFactory = new AMQPClassFactory(); + + private int _ticket; + + public AMQPConnection openConnection() throws Exception + { + //_url = new AMQPConnectionURL("amqp://guest:guest@test/localhost?brokerlist='vm://:3'"); + + _url = new AMQPConnectionURL("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'"); + return _classFactory.createConnectionClass(_url, ConnectionType.TCP); + } + + public void handleConnectionNegotiation(AMQPConnection con) throws Exception + { + StateHelper stateHelper = new StateHelper(); + _classFactory.getStateManager().addListener(AMQPStateType.CONNECTION_STATE, stateHelper); + _classFactory.getStateManager().addListener(AMQPStateType.CHANNEL_STATE, stateHelper); + + //ConnectionStartBody + ConnectionStartBody connectionStartBody = con.openTCPConnection(); + _major = connectionStartBody.getMajor(); + _minor = connectionStartBody.getMinor(); + + FieldTable clientProperties = FieldTableFactory.newFieldTable(); + clientProperties.put(new AMQShortString(ClientProperties.instance.toString()), "Test"); // setting only the client id + + final String locales = new String(connectionStartBody.getLocales(), "utf8"); + final StringTokenizer tokenizer = new StringTokenizer(locales, " "); + + final String mechanism = SecurityHelper.chooseMechanism(connectionStartBody.getMechanisms()); + + SaslClient sc = Sasl.createSaslClient(new String[] + { mechanism }, null, "AMQP", "localhost", null, SecurityHelper.createCallbackHandler(mechanism, _url)); + + ConnectionStartOkBody connectionStartOkBody = ConnectionStartOkBody.createMethodBody(_major, _minor, clientProperties, new AMQShortString( + tokenizer.nextToken()), new AMQShortString(mechanism), (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null)); + // ConnectionSecureBody + AMQMethodBody body = con.startOk(connectionStartOkBody); + ConnectionTuneBody connectionTuneBody; + + if (body instanceof ConnectionSecureBody) + { + ConnectionSecureBody connectionSecureBody = (ConnectionSecureBody) body; + ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(_major, _minor, sc + .evaluateChallenge(connectionSecureBody.getChallenge())); + //Assuming the server is not going to send another challenge + connectionTuneBody = (ConnectionTuneBody) con.secureOk(connectionSecureOkBody); + + } + else + { + connectionTuneBody = (ConnectionTuneBody) body; + } + + // Using broker supplied values + ConnectionTuneOkBody connectionTuneOkBody = ConnectionTuneOkBody.createMethodBody(_major, _minor, connectionTuneBody.getChannelMax(), + connectionTuneBody.getFrameMax(), connectionTuneBody.getHeartbeat()); + con.tuneOk(connectionTuneOkBody); + + ConnectionOpenBody connectionOpenBody = ConnectionOpenBody.createMethodBody(_major, _minor, null, true, new AMQShortString(_url + .getVirtualHost())); + + ConnectionOpenOkBody connectionOpenOkBody = con.open(connectionOpenBody); + } + + public void handleChannelNegotiation() throws Exception + { + AMQPChannel channel = _classFactory.createChannelClass(_channel); + + ChannelOpenBody channelOpenBody = ChannelOpenBody.createMethodBody(_major, _minor, new AMQShortString("myChannel1")); + ChannelOpenOkBody channelOpenOkBody = channel.open(channelOpenBody); + + //lets have some fun + ChannelFlowBody channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, false); + + ChannelFlowOkBody channelFlowOkBody = channel.flow(channelFlowBody); + System.out.println("Channel is " + (channelFlowOkBody.getActive() ? "active" : "suspend")); + + channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, true); + channelFlowOkBody = channel.flow(channelFlowBody); + System.out.println("Channel is " + (channelFlowOkBody.getActive() ? "active" : "suspend")); + } + + public void createExchange() throws Exception + { + AMQPExchange exchange = _classFactory.createExchangeClass(_channel); + + ExchangeDeclareBody exchangeDeclareBody = ExchangeDeclareBody.createMethodBody(_major, _minor, null, // arguments + false,//auto delete + false,// durable + new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME), true, //internal + false,// nowait + false,// passive + _ticket, new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)); + + AMQPCallBack cb = createCallBackWithMessage("Broker has created the exchange"); + exchange.declare(exchangeDeclareBody, cb); + // Blocking for response + while (!cb.isComplete()) + { + } + } + + public void createAndBindQueue() throws Exception + { + AMQPQueue queue = _classFactory.createQueueClass(_channel); + + QueueDeclareBody queueDeclareBody = QueueDeclareBody.createMethodBody(_major, _minor, null, //arguments + false,//auto delete + false,// durable + false, //exclusive, + false, //nowait, + false, //passive, + new AMQShortString("MyTestQueue"), 0); + + AMQPCallBack cb = new AMQPCallBack() + { + + @Override + public void brokerResponded(AMQMethodBody body) + { + QueueDeclareOkBody queueDeclareOkBody = (QueueDeclareOkBody) body; + System.out.println("[Broker has created the queue, " + "message count " + queueDeclareOkBody.getMessageCount() + "consumer count " + + queueDeclareOkBody.getConsumerCount() + "]\n"); + } + + @Override + public void brokerRespondedWithError(AMQException e) + { + } + + }; + + queue.declare(queueDeclareBody, cb); + //Blocking for response + while (!cb.isComplete()) + { + } + + QueueBindBody queueBindBody = QueueBindBody.createMethodBody(_major, _minor, null, //arguments + new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),//exchange + false, //nowait + new AMQShortString("MyTestQueue"), //queue + new AMQShortString("RH"), //routingKey + 0 //ticket + ); + + cb = createCallBackWithMessage("Broker has bound the queue"); + queue.bind(queueBindBody, cb); + //Blocking for response + while (!cb.isComplete()) + { + } + } + + public void purgeQueue() throws Exception { - ConnectionSecureBody connectionSecureBody = (ConnectionSecureBody)body; - ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody( - _major,_minor,sc.evaluateChallenge(connectionSecureBody.getChallenge())); - //Assuming the server is not going to send another challenge - connectionTuneBody = (ConnectionTuneBody)con.secureOk(connectionSecureOkBody); - + AMQPQueue queue = _classFactory.createQueueClass(_channel); + + QueuePurgeBody queuePurgeBody = QueuePurgeBody.createMethodBody(_major, _minor, false, //nowait + new AMQShortString("MyTestQueue"), //queue + 0 //ticket + ); + + AMQPCallBack cb = new AMQPCallBack() + { + + @Override + public void brokerResponded(AMQMethodBody body) + { + QueuePurgeOkBody queuePurgeOkBody = (QueuePurgeOkBody) body; + System.out.println("[Broker has purged the queue, message count " + queuePurgeOkBody.getMessageCount() + "]\n"); + } + + @Override + public void brokerRespondedWithError(AMQException e) + { + } + + }; + + queue.purge(queuePurgeBody, cb); + //Blocking for response + while (!cb.isComplete()) + { + } + } - else + + public void deleteQueue() throws Exception { - connectionTuneBody = (ConnectionTuneBody)body; + AMQPQueue queue = _classFactory.createQueueClass(_channel); + + QueueDeleteBody queueDeleteBody = QueueDeleteBody.createMethodBody(_major, _minor, false, //ifEmpty + false, //ifUnused + false, //nowait + new AMQShortString("MyTestQueue"), //queue + 0 //ticket + ); + + AMQPCallBack cb = new AMQPCallBack() + { + + @Override + public void brokerResponded(AMQMethodBody body) + { + QueueDeleteOkBody queueDeleteOkBody = (QueueDeleteOkBody) body; + System.out.println("[Broker has deleted the queue, message count " + queueDeleteOkBody.getMessageCount() + "]\n"); + } + + @Override + public void brokerRespondedWithError(AMQException e) + { + } + + }; + + queue.delete(queueDeleteBody, cb); + //Blocking for response + while (!cb.isComplete()) + { + } + } - - - // Using broker supplied values - ConnectionTuneOkBody connectionTuneOkBody = - ConnectionTuneOkBody.createMethodBody(_major,_minor, - connectionTuneBody.getChannelMax(), - connectionTuneBody.getFrameMax(), - connectionTuneBody.getHeartbeat()); - con.tuneOk(connectionTuneOkBody); - - ConnectionOpenBody connectionOpenBody = - ConnectionOpenBody.createMethodBody(_major,_minor,null, true,new AMQShortString(_url.getVirtualHost())); - - ConnectionOpenOkBody connectionOpenOkBody = con.open(connectionOpenBody); - } - - public void handleChannelNegotiation() throws Exception - { - AMQPChannel channel = _classFactory.createChannelClass(_channel); - - ChannelOpenBody channelOpenBody = ChannelOpenBody.createMethodBody(_major, _minor, new AMQShortString("myChannel1")); - ChannelOpenOkBody channelOpenOkBody = channel.open(channelOpenBody); - - //lets have some fun - ChannelFlowBody channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, false); - - ChannelFlowOkBody channelFlowOkBody = channel.flow(channelFlowBody); - System.out.println("Channel is " + (channelFlowOkBody.getActive()? "active" : "suspend")); - - channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, true); - channelFlowOkBody = channel.flow(channelFlowBody); - System.out.println("Channel is " + (channelFlowOkBody.getActive()? "active" : "suspend")); - } - - public void createExchange() throws Exception - { - AMQPExchange exchange = _classFactory.createExchangeClass(_channel); - - ExchangeDeclareBody exchangeDeclareBody = - ExchangeDeclareBody.createMethodBody(_major, _minor, - null, // arguments - false,//auto delete - false,// durable - new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME), - true, //internal - false,// nowait - false,// passive - _ticket, - new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)); - - AMQPCallBack cb = createCallBackWithMessage("Broker has created the exchange"); - exchange.declare(exchangeDeclareBody, cb); - // Blocking for response - while (!cb.isComplete()){} - } - - - public void createAndBindQueue()throws Exception - { - AMQPQueue queue = _classFactory.createQueueClass(_channel); - - QueueDeclareBody queueDeclareBody = - QueueDeclareBody.createMethodBody(_major, _minor, - null, //arguments - false,//auto delete - false,// durable - false, //exclusive, - false, //nowait, - false, //passive, - new AMQShortString("MyTestQueue"), - 0); - - AMQPCallBack cb = new AMQPCallBack(){ - - @Override - public void brokerResponded(AMQMethodBody body) - { - QueueDeclareOkBody queueDeclareOkBody = (QueueDeclareOkBody)body; - System.out.println("[Broker has created the queue, " + - "message count " + queueDeclareOkBody.getMessageCount() + - "consumer count " + queueDeclareOkBody.getConsumerCount() + "]\n"); - } - - @Override - public void brokerRespondedWithError(AMQException e) - { - } - - }; - - queue.declare(queueDeclareBody, cb); - //Blocking for response - while (!cb.isComplete()){} - - QueueBindBody queueBindBody = - QueueBindBody.createMethodBody(_major, _minor, - null, //arguments - new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),//exchange - false, //nowait - new AMQShortString("MyTestQueue"), //queue - new AMQShortString("RH"), //routingKey - 0 //ticket - ); - - cb = createCallBackWithMessage("Broker has bound the queue"); - queue.bind(queueBindBody, cb); - //Blocking for response - while (!cb.isComplete()){} - } - - public void purgeQueue()throws Exception - { - AMQPQueue queue = _classFactory.createQueueClass(_channel); - - QueuePurgeBody queuePurgeBody = - QueuePurgeBody.createMethodBody(_major, _minor, - false, //nowait - new AMQShortString("MyTestQueue"), //queue - 0 //ticket - ); - - AMQPCallBack cb = new AMQPCallBack(){ - - @Override - public void brokerResponded(AMQMethodBody body) - { - QueuePurgeOkBody queuePurgeOkBody = (QueuePurgeOkBody)body; - System.out.println("[Broker has purged the queue, message count " + queuePurgeOkBody.getMessageCount() + "]\n"); - } - - @Override - public void brokerRespondedWithError(AMQException e) - { - } - - }; - - queue.purge(queuePurgeBody, cb); - //Blocking for response - while (!cb.isComplete()){} - - } - - public void deleteQueue()throws Exception - { - AMQPQueue queue = _classFactory.createQueueClass(_channel); - - QueueDeleteBody queueDeleteBody = - QueueDeleteBody.createMethodBody(_major, _minor, - false, //ifEmpty - false, //ifUnused - false, //nowait - new AMQShortString("MyTestQueue"), //queue - 0 //ticket - ); - - AMQPCallBack cb = new AMQPCallBack(){ - - @Override - public void brokerResponded(AMQMethodBody body) - { - QueueDeleteOkBody queueDeleteOkBody = (QueueDeleteOkBody)body; - System.out.println("[Broker has deleted the queue, message count " + queueDeleteOkBody.getMessageCount() + "]\n"); - } - - @Override - public void brokerRespondedWithError(AMQException e) - { - } - - }; - - queue.delete(queueDeleteBody, cb); - //Blocking for response - while (!cb.isComplete()){} - - } - - public void publishAndSubscribe() throws Exception - { - AMQPMessage message = _classFactory.createMessageClass(_channel,new MessageHelper()); - MessageConsumeBody messageConsumeBody = MessageConsumeBody.createMethodBody(_major, _minor, - new AMQShortString("myClient"),// destination - false, //exclusive - null, //filter - false, //noAck, - false, //noLocal, - new AMQShortString("MyTestQueue"), //queue - 0 //ticket - ); - - AMQPCallBack cb = createCallBackWithMessage("Broker has accepted the consume"); - message.consume(messageConsumeBody, cb); - //Blocking for response - while (!cb.isComplete()){} - - // Sending 5 messages serially - for (int i=0; i<5; i++) + + public void publishAndSubscribe() throws Exception { - cb = createCallBackWithMessage("Broker has accepted msg " + i); - message.transfer(createMessages("Test" + i),cb); - while (!cb.isComplete()){} + AMQPMessage message = _classFactory.createMessageClass(_channel, new MessageHelper()); + MessageConsumeBody messageConsumeBody = MessageConsumeBody.createMethodBody(_major, _minor, new AMQShortString("myClient"),// destination + false, //exclusive + null, //filter + false, //noAck, + false, //noLocal, + new AMQShortString("MyTestQueue"), //queue + 0 //ticket + ); + + AMQPCallBack cb = createCallBackWithMessage("Broker has accepted the consume"); + message.consume(messageConsumeBody, cb); + //Blocking for response + while (!cb.isComplete()) + { + } + + // Sending 5 messages serially + for (int i = 0; i < 5; i++) + { + cb = createCallBackWithMessage("Broker has accepted msg " + i); + message.transfer(createMessages("Test" + i), cb); + while (!cb.isComplete()) + { + } + } + + MessageCancelBody messageCancelBody = MessageCancelBody.createMethodBody(_major, _minor, new AMQShortString("myClient")); + + AMQPCallBack cb2 = createCallBackWithMessage("Broker has accepted the consume cancel"); + message.cancel(messageCancelBody, cb2); + } - - MessageCancelBody messageCancelBody = MessageCancelBody.createMethodBody(_major, _minor, new AMQShortString("myClient")); - - AMQPCallBack cb2 = createCallBackWithMessage("Broker has accepted the consume cancel"); - message.cancel(messageCancelBody, cb2); - - } - - private MessageTransferBody createMessages(String content) throws Exception - { - FieldTable headers = FieldTableFactory.newFieldTable(); - headers.setAsciiString(new AMQShortString("Test"), System.currentTimeMillis() + ""); - - MessageTransferBody messageTransferBody = - MessageTransferBody.createMethodBody(_major, _minor, - new AMQShortString("testApp"), //appId - headers, //applicationHeaders - new Content(Content.TypeEnum.INLINE_T,content.getBytes()), //body - new AMQShortString(""), //contentEncoding, - new AMQShortString("text/plain"), //contentType - new AMQShortString("testApp"), //correlationId - (short)1, //deliveryMode non persistant - new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination - new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange - 0l, //expiration - false, //immediate - false, //mandatory - new AMQShortString(UUID.randomUUID().toString()), //messageId - (short)0, //priority - false, //redelivered - new AMQShortString("RH"), //replyTo - new AMQShortString("RH"), //routingKey, - "abc".getBytes(), //securityToken - 0, //ticket - System.currentTimeMillis(), //timestamp - new AMQShortString(""), //transactionId - 0l, //ttl, - new AMQShortString("Hello") //userId - ); - - return messageTransferBody; - - } - - public void publishAndGet() throws Exception - { - AMQPMessage message = _classFactory.createMessageClass(_channel,new MessageHelper()); - AMQPCallBack cb = createCallBackWithMessage("Broker has accepted msg 5"); - - MessageGetBody messageGetBody = - MessageGetBody.createMethodBody(_major, _minor, - new AMQShortString("myClient"), - false, //noAck - new AMQShortString("MyTestQueue"), //queue - 0 //ticket - ); - - //AMQPMessage message = _classFactory.createMessage(_channel,new MessageHelper()); - message.transfer(createMessages("Test"),cb); - while(!cb.isComplete()){} - - cb = createCallBackWithMessage("Broker has accepted get"); - message.get(messageGetBody, cb); - } - - // Creates a gneric call back and prints the given message - private AMQPCallBack createCallBackWithMessage(final String msg) - { - AMQPCallBack cb = new AMQPCallBack(){ - - @Override - public void brokerResponded(AMQMethodBody body) - { - System.out.println(msg); - } - - @Override - public void brokerRespondedWithError(AMQException e) - { - } - - }; - - return cb; - } - - public static void main(String[] args) - { - TestClient test = new TestClient(); - try + + private MessageTransferBody createMessages(String content) throws Exception { - AMQPConnection con = test.openConnection(); - test.handleConnectionNegotiation(con); - test.handleChannelNegotiation(); - test.createExchange(); - test.createAndBindQueue(); - test.publishAndSubscribe(); - test.purgeQueue(); - test.publishAndGet(); - test.deleteQueue(); + FieldTable headers = FieldTableFactory.newFieldTable(); + headers.setAsciiString(new AMQShortString("Test"), System.currentTimeMillis() + ""); + + MessageTransferBody messageTransferBody = MessageTransferBody.createMethodBody(_major, _minor, new AMQShortString("testApp"), //appId + headers, //applicationHeaders + new Content(Content.TypeEnum.INLINE_T, content.getBytes()), //body + new AMQShortString(""), //contentEncoding, + new AMQShortString("text/plain"), //contentType + new AMQShortString("testApp"), //correlationId + (short) 1, //deliveryMode non persistant + new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination + new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange + 0l, //expiration + false, //immediate + false, //mandatory + new AMQShortString(UUID.randomUUID().toString()), //messageId + (short) 0, //priority + false, //redelivered + new AMQShortString("RH"), //replyTo + new AMQShortString("RH"), //routingKey, + "abc".getBytes(), //securityToken + 0, //ticket + System.currentTimeMillis(), //timestamp + new AMQShortString(""), //transactionId + 0l, //ttl, + new AMQShortString("Hello") //userId + ); + + return messageTransferBody; + } - catch (Exception e) + + public void publishAndGet() throws Exception + { + AMQPMessage message = _classFactory.createMessageClass(_channel, new MessageHelper()); + AMQPCallBack cb = createCallBackWithMessage("Broker has accepted msg 5"); + + MessageGetBody messageGetBody = MessageGetBody.createMethodBody(_major, _minor, new AMQShortString("myClient"), false, //noAck + new AMQShortString("MyTestQueue"), //queue + 0 //ticket + ); + + //AMQPMessage message = _classFactory.createMessage(_channel,new MessageHelper()); + message.transfer(createMessages("Test"), cb); + while (!cb.isComplete()) + { + } + + cb = createCallBackWithMessage("Broker has accepted get"); + message.get(messageGetBody, cb); + } + + // Creates a gneric call back and prints the given message + private AMQPCallBack createCallBackWithMessage(final String msg) + { + AMQPCallBack cb = new AMQPCallBack() + { + + @Override + public void brokerResponded(AMQMethodBody body) + { + System.out.println(msg); + } + + @Override + public void brokerRespondedWithError(AMQException e) + { + } + + }; + + return cb; + } + + public static void main(String[] args) { - e.printStackTrace(); + TestClient test = new TestClient(); + try + { + AMQPConnection con = test.openConnection(); + test.handleConnectionNegotiation(con); + test.handleChannelNegotiation(); + test.createExchange(); + test.createAndBindQueue(); + test.publishAndSubscribe(); + test.purgeQueue(); + test.publishAndGet(); + test.deleteQueue(); + } + catch (Exception e) + { + e.printStackTrace(); + } } - } } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java index 3555704c4f..061ec5a849 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java @@ -38,7 +38,8 @@ public class AMQPState public String toString() { - return "AMQState: id = " + _id + " name: " + _name; + //return "AMQState: id = " + _id + " name: " + _name; + return _name; // looks better with loggin } // Connection state diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java new file mode 100644 index 0000000000..506d267a9e --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.state; + +public class AMQPStateChangedEvent +{ + private AMQPState _oldState; + + private AMQPState _newState; + + private AMQPStateType _stateType; + + public AMQPStateChangedEvent(AMQPState oldState, AMQPState newState, AMQPStateType stateType) + { + _oldState = oldState; + _newState = newState; + _stateType = stateType; + } + + public AMQPState getNewState() + { + return _newState; + } + + public void setNewState(AMQPState newState) + { + this._newState = newState; + } + + public AMQPState getOldState() + { + return _oldState; + } + + public void setOldState(AMQPState oldState) + { + this._oldState = oldState; + } + + public AMQPStateType getStateType() + { + return _stateType; + } + + public void setStateType(AMQPStateType stateType) + { + this._stateType = stateType; + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java index 67f854caf9..974f707504 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java @@ -4,5 +4,5 @@ import org.apache.qpid.nclient.core.AMQPException; public interface AMQPStateListener { - public void stateChanged(AMQPState oldState, AMQPState newState) throws AMQPException; + public void stateChanged(AMQPStateChangedEvent event) throws AMQPException; } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java index 2956a19e66..9bc60b658e 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java @@ -1,11 +1,13 @@ package org.apache.qpid.nclient.amqp.state; import org.apache.qpid.AMQException; +import org.apache.qpid.nclient.core.AMQPException; public interface AMQPStateManager { - - public void addListener(AMQPStateListener l)throws AMQException; + public void addListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException; + + public void removeListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException; - public void removeListener(AMQPStateListener l)throws AMQException; + public void notifyStateChanged(AMQPStateChangedEvent event) throws AMQPException; }
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java index cbae4aafee..e190d639f2 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java @@ -40,7 +40,7 @@ public class AMQPStateType public String toString() { - return "AMQState: id = " + _typeId + " name: " + _typeName; + return _typeName; } // Connection state diff --git a/java/pom.xml b/java/pom.xml index 2150e61861..62a49c2c40 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -130,6 +130,7 @@ <module>common</module> <module>broker</module> <module>client</module> + <module>newclient</module> <module>cluster</module> <module>systests</module> <module>perftests</module> @@ -382,7 +383,7 @@ <dependency> <groupId>commons-configuration</groupId> <artifactId>commons-configuration</artifactId> - <version>1.2</version> + <version>1.3</version> </dependency> <dependency> <groupId>commons-lang</groupId> @@ -475,6 +476,11 @@ </dependency> <dependency> <groupId>org.apache.qpid</groupId> + <artifactId>qpid-newclient</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> <artifactId>qpid-broker</artifactId> <version>${project.version}</version> </dependency> |