summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-03-30 15:53:18 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-03-30 15:53:18 +0000
commitf50a093a9423f12b69d82996ec432b9198f90f27 (patch)
treea6f8530babee103ac60a42bb6a39e7f1fae87e66
parent85c15e0200a785a7879f10f12aaa6f96d964553e (diff)
downloadqpid-python-f50a093a9423f12b69d82996ec432b9198f90f27.tar.gz
added state support to the new client and modified the example to illustrate it
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/client_restructure@524144 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java17
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java7
-rw-r--r--java/newclient/.project5
-rw-r--r--java/newclient/pom.xml6
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java513
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java340
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java644
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java54
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java19
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java713
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java3
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java68
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java2
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java8
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java2
-rw-r--r--java/pom.xml8
20 files changed, 1303 insertions, 1149 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
index 5e566a5fe8..ec52c252b0 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
@@ -24,10 +24,6 @@ import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
-import java.util.HashMap;
-import java.util.Map;
public class AMQDataBlockDecoder
{
@@ -68,13 +64,11 @@ public class AMQDataBlockDecoder
BodyFactory bodyFactory;
if (type == AMQRequestBody.TYPE)
{
- AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
- bodyFactory = new AMQRequestBodyFactory(protocolSession);
+ bodyFactory = new AMQRequestBodyFactory();
}
else if (type == AMQResponseBody.TYPE)
{
- AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
- bodyFactory = new AMQResponseBodyFactory(protocolSession);
+ bodyFactory = new AMQResponseBodyFactory();
}
else
{
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
index 6067e2fce5..e6395e5a28 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
@@ -22,21 +22,19 @@ package org.apache.qpid.framing;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-public class AMQMethodBodyFactory implements BodyFactory
+public class AMQMethodBodyFactory implements BodyFactory, ProtocolVersionList
{
private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class);
- private final AMQVersionAwareProtocolSession _protocolSession;
+ VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]);
- public AMQMethodBodyFactory(AMQVersionAwareProtocolSession protocolSession)
+ public AMQMethodBodyFactory()
{
- _protocolSession = protocolSession;
}
public AMQMethodBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
{
- return _protocolSession.getRegistry().get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), in, bodySize);
+ return _registry.get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), in, bodySize);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
index 3bc16601b6..04d8c99af3 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
@@ -21,7 +21,6 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class AMQRequestBody extends AMQBody
{
@@ -31,12 +30,10 @@ public class AMQRequestBody extends AMQBody
protected long requestId;
protected long responseMark;
protected AMQMethodBody methodPayload;
- protected AMQVersionAwareProtocolSession protocolSession;
-
+
// Constructor
- public AMQRequestBody(AMQVersionAwareProtocolSession protocolSession)
+ public AMQRequestBody()
{
- this.protocolSession = protocolSession;
}
public AMQRequestBody(long requestId, long responseMark,
@@ -45,7 +42,6 @@ public class AMQRequestBody extends AMQBody
this.requestId = requestId;
this.responseMark = responseMark;
this.methodPayload = methodPayload;
- protocolSession = null;
}
@@ -75,17 +71,12 @@ public class AMQRequestBody extends AMQBody
protected void populateFromBuffer(ByteBuffer buffer, long size)
throws AMQFrameDecodingException
- {
- if (protocolSession == null)
- {
- throw new AMQFrameDecodingException("Cannot call populateFromBuffer() without using correct constructor.");
- }
-
+ {
requestId = EncodingUtils.readLong(buffer);
responseMark = EncodingUtils.readLong(buffer);
int reserved = EncodingUtils.readInteger(buffer); // reserved, throw away
- AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory(protocolSession);
+ AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory();
methodPayload = methodBodyFactory.createBody(buffer, size);
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java
index 9d47ccd68e..a11e8587e8 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java
@@ -21,20 +21,16 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class AMQRequestBodyFactory implements BodyFactory
-{
- private final AMQVersionAwareProtocolSession protocolSession;
-
- public AMQRequestBodyFactory(AMQVersionAwareProtocolSession protocolSession)
+{
+ public AMQRequestBodyFactory()
{
- this.protocolSession = protocolSession;
}
public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
{
- AMQRequestBody rb = new AMQRequestBody(protocolSession);
+ AMQRequestBody rb = new AMQRequestBody();
rb.populateFromBuffer(in, bodySize);
return rb;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
index 7b35aaeb86..c27a76d922 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
@@ -21,7 +21,6 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class AMQResponseBody extends AMQBody
{
@@ -32,12 +31,10 @@ public class AMQResponseBody extends AMQBody
protected long requestId;
protected int batchOffset;
protected AMQMethodBody methodPayload;
- protected AMQVersionAwareProtocolSession protocolSession;
// Constructor
- public AMQResponseBody(AMQVersionAwareProtocolSession protocolSession)
+ public AMQResponseBody()
{
- this.protocolSession = protocolSession;
}
public AMQResponseBody(long responseId, long requestId,
@@ -47,7 +44,6 @@ public class AMQResponseBody extends AMQBody
this.requestId = requestId;
this.batchOffset = batchOffset;
this.methodPayload = methodPayload;
- protocolSession = null;
}
// Field methods
@@ -77,16 +73,13 @@ public class AMQResponseBody extends AMQBody
protected void populateFromBuffer(ByteBuffer buffer, long size)
throws AMQFrameDecodingException
- {
- if (protocolSession == null)
- throw new AMQFrameDecodingException("Cannot call populateFromBuffer() without using correct constructor.");
-
+ {
responseId = EncodingUtils.readLong(buffer);
requestId = EncodingUtils.readLong(buffer);
// XXX
batchOffset = EncodingUtils.readInteger(buffer);
- AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory(protocolSession);
+ AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory();
methodPayload = methodBodyFactory.createBody(buffer, size);
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java
index 4209aad11f..bf94afbe17 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java
@@ -25,16 +25,13 @@ import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class AMQResponseBodyFactory implements BodyFactory
{
- private final AMQVersionAwareProtocolSession protocolSession;
-
- public AMQResponseBodyFactory(AMQVersionAwareProtocolSession protocolSession)
+ public AMQResponseBodyFactory()
{
- this.protocolSession = protocolSession;
}
public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
{
- AMQResponseBody rb = new AMQResponseBody(protocolSession);
+ AMQResponseBody rb = new AMQResponseBody();
rb.populateFromBuffer(in, bodySize);
return rb;
}
diff --git a/java/newclient/.project b/java/newclient/.project
index f1c93ad7a9..4d42311982 100644
--- a/java/newclient/.project
+++ b/java/newclient/.project
@@ -1,7 +1,10 @@
<projectDescription>
<name>qpid-newclient</name>
<comment/>
- <projects/>
+ <projects>
+ <project>qpid-broker</project>
+ <project>qpid-common</project>
+ </projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
diff --git a/java/newclient/pom.xml b/java/newclient/pom.xml
index d2d27f3fd7..5a82feb1d2 100644
--- a/java/newclient/pom.xml
+++ b/java/newclient/pom.xml
@@ -58,6 +58,12 @@
</dependency>
<dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ <version>1.3</version>
+ </dependency>
+
+ <dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
index 7ba9120415..e480f38b93 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
@@ -40,7 +40,10 @@ import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
import org.apache.qpid.nclient.amqp.state.AMQPState;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.Phase;
@@ -58,295 +61,309 @@ import org.apache.qpid.nclient.util.AMQPValidator;
public class AMQPChannel extends AMQPStateMachine implements AMQPMethodListener
{
- private static final Logger _logger = Logger.getLogger(AMQPChannel.class);
+ private static final Logger _logger = Logger.getLogger(AMQPChannel.class);
- // the channelId assigned for this channel
- private int _channelId;
+ // the channelId assigned for this channel
+ private int _channelId;
- private Phase _phase;
+ private Phase _phase;
- private AMQPState _currentState;
+ private AMQPState _currentState;
- private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND };
+ private AMQPStateManager _stateManager;
- private final AMQPState[] _validResumeStates = new AMQPState[] { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED };
+ private final AMQPState[] _validCloseStates = new AMQPState[]
+ { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND };
- // The wait period until a server sends a respond
- private long _serverTimeOut = 1000;
+ private final AMQPState[] _validResumeStates = new AMQPState[]
+ { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED };
- private final Lock _lock = new ReentrantLock();
+ // The wait period until a server sends a respond
+ private long _serverTimeOut = 1000;
- private final Condition _channelNotOpend = _lock.newCondition();
+ private final Lock _lock = new ReentrantLock();
- private final Condition _channelNotClosed = _lock.newCondition();
+ private final Condition _channelNotOpend = _lock.newCondition();
- private final Condition _channelFlowNotResponded = _lock.newCondition();
+ private final Condition _channelNotClosed = _lock.newCondition();
- private final Condition _channelNotResumed = _lock.newCondition();
+ private final Condition _channelFlowNotResponded = _lock.newCondition();
- private ChannelOpenOkBody _channelOpenOkBody;
+ private final Condition _channelNotResumed = _lock.newCondition();
- private ChannelCloseOkBody _channelCloseOkBody;
+ private ChannelOpenOkBody _channelOpenOkBody;
- private ChannelFlowOkBody _channelFlowOkBody;
+ private ChannelCloseOkBody _channelCloseOkBody;
- private ChannelOkBody _channelOkBody;
-
- private ChannelCloseBody _channelCloseBody;
+ private ChannelFlowOkBody _channelFlowOkBody;
- protected AMQPChannel(int channelId, Phase phase)
- {
- _channelId = channelId;
- _phase = phase;
- _currentState = AMQPState.CHANNEL_NOT_OPENED;
- _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
- }
+ private ChannelOkBody _channelOkBody;
- /**
- * -------------------------------------------
- * API Methods
- * --------------------------------------------
- */
+ private ChannelCloseBody _channelCloseBody;
- /**
- * Opens the channel
- */
- public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException
- {
- _lock.lock();
- try
+ protected AMQPChannel(int channelId, Phase phase, AMQPStateManager stateManager)
{
- _channelOpenOkBody = null;
- checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _channelNotOpend.await();
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time");
- _currentState = AMQPState.CHANNEL_OPENED;
- return _channelOpenOkBody;
+ _channelId = channelId;
+ _phase = phase;
+ _stateManager = stateManager;
+ _currentState = AMQPState.CHANNEL_NOT_OPENED;
+ _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
}
- catch (Exception e)
- {
- throw new AMQPException("Error in channel.open", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * Close the channel
- */
- public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _channelCloseOkBody = null;
- checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _channelNotClosed.await();
- AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time");
- _currentState = AMQPState.CHANNEL_CLOSED;
- return _channelCloseOkBody;
- }
- catch (Exception e)
- {
- throw new AMQPException("Error in channel.close", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * Channel Flow
- */
- public ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _channelFlowOkBody = null;
- if (channelFlowBody.active)
- {
- checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND, _currentState, AMQPState.CHANNEL_OPENED);
- }
- else
- {
- checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND);
- }
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _channelFlowNotResponded.await();
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time");
- handleChannelFlowState(_channelFlowOkBody.active);
- return _channelFlowOkBody;
- }
- catch (Exception e)
- {
- throw new AMQPException("Error in channel.flow", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * Close the channel
- */
- public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException
- {
- _lock.lock();
- try
+
+ /**
+ * -------------------------------------------
+ * API Methods
+ * --------------------------------------------
+ */
+
+ /**
+ * Opens the channel
+ */
+ public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException
{
- _channelOkBody = null;
- checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _channelNotResumed.await();
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_channelOkBody,
- "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time");
- _currentState = AMQPState.CHANNEL_OPENED;
- return _channelOkBody;
+ _lock.lock();
+ try
+ {
+ _channelOpenOkBody = null;
+ checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelNotOpend.await();
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time");
+ notifyState(AMQPState.CHANNEL_OPENED);
+ _currentState = AMQPState.CHANNEL_OPENED;
+ return _channelOpenOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.open", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- catch (Exception e)
+
+ /**
+ * Close the channel
+ */
+ public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException
{
- throw new AMQPException("Error in channel.resume", e);
+ _lock.lock();
+ try
+ {
+ _channelCloseOkBody = null;
+ checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelNotClosed.await();
+ AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time");
+ notifyState(AMQPState.CHANNEL_CLOSED);
+ _currentState = AMQPState.CHANNEL_CLOSED;
+ return _channelCloseOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.close", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- finally
+
+ /**
+ * Channel Flow
+ */
+ public ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException
{
- _lock.unlock();
+ _lock.lock();
+ try
+ {
+ _channelFlowOkBody = null;
+ if (channelFlowBody.active)
+ {
+ checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND, _currentState, AMQPState.CHANNEL_OPENED);
+ }
+ else
+ {
+ checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND);
+ }
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelFlowNotResponded.await();
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time");
+ handleChannelFlowState(_channelFlowOkBody.active);
+ return _channelFlowOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.flow", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- }
-
- /**
- * -------------------------------------------
- * AMQPMethodListener methods
- * --------------------------------------------
- */
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- _lock.lock();
- try
+
+ /**
+ * Close the channel
+ */
+ public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException
{
- if (evt.getMethod() instanceof ChannelOpenOkBody)
- {
- _channelOpenOkBody = (ChannelOpenOkBody) evt.getMethod();
- _channelNotOpend.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ChannelCloseOkBody)
- {
- _channelCloseOkBody = (ChannelCloseOkBody) evt.getMethod();
- _channelNotClosed.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ChannelCloseBody)
- {
- _channelCloseBody = (ChannelCloseBody)evt.getMethod();
- // release the correct lock as u may have some conditions waiting.
- // while an error occured and the broker has sent a close.
- releaseLocks();
- handleChannelClose(_channelCloseBody);
- return true;
- }
- else if (evt.getMethod() instanceof ChannelFlowOkBody)
- {
- _channelFlowOkBody = (ChannelFlowOkBody) evt.getMethod();
- _channelFlowNotResponded.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ChannelFlowBody)
- {
- handleChannelFlow((ChannelFlowBody) evt.getMethod());
- return true;
- }
- else if (evt.getMethod() instanceof ChannelOkBody)
- {
- _channelOkBody = (ChannelOkBody) evt.getMethod();
- // In this case the only method expecting channel-ok is channel-resume
- // haven't implemented ping and pong.
- _channelNotResumed.signal();
- return true;
- }
- else
- {
- return false;
- }
+ _lock.lock();
+ try
+ {
+ _channelOkBody = null;
+ checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelNotResumed.await();
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_channelOkBody,
+ "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time");
+ notifyState(AMQPState.CHANNEL_OPENED);
+ _currentState = AMQPState.CHANNEL_OPENED;
+ return _channelOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.resume", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- finally
+
+ /**
+ * -------------------------------------------
+ * AMQPMethodListener methods
+ * --------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
{
- _lock.unlock();
+ _lock.lock();
+ try
+ {
+ if (evt.getMethod() instanceof ChannelOpenOkBody)
+ {
+ _channelOpenOkBody = (ChannelOpenOkBody) evt.getMethod();
+ _channelNotOpend.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelCloseOkBody)
+ {
+ _channelCloseOkBody = (ChannelCloseOkBody) evt.getMethod();
+ _channelNotClosed.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelCloseBody)
+ {
+ _channelCloseBody = (ChannelCloseBody) evt.getMethod();
+ // release the correct lock as u may have some conditions waiting.
+ // while an error occured and the broker has sent a close.
+ releaseLocks();
+ handleChannelClose(_channelCloseBody);
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelFlowOkBody)
+ {
+ _channelFlowOkBody = (ChannelFlowOkBody) evt.getMethod();
+ _channelFlowNotResponded.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelFlowBody)
+ {
+ handleChannelFlow((ChannelFlowBody) evt.getMethod());
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelOkBody)
+ {
+ _channelOkBody = (ChannelOkBody) evt.getMethod();
+ // In this case the only method expecting channel-ok is channel-resume
+ // haven't implemented ping and pong.
+ _channelNotResumed.signal();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- }
-
- private void handleChannelClose(ChannelCloseBody channelCloseBody)
- {
- _currentState = AMQPState.CHANNEL_CLOSED;
- // handle channel related cleanup
- }
-
- private void releaseLocks()
- {
- if(_currentState == AMQPState.CHANNEL_NOT_OPENED)
+
+ private void handleChannelClose(ChannelCloseBody channelCloseBody) throws AMQPException
{
- _channelNotOpend.signal();
- _channelNotResumed.signal(); // It could be a channel.resume call
+ notifyState(AMQPState.CHANNEL_CLOSED);
+ _currentState = AMQPState.CHANNEL_CLOSED;
+ // handle channel related cleanup
}
- else if(_currentState == AMQPState.CHANNEL_OPENED || _currentState == AMQPState.CHANNEL_SUSPEND)
+
+ private void releaseLocks()
{
- _channelFlowNotResponded.signal();
+ if (_currentState == AMQPState.CHANNEL_NOT_OPENED)
+ {
+ _channelNotOpend.signal();
+ _channelNotResumed.signal(); // It could be a channel.resume call
+ }
+ else if (_currentState == AMQPState.CHANNEL_OPENED || _currentState == AMQPState.CHANNEL_SUSPEND)
+ {
+ _channelFlowNotResponded.signal();
+ }
+ else if (_currentState == AMQPState.CHANNEL_CLOSED)
+ {
+ _channelNotResumed.signal();
+ }
}
- else if(_currentState == AMQPState.CHANNEL_CLOSED)
+
+ private void checkIfConnectionClosed() throws AMQPException
{
- _channelNotResumed.signal();
+ if (_channelCloseBody != null)
+ {
+ String error = "Broker has closed channel due to : " + _channelCloseBody.getReplyText() + " with reply code ("
+ + _channelCloseBody.getReplyCode() + ") " + "caused by class " + _channelCloseBody.getClassId() + " and method "
+ + _channelCloseBody.getMethod();
+
+ throw new AMQPException(error);
+ }
}
- }
-
- private void checkIfConnectionClosed()throws AMQPException
- {
- if (_channelCloseBody != null)
+
+ private void handleChannelFlow(ChannelFlowBody channelFlowBody)throws AMQPException
{
- String error = "Broker has closed channel due to : " + _channelCloseBody.getReplyText() +
- " with reply code (" + _channelCloseBody.getReplyCode() + ") " +
- "caused by class " + _channelCloseBody.getClassId() +
- " and method " + _channelCloseBody.getMethod();
-
- throw new AMQPException(error);
+ _lock.lock();
+ try
+ {
+ handleChannelFlowState(channelFlowBody.active);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- }
- private void handleChannelFlow(ChannelFlowBody channelFlowBody)
- {
- _lock.lock();
- try
+ private void handleChannelFlowState(boolean flow)throws AMQPException
{
- handleChannelFlowState(channelFlowBody.active);
+ notifyState((flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND);
+ _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND;
}
- finally
+
+ private void notifyState(AMQPState newState) throws AMQPException
{
- _lock.unlock();
+ _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE));
}
- }
-
- private void handleChannelFlowState(boolean flow)
- {
- _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND;
- }
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
index a3f9b2c24d..f90a5231a2 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
@@ -78,178 +78,178 @@ import org.apache.qpid.url.URLSyntaxException;
*/
public class AMQPClassFactory
{
- //Need an event manager per connection
- private AMQPEventManager _eventManager = new QpidEventManager();
-
- // Need a state manager per connection
- private AMQPStateManager _stateManager = new QpidStateManager();
-
- //Need a phase pipe per connection
- private Phase _phase;
-
- //One instance per connection
- private AMQPConnection _amqpConnection;
-
- public AMQPClassFactory()
- {
-
- }
-
- public AMQPConnection createConnection(String urlStr,ConnectionType type)throws AMQPException, URLSyntaxException
- {
- AMQPConnectionURL url = new AMQPConnectionURL(urlStr);
- return createConnectionClass(url,type);
- }
-
- public AMQPConnection createConnectionClass(ConnectionURL url,ConnectionType type)throws AMQPException
- {
- if (_amqpConnection == null)
+ //Need an event manager per connection
+ private AMQPEventManager _eventManager = new QpidEventManager();
+
+ // Need a state manager per connection
+ private AMQPStateManager _stateManager = new QpidStateManager();
+
+ //Need a phase pipe per connection
+ private Phase _phase;
+
+ //One instance per connection
+ private AMQPConnection _amqpConnection;
+
+ public AMQPClassFactory()
+ {
+
+ }
+
+ public AMQPConnection createConnection(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException
+ {
+ AMQPConnectionURL url = new AMQPConnectionURL(urlStr);
+ return createConnectionClass(url, type);
+ }
+
+ public AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException
{
- PhaseContext ctx = new DefaultPhaseContext();
- ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager);
-
- TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type,ctx);
- _amqpConnection = new AMQPConnection(conn);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class,_amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class,_amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class,_amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class,_amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class,_amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class,_amqpConnection);
+ if (_amqpConnection == null)
+ {
+ PhaseContext ctx = new DefaultPhaseContext();
+ ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager);
+
+ TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type, ctx);
+ _amqpConnection = new AMQPConnection(conn, _stateManager);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class, _amqpConnection);
+ }
+ return _amqpConnection;
}
- return _amqpConnection;
- }
-
- public AMQPChannel createChannelClass(int channel)throws AMQPException
- {
- checkIfConnectionStarted();
- AMQPChannel amqpChannel = new AMQPChannel(channel,_phase);
- _eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class,amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelCloseBody.class,amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class,amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelFlowBody.class,amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class,amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelOkBody.class,amqpChannel);
- return amqpChannel;
- }
-
- public void destroyChannelClass(int channel,AMQPChannel amqpChannel)throws AMQPException
- {
- _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class,amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class,amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class,amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class,amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class,amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelOkBody.class,amqpChannel);
- }
-
- public AMQPExchange createExchangeClass(int channel)throws AMQPException
- {
- checkIfConnectionStarted();
- AMQPExchange amqpExchange = new AMQPExchange(channel,_phase);
- _eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class,amqpExchange);
- _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class,amqpExchange);
- return amqpExchange;
- }
-
- public void destoryExchangeClass(int channel, AMQPExchange amqpExchange)throws AMQPException
- {
- _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class,amqpExchange);
- _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class,amqpExchange);
- }
-
- public AMQPQueue createQueueClass(int channel)throws AMQPException
- {
- checkIfConnectionStarted();
- AMQPQueue amqpQueue = new AMQPQueue(channel,_phase);
- _eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class,amqpQueue);
- _eventManager.addMethodEventListener(channel, QueueBindOkBody.class,amqpQueue);
- _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class,amqpQueue);
- _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class,amqpQueue);
- _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class,amqpQueue);
- return amqpQueue;
- }
-
- public void destroyQueueClass(int channel,AMQPQueue amqpQueue)throws AMQPException
- {
- _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class,amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class,amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class,amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class,amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class,amqpQueue);
- }
-
- public AMQPMessage createMessageClass(int channel,AMQPMessageCallBack messageCb)throws AMQPException
- {
- checkIfConnectionStarted();
- AMQPMessage amqpMessage = new AMQPMessage(channel,_phase,messageCb);
- _eventManager.addMethodEventListener(channel, MessageAppendBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageCancelBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageCloseBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageGetBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageOffsetBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageOkBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageOpenBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageRecoverBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageRejectBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageResumeBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageQosBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageTransferBody.class,amqpMessage);
-
- return amqpMessage;
- }
-
- public void destoryMessageClass(int channel,AMQPMessage amqpMessage)throws AMQPException
- {
- _eventManager.removeMethodEventListener(channel, MessageAppendBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageCancelBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageCloseBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageGetBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageOkBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageOpenBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageRejectBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageResumeBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageQosBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageTransferBody.class,amqpMessage);
- }
-
- //This class should register as a state listener for AMQPConnection
- private void checkIfConnectionStarted() throws AMQPException
- {
- if (_phase == null)
+
+ public AMQPChannel createChannelClass(int channel) throws AMQPException
+ {
+ checkIfConnectionStarted();
+ AMQPChannel amqpChannel = new AMQPChannel(channel, _phase,_stateManager);
+ _eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelCloseBody.class, amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelFlowBody.class, amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelOkBody.class, amqpChannel);
+ return amqpChannel;
+ }
+
+ public void destroyChannelClass(int channel, AMQPChannel amqpChannel) throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelOkBody.class, amqpChannel);
+ }
+
+ public AMQPExchange createExchangeClass(int channel) throws AMQPException
+ {
+ checkIfConnectionStarted();
+ AMQPExchange amqpExchange = new AMQPExchange(channel, _phase);
+ _eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange);
+ _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange);
+ return amqpExchange;
+ }
+
+ public void destoryExchangeClass(int channel, AMQPExchange amqpExchange) throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange);
+ _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange);
+ }
+
+ public AMQPQueue createQueueClass(int channel) throws AMQPException
+ {
+ checkIfConnectionStarted();
+ AMQPQueue amqpQueue = new AMQPQueue(channel, _phase);
+ _eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueueBindOkBody.class, amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue);
+ return amqpQueue;
+ }
+
+ public void destroyQueueClass(int channel, AMQPQueue amqpQueue) throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class, amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue);
+ }
+
+ public AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException
+ {
+ checkIfConnectionStarted();
+ AMQPMessage amqpMessage = new AMQPMessage(channel, _phase, messageCb);
+ _eventManager.addMethodEventListener(channel, MessageAppendBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageCancelBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageCloseBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageGetBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageOffsetBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageOkBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageOpenBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageRecoverBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageRejectBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageResumeBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageQosBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageTransferBody.class, amqpMessage);
+
+ return amqpMessage;
+ }
+
+ public void destoryMessageClass(int channel, AMQPMessage amqpMessage) throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, MessageAppendBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCancelBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCloseBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageGetBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOkBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOpenBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageRejectBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageResumeBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageQosBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageTransferBody.class, amqpMessage);
+ }
+
+ //This class should register as a state listener for AMQPConnection
+ private void checkIfConnectionStarted() throws AMQPException
+ {
+ if (_phase == null)
+ {
+ _phase = _amqpConnection.getPhasePipe();
+
+ if (_phase == null)
+ {
+ throw new AMQPException("Cannot create a channel until connection is ready");
+ }
+ }
+ }
+
+ /**
+ * Extention point
+ * Other interested parties can obtain a reference to the event manager
+ * and add listeners to get notified of events
+ *
+ */
+ public AMQPEventManager getEventManager()
+ {
+ return _eventManager;
+ }
+
+ /**
+ * Extention point
+ * Other interested parties can obtain a reference to the state manager
+ * and add listeners to get notified of state changes
+ *
+ */
+ public AMQPStateManager getStateManager()
{
- _phase = _amqpConnection.getPhasePipe();
-
- if (_phase == null)
- {
- throw new AMQPException("Cannot create a channel until connection is ready");
- }
+ return _stateManager;
}
- }
-
- /**
- * Extention point
- * Other interested parties can obtain a reference to the event manager
- * and add listeners to get notified of events
- *
- */
- public AMQPEventManager getEventManager()
- {
- return _eventManager;
- }
-
- /**
- * Extention point
- * Other interested parties can obtain a reference to the state manager
- * and add listeners to get notified of state changes
- *
- */
- public AMQPStateManager getStateManager()
- {
- return null;
- }
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
index aea25a403b..1d6541ec3e 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
@@ -40,7 +40,10 @@ import org.apache.qpid.framing.ConnectionTuneOkBody;
import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
import org.apache.qpid.nclient.amqp.state.AMQPState;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.Phase;
@@ -55,366 +58,383 @@ import org.apache.qpid.nclient.util.AMQPValidator;
*/
public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListener
{
- private static final Logger _logger = Logger.getLogger(AMQPConnection.class);
+ private static final Logger _logger = Logger.getLogger(AMQPConnection.class);
- private Phase _phase;
+ private Phase _phase;
- private TransportConnection _connection;
+ private TransportConnection _connection;
- private long _correlationId;
+ private long _correlationId;
- private AMQPState _currentState;
+ private AMQPState _currentState;
- private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE,
- AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, AMQPState.CONNECTION_OPEN, };
+ private AMQPStateManager _stateManager;
- // The wait period until a server sends a respond
- private long _serverTimeOut = 1000;
+ private final AMQPState[] _validCloseStates = new AMQPState[]
+ { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED,
+ AMQPState.CONNECTION_OPEN, };
- private final Lock _lock = new ReentrantLock();
+ // The wait period until a server sends a respond
+ private long _serverTimeOut = 1000;
- private final Condition _connectionNotStarted = _lock.newCondition();
+ private final Lock _lock = new ReentrantLock();
- private final Condition _connectionNotSecure = _lock.newCondition();
+ private final Condition _connectionNotStarted = _lock.newCondition();
- private final Condition _connectionNotTuned = _lock.newCondition();
+ private final Condition _connectionNotSecure = _lock.newCondition();
- private final Condition _connectionNotOpened = _lock.newCondition();
+ private final Condition _connectionNotTuned = _lock.newCondition();
- private final Condition _connectionNotClosed = _lock.newCondition();
+ private final Condition _connectionNotOpened = _lock.newCondition();
- private ConnectionStartBody _connectionStartBody;
+ private final Condition _connectionNotClosed = _lock.newCondition();
- private ConnectionSecureBody _connectionSecureBody;
+ private ConnectionStartBody _connectionStartBody;
- private ConnectionTuneBody _connectionTuneBody;
+ private ConnectionSecureBody _connectionSecureBody;
- private ConnectionOpenOkBody _connectionOpenOkBody;
+ private ConnectionTuneBody _connectionTuneBody;
- private ConnectionCloseOkBody _connectionCloseOkBody;
-
- private ConnectionCloseBody _connectionCloseBody;
+ private ConnectionOpenOkBody _connectionOpenOkBody;
- protected AMQPConnection(TransportConnection connection)
- {
- _connection = connection;
- _currentState = AMQPState.CONNECTION_UNDEFINED;
- _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
- }
+ private ConnectionCloseOkBody _connectionCloseOkBody;
- /**
- * -------------------------------------------
- * API Methods
- * --------------------------------------------
- */
+ private ConnectionCloseBody _connectionCloseBody;
- /**
- * Opens the TCP connection and let the formalities begin.
- */
- public ConnectionStartBody openTCPConnection() throws AMQPException
- {
- _lock.lock();
- // open the TCP connection
- try
+ protected AMQPConnection(TransportConnection connection, AMQPStateManager stateManager)
{
- _connectionStartBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED);
- _phase = _connection.connect();
-
- // waiting for ConnectionStartBody or error in connection
- //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotStarted.await();
-
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time");
- _currentState = AMQPState.CONNECTION_NOT_STARTED;
- return _connectionStartBody;
+ _connection = connection;
+ _stateManager = stateManager;
+ _currentState = AMQPState.CONNECTION_UNDEFINED;
+ _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
}
- catch (InterruptedException e)
- {
- throw new AMQPException("Error opening connection to broker", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * The current java broker implementation can send a connection tune body
- * as a response to the startOk. Not sure if that is the correct behaviour.
- */
- public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _connectionSecureBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId);
- _phase.messageSent(msg);
- //_connectionNotSecure.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotSecure.await();
- //AMQPValidator.throwExceptionOnNull(_connectionSecureBody, "The broker didn't send the ConnectionSecureBody in time");
- //_currentState = AMQPState.CONNECTION_NOT_SECURE;
-
- checkIfConnectionClosed();
- if (_connectionTuneBody != null)
- {
- _currentState = AMQPState.CONNECTION_NOT_TUNED;
- return _connectionTuneBody;
- }
- else if (_connectionSecureBody != null)
- { // oops the server sent another challenge
- _currentState = AMQPState.CONNECTION_NOT_SECURE;
- return _connectionSecureBody;
- }
- else
- {
- throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
- }
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.startOk", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could
- * issue a new challenge
- */
- public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _connectionTuneBody = null;
- _connectionSecureBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED);
-
- _connectionSecureBody = null; // The server could send a fresh challenge
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId);
- _phase.messageSent(msg);
-
- //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotTuned.await();
- checkIfConnectionClosed();
-
- if (_connectionTuneBody != null)
- {
- _currentState = AMQPState.CONNECTION_NOT_TUNED;
- return _connectionTuneBody;
- }
- else if (_connectionSecureBody != null)
- { // oops the server sent another challenge
- _currentState = AMQPState.CONNECTION_NOT_SECURE;
- return _connectionSecureBody;
- }
- else
- {
- throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
- }
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.secureOk", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
- public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED);
- _connectionSecureBody = null;
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId);
- _phase.messageSent(msg);
- _currentState = AMQPState.CONNECTION_NOT_OPENED;
- }
- finally
- {
- _lock.unlock();
- }
- }
+ /**
+ * -------------------------------------------
+ * API Methods
+ * --------------------------------------------
+ */
- public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException
- {
- _lock.lock();
- try
+ /**
+ * Opens the TCP connection and let the formalities begin.
+ */
+ public ConnectionStartBody openTCPConnection() throws AMQPException
{
- // If the broker sends a connection close due to an error with the
- // Connection tune ok, then this call will verify that
- checkIfConnectionClosed();
-
- _connectionOpenOkBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotOpened.await();
-
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time");
- _currentState = AMQPState.CONNECTION_OPEN;
- return _connectionOpenOkBody;
+ _lock.lock();
+ // open the TCP connection
+ try
+ {
+ _connectionStartBody = null;
+ checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED);
+ _phase = _connection.connect();
+
+ // waiting for ConnectionStartBody or error in connection
+ //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _connectionNotStarted.await();
+
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time");
+ notifyState(AMQPState.CONNECTION_NOT_STARTED);
+ _currentState = AMQPState.CONNECTION_NOT_STARTED;
+ return _connectionStartBody;
+ }
+ catch (InterruptedException e)
+ {
+ throw new AMQPException("Error opening connection to broker", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.open", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
- public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException
- {
- _lock.lock();
- try
+ /**
+ * The current java broker implementation can send a connection tune body
+ * as a response to the startOk. Not sure if that is the correct behaviour.
+ */
+ public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
{
- _connectionCloseOkBody = null;
- checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
- _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time");
- _currentState = AMQPState.CONNECTION_CLOSED;
- return _connectionCloseOkBody;
+ _lock.lock();
+ try
+ {
+ _connectionSecureBody = null;
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId);
+ _phase.messageSent(msg);
+ // _connectionNotSecure.await(_serverTimeOut,TimeUnit.MILLISECONDS);
+ _connectionNotSecure.await();
+
+ checkIfConnectionClosed();
+ if (_connectionTuneBody != null)
+ {
+ notifyState(AMQPState.CONNECTION_NOT_TUNED);
+ _currentState = AMQPState.CONNECTION_NOT_TUNED;
+ return _connectionTuneBody;
+ }
+ else if (_connectionSecureBody != null)
+ { // oops the server sent another challenge
+ notifyState(AMQPState.CONNECTION_NOT_SECURE);
+ _currentState = AMQPState.CONNECTION_NOT_SECURE;
+ return _connectionSecureBody;
+ }
+ else
+ {
+ throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new AMQPException("Error in connection.startOk", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- catch (InterruptedException e)
+
+ /**
+ * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could
+ * issue a new challenge
+ */
+ public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException
{
- throw new AMQPException("Error in connection.close", e);
+ _lock.lock();
+ try
+ {
+ _connectionTuneBody = null;
+ _connectionSecureBody = null;
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED);
+
+ _connectionSecureBody = null; // The server could send a fresh challenge
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId);
+ _phase.messageSent(msg);
+
+ //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _connectionNotTuned.await();
+ checkIfConnectionClosed();
+
+ if (_connectionTuneBody != null)
+ {
+ notifyState(AMQPState.CONNECTION_NOT_TUNED);
+ _currentState = AMQPState.CONNECTION_NOT_TUNED;
+ return _connectionTuneBody;
+ }
+ else if (_connectionSecureBody != null)
+ { // oops the server sent another challenge
+ notifyState(AMQPState.CONNECTION_NOT_SECURE);
+ _currentState = AMQPState.CONNECTION_NOT_SECURE;
+ return _connectionSecureBody;
+ }
+ else
+ {
+ throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new AMQPException("Error in connection.secureOk", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- finally
+
+ public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException
{
- _lock.unlock();
+ _lock.lock();
+ try
+ {
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED);
+ _connectionSecureBody = null;
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId);
+ _phase.messageSent(msg);
+ notifyState(AMQPState.CONNECTION_NOT_OPENED);
+ _currentState = AMQPState.CONNECTION_NOT_OPENED;
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- }
-
- /**
- * ------------------------------------------- AMQMethodListener methods
- * --------------------------------------------
- */
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- _lock.lock();
- try
+
+ public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException
{
- _correlationId = evt.getCorrelationId();
-
- if (evt.getMethod() instanceof ConnectionStartBody)
- {
- _connectionStartBody = (ConnectionStartBody) evt.getMethod();
- _connectionNotStarted.signalAll();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionSecureBody)
- {
- _connectionSecureBody = (ConnectionSecureBody) evt.getMethod();
- _connectionNotSecure.signal();
- _connectionNotTuned.signal(); // in case the server has sent another chanllenge
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionTuneBody)
- {
- _connectionTuneBody = (ConnectionTuneBody) evt.getMethod();
- _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk
- _connectionNotTuned.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionOpenOkBody)
- {
- _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod();
- _connectionNotOpened.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionCloseOkBody)
- {
- _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod();
- _connectionNotClosed.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionCloseBody)
- {
- _connectionCloseBody = (ConnectionCloseBody)evt.getMethod();
- // release the correct lock as u may have some conditions waiting.
- // while an error occured and the broker has sent a close.
- releaseLocks();
- handleClose();
- return true;
- }
- else
- {
- return false;
- }
+ _lock.lock();
+ try
+ {
+ // If the broker sends a connection close due to an error with the
+ // Connection tune ok, then this call will verify that
+ checkIfConnectionClosed();
+
+ _connectionOpenOkBody = null;
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _connectionNotOpened.await();
+
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time");
+ notifyState(AMQPState.CONNECTION_OPEN);
+ _currentState = AMQPState.CONNECTION_OPEN;
+ return _connectionOpenOkBody;
+ }
+ catch (InterruptedException e)
+ {
+ throw new AMQPException("Error in connection.open", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- finally
+
+ public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException
{
- _lock.unlock();
- }
- }
-
- private void handleClose() throws AMQPException
- {
- try
- {
- _currentState = AMQPState.CONNECTION_CLOSING;
- // do the required cleanup and send a ConnectionCloseOkBody
+ _lock.lock();
+ try
+ {
+ _connectionCloseOkBody = null;
+ checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+ _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time");
+ notifyState(AMQPState.CONNECTION_CLOSED);
+ _currentState = AMQPState.CONNECTION_CLOSED;
+ return _connectionCloseOkBody;
+ }
+ catch (InterruptedException e)
+ {
+ throw new AMQPException("Error in connection.close", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- catch (Exception e)
+
+ /**
+ * -------------------------------------------
+ * AMQMethodListener methods
+ * --------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
{
- throw new AMQPException("Error handling connection.close from broker", e);
+ _lock.lock();
+ try
+ {
+ _correlationId = evt.getCorrelationId();
+
+ if (evt.getMethod() instanceof ConnectionStartBody)
+ {
+ _connectionStartBody = (ConnectionStartBody) evt.getMethod();
+ _connectionNotStarted.signalAll();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionSecureBody)
+ {
+ _connectionSecureBody = (ConnectionSecureBody) evt.getMethod();
+ _connectionNotSecure.signal();
+ _connectionNotTuned.signal(); // in case the server has sent another chanllenge
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionTuneBody)
+ {
+ _connectionTuneBody = (ConnectionTuneBody) evt.getMethod();
+ _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk
+ _connectionNotTuned.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionOpenOkBody)
+ {
+ _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod();
+ _connectionNotOpened.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionCloseOkBody)
+ {
+ _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod();
+ _connectionNotClosed.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionCloseBody)
+ {
+ _connectionCloseBody = (ConnectionCloseBody) evt.getMethod();
+ // release the correct lock as u may have some conditions waiting.
+ // while an error occured and the broker has sent a close.
+ releaseLocks();
+ handleClose();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- }
-
- private void checkIfConnectionClosed()throws AMQPException
- {
- if (_connectionCloseBody != null)
+
+
+ public Phase getPhasePipe()
{
- String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() +
- " with reply code (" + _connectionCloseBody.getReplyCode() + ") " +
- "caused by class " + _connectionCloseBody.getClassId() +
- " and method " + _connectionCloseBody.getMethod();
-
- throw new AMQPException(error);
+ return _phase;
}
- }
-
- private void releaseLocks()
- {
- if(_currentState == AMQPState.CONNECTION_NOT_OPENED)
+
+ private void handleClose() throws AMQPException
{
- _connectionNotOpened.signal();
+ try
+ {
+ notifyState(AMQPState.CONNECTION_CLOSING);
+ _currentState = AMQPState.CONNECTION_CLOSING;
+ // do the required cleanup and send a ConnectionCloseOkBody
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error handling connection.close from broker", e);
+ }
}
- else if(_currentState == AMQPState.CONNECTION_UNDEFINED)
+
+ private void checkIfConnectionClosed() throws AMQPException
{
- _connectionNotStarted.signal();
+ if (_connectionCloseBody != null)
+ {
+ String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() + " with reply code ("
+ + _connectionCloseBody.getReplyCode() + ") " + "caused by class " + _connectionCloseBody.getClassId() + " and method "
+ + _connectionCloseBody.getMethod();
+
+ throw new AMQPException(error);
+ }
}
- else if(_currentState == AMQPState.CONNECTION_NOT_STARTED)
+
+ private void releaseLocks()
{
- _connectionNotSecure.signal();
+ if (_currentState == AMQPState.CONNECTION_NOT_OPENED)
+ {
+ _connectionNotOpened.signal();
+ }
+ else if (_currentState == AMQPState.CONNECTION_UNDEFINED)
+ {
+ _connectionNotStarted.signal();
+ }
+ else if (_currentState == AMQPState.CONNECTION_NOT_STARTED)
+ {
+ _connectionNotSecure.signal();
+ }
+ else if (_currentState == AMQPState.CONNECTION_NOT_SECURE)
+ {
+ _connectionNotTuned.signal();
+ }
}
- else if(_currentState == AMQPState.CONNECTION_NOT_SECURE)
+
+ private void notifyState(AMQPState newState) throws AMQPException
{
- _connectionNotTuned.signal();
+ _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CONNECTION_STATE));
}
- }
-
- public Phase getPhasePipe()
- {
- return _phase;
- }
} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java
index e49524ee30..20c4921582 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java
@@ -20,23 +20,65 @@
*/
package org.apache.qpid.nclient.amqp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
import org.apache.qpid.nclient.amqp.state.AMQPStateListener;
import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
+import org.apache.qpid.nclient.core.AMQPException;
public class QpidStateManager implements AMQPStateManager
{
- public void addListener(AMQPStateListener l) throws AMQException
- {
- // TODO Auto-generated method stub
+ private static final Logger _logger = Logger.getLogger(QpidStateManager.class);
- }
+ private Map<AMQPStateType, List<AMQPStateListener>> _listernerMap = new ConcurrentHashMap<AMQPStateType, List<AMQPStateListener>>();
- public void removeListener(AMQPStateListener l) throws AMQException
+ public void addListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException
{
- // TODO Auto-generated method stub
+ List<AMQPStateListener> list;
+ if(_listernerMap.containsKey(stateType))
+ {
+ list = _listernerMap.get(stateType);
+ }
+ else
+ {
+ list = new ArrayList<AMQPStateListener>();
+ _listernerMap.put(stateType, list);
+ }
+ list.add(l);
+ }
+ public void removeListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException
+ {
+ if(_listernerMap.containsKey(stateType))
+ {
+ List<AMQPStateListener> list = _listernerMap.get(stateType);
+ list.remove(l);
+ }
+ }
+
+ public void notifyStateChanged(AMQPStateChangedEvent event) throws AMQPException
+ {
+
+ if(_listernerMap.containsKey(event.getStateType()))
+ {
+ List<AMQPStateListener> list = _listernerMap.get(event.getStateType());
+ for(AMQPStateListener l: list)
+ {
+ l.stateChanged(event);
+ }
+ }
+ else
+ {
+ _logger.warn("There are no registered listerners for state type" + event.getStateType());
+ }
}
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java
new file mode 100644
index 0000000000..43f096a062
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java
@@ -0,0 +1,19 @@
+package org.apache.qpid.nclient.amqp.sample;
+
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
+import org.apache.qpid.nclient.amqp.state.AMQPStateListener;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public class StateHelper implements AMQPStateListener
+{
+
+ public void stateChanged(AMQPStateChangedEvent event) throws AMQPException
+ {
+ String s = event.getStateType() + " changed state from " +
+ event.getOldState() + " to " + event.getNewState();
+
+ System.out.println(s);
+
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
index 166d565f81..f994a99ec3 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
@@ -65,6 +65,7 @@ import org.apache.qpid.nclient.amqp.AMQPConnection;
import org.apache.qpid.nclient.amqp.AMQPExchange;
import org.apache.qpid.nclient.amqp.AMQPMessage;
import org.apache.qpid.nclient.amqp.AMQPQueue;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
import org.apache.qpid.nclient.transport.AMQPConnectionURL;
import org.apache.qpid.nclient.transport.ConnectionURL;
import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
@@ -80,371 +81,371 @@ import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionTy
@SuppressWarnings("unused")
public class TestClient
{
- private byte _major;
- private byte _minor;
- private ConnectionURL _url;
- private static int _channel = 2;
- // Need a Class factory per connection
- private AMQPClassFactory _classFactory = new AMQPClassFactory();
- private int _ticket;
-
- public AMQPConnection openConnection() throws Exception
- {
- //_url = new AMQPConnectionURL("amqp://guest:guest@test/localhost?brokerlist='vm://:3'");
-
- _url = new AMQPConnectionURL("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'");
- return _classFactory.createConnectionClass(_url,ConnectionType.TCP);
- }
-
- public void handleConnectionNegotiation(AMQPConnection con) throws Exception
- {
- // ConnectionStartBody
- ConnectionStartBody connectionStartBody = con.openTCPConnection();
- _major = connectionStartBody.getMajor();
- _minor = connectionStartBody.getMinor();
-
- FieldTable clientProperties = FieldTableFactory.newFieldTable();
- clientProperties.put(new AMQShortString(ClientProperties.instance.toString()),"Test"); // setting only the client id
-
- final String locales = new String(connectionStartBody.getLocales(), "utf8");
- final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
-
- final String mechanism = SecurityHelper.chooseMechanism(connectionStartBody.getMechanisms());
-
- SaslClient sc = Sasl.createSaslClient(new String[]{mechanism},
- null, "AMQP", "localhost",
- null, SecurityHelper.createCallbackHandler(mechanism,_url));
-
- ConnectionStartOkBody connectionStartOkBody =
- ConnectionStartOkBody.createMethodBody(_major, _minor, clientProperties,
- new AMQShortString(tokenizer.nextToken()),
- new AMQShortString(mechanism),
- (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null));
- // ConnectionSecureBody
- AMQMethodBody body = con.startOk(connectionStartOkBody);
- ConnectionTuneBody connectionTuneBody;
-
- if (body instanceof ConnectionSecureBody)
+ private byte _major;
+
+ private byte _minor;
+
+ private ConnectionURL _url;
+
+ private static int _channel = 2;
+
+ // Need a Class factory per connection
+ private AMQPClassFactory _classFactory = new AMQPClassFactory();
+
+ private int _ticket;
+
+ public AMQPConnection openConnection() throws Exception
+ {
+ //_url = new AMQPConnectionURL("amqp://guest:guest@test/localhost?brokerlist='vm://:3'");
+
+ _url = new AMQPConnectionURL("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'");
+ return _classFactory.createConnectionClass(_url, ConnectionType.TCP);
+ }
+
+ public void handleConnectionNegotiation(AMQPConnection con) throws Exception
+ {
+ StateHelper stateHelper = new StateHelper();
+ _classFactory.getStateManager().addListener(AMQPStateType.CONNECTION_STATE, stateHelper);
+ _classFactory.getStateManager().addListener(AMQPStateType.CHANNEL_STATE, stateHelper);
+
+ //ConnectionStartBody
+ ConnectionStartBody connectionStartBody = con.openTCPConnection();
+ _major = connectionStartBody.getMajor();
+ _minor = connectionStartBody.getMinor();
+
+ FieldTable clientProperties = FieldTableFactory.newFieldTable();
+ clientProperties.put(new AMQShortString(ClientProperties.instance.toString()), "Test"); // setting only the client id
+
+ final String locales = new String(connectionStartBody.getLocales(), "utf8");
+ final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
+
+ final String mechanism = SecurityHelper.chooseMechanism(connectionStartBody.getMechanisms());
+
+ SaslClient sc = Sasl.createSaslClient(new String[]
+ { mechanism }, null, "AMQP", "localhost", null, SecurityHelper.createCallbackHandler(mechanism, _url));
+
+ ConnectionStartOkBody connectionStartOkBody = ConnectionStartOkBody.createMethodBody(_major, _minor, clientProperties, new AMQShortString(
+ tokenizer.nextToken()), new AMQShortString(mechanism), (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null));
+ // ConnectionSecureBody
+ AMQMethodBody body = con.startOk(connectionStartOkBody);
+ ConnectionTuneBody connectionTuneBody;
+
+ if (body instanceof ConnectionSecureBody)
+ {
+ ConnectionSecureBody connectionSecureBody = (ConnectionSecureBody) body;
+ ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(_major, _minor, sc
+ .evaluateChallenge(connectionSecureBody.getChallenge()));
+ //Assuming the server is not going to send another challenge
+ connectionTuneBody = (ConnectionTuneBody) con.secureOk(connectionSecureOkBody);
+
+ }
+ else
+ {
+ connectionTuneBody = (ConnectionTuneBody) body;
+ }
+
+ // Using broker supplied values
+ ConnectionTuneOkBody connectionTuneOkBody = ConnectionTuneOkBody.createMethodBody(_major, _minor, connectionTuneBody.getChannelMax(),
+ connectionTuneBody.getFrameMax(), connectionTuneBody.getHeartbeat());
+ con.tuneOk(connectionTuneOkBody);
+
+ ConnectionOpenBody connectionOpenBody = ConnectionOpenBody.createMethodBody(_major, _minor, null, true, new AMQShortString(_url
+ .getVirtualHost()));
+
+ ConnectionOpenOkBody connectionOpenOkBody = con.open(connectionOpenBody);
+ }
+
+ public void handleChannelNegotiation() throws Exception
+ {
+ AMQPChannel channel = _classFactory.createChannelClass(_channel);
+
+ ChannelOpenBody channelOpenBody = ChannelOpenBody.createMethodBody(_major, _minor, new AMQShortString("myChannel1"));
+ ChannelOpenOkBody channelOpenOkBody = channel.open(channelOpenBody);
+
+ //lets have some fun
+ ChannelFlowBody channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, false);
+
+ ChannelFlowOkBody channelFlowOkBody = channel.flow(channelFlowBody);
+ System.out.println("Channel is " + (channelFlowOkBody.getActive() ? "active" : "suspend"));
+
+ channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, true);
+ channelFlowOkBody = channel.flow(channelFlowBody);
+ System.out.println("Channel is " + (channelFlowOkBody.getActive() ? "active" : "suspend"));
+ }
+
+ public void createExchange() throws Exception
+ {
+ AMQPExchange exchange = _classFactory.createExchangeClass(_channel);
+
+ ExchangeDeclareBody exchangeDeclareBody = ExchangeDeclareBody.createMethodBody(_major, _minor, null, // arguments
+ false,//auto delete
+ false,// durable
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME), true, //internal
+ false,// nowait
+ false,// passive
+ _ticket, new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS));
+
+ AMQPCallBack cb = createCallBackWithMessage("Broker has created the exchange");
+ exchange.declare(exchangeDeclareBody, cb);
+ // Blocking for response
+ while (!cb.isComplete())
+ {
+ }
+ }
+
+ public void createAndBindQueue() throws Exception
+ {
+ AMQPQueue queue = _classFactory.createQueueClass(_channel);
+
+ QueueDeclareBody queueDeclareBody = QueueDeclareBody.createMethodBody(_major, _minor, null, //arguments
+ false,//auto delete
+ false,// durable
+ false, //exclusive,
+ false, //nowait,
+ false, //passive,
+ new AMQShortString("MyTestQueue"), 0);
+
+ AMQPCallBack cb = new AMQPCallBack()
+ {
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ QueueDeclareOkBody queueDeclareOkBody = (QueueDeclareOkBody) body;
+ System.out.println("[Broker has created the queue, " + "message count " + queueDeclareOkBody.getMessageCount() + "consumer count "
+ + queueDeclareOkBody.getConsumerCount() + "]\n");
+ }
+
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ }
+
+ };
+
+ queue.declare(queueDeclareBody, cb);
+ //Blocking for response
+ while (!cb.isComplete())
+ {
+ }
+
+ QueueBindBody queueBindBody = QueueBindBody.createMethodBody(_major, _minor, null, //arguments
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),//exchange
+ false, //nowait
+ new AMQShortString("MyTestQueue"), //queue
+ new AMQShortString("RH"), //routingKey
+ 0 //ticket
+ );
+
+ cb = createCallBackWithMessage("Broker has bound the queue");
+ queue.bind(queueBindBody, cb);
+ //Blocking for response
+ while (!cb.isComplete())
+ {
+ }
+ }
+
+ public void purgeQueue() throws Exception
{
- ConnectionSecureBody connectionSecureBody = (ConnectionSecureBody)body;
- ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(
- _major,_minor,sc.evaluateChallenge(connectionSecureBody.getChallenge()));
- //Assuming the server is not going to send another challenge
- connectionTuneBody = (ConnectionTuneBody)con.secureOk(connectionSecureOkBody);
-
+ AMQPQueue queue = _classFactory.createQueueClass(_channel);
+
+ QueuePurgeBody queuePurgeBody = QueuePurgeBody.createMethodBody(_major, _minor, false, //nowait
+ new AMQShortString("MyTestQueue"), //queue
+ 0 //ticket
+ );
+
+ AMQPCallBack cb = new AMQPCallBack()
+ {
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ QueuePurgeOkBody queuePurgeOkBody = (QueuePurgeOkBody) body;
+ System.out.println("[Broker has purged the queue, message count " + queuePurgeOkBody.getMessageCount() + "]\n");
+ }
+
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ }
+
+ };
+
+ queue.purge(queuePurgeBody, cb);
+ //Blocking for response
+ while (!cb.isComplete())
+ {
+ }
+
}
- else
+
+ public void deleteQueue() throws Exception
{
- connectionTuneBody = (ConnectionTuneBody)body;
+ AMQPQueue queue = _classFactory.createQueueClass(_channel);
+
+ QueueDeleteBody queueDeleteBody = QueueDeleteBody.createMethodBody(_major, _minor, false, //ifEmpty
+ false, //ifUnused
+ false, //nowait
+ new AMQShortString("MyTestQueue"), //queue
+ 0 //ticket
+ );
+
+ AMQPCallBack cb = new AMQPCallBack()
+ {
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ QueueDeleteOkBody queueDeleteOkBody = (QueueDeleteOkBody) body;
+ System.out.println("[Broker has deleted the queue, message count " + queueDeleteOkBody.getMessageCount() + "]\n");
+ }
+
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ }
+
+ };
+
+ queue.delete(queueDeleteBody, cb);
+ //Blocking for response
+ while (!cb.isComplete())
+ {
+ }
+
}
-
-
- // Using broker supplied values
- ConnectionTuneOkBody connectionTuneOkBody =
- ConnectionTuneOkBody.createMethodBody(_major,_minor,
- connectionTuneBody.getChannelMax(),
- connectionTuneBody.getFrameMax(),
- connectionTuneBody.getHeartbeat());
- con.tuneOk(connectionTuneOkBody);
-
- ConnectionOpenBody connectionOpenBody =
- ConnectionOpenBody.createMethodBody(_major,_minor,null, true,new AMQShortString(_url.getVirtualHost()));
-
- ConnectionOpenOkBody connectionOpenOkBody = con.open(connectionOpenBody);
- }
-
- public void handleChannelNegotiation() throws Exception
- {
- AMQPChannel channel = _classFactory.createChannelClass(_channel);
-
- ChannelOpenBody channelOpenBody = ChannelOpenBody.createMethodBody(_major, _minor, new AMQShortString("myChannel1"));
- ChannelOpenOkBody channelOpenOkBody = channel.open(channelOpenBody);
-
- //lets have some fun
- ChannelFlowBody channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, false);
-
- ChannelFlowOkBody channelFlowOkBody = channel.flow(channelFlowBody);
- System.out.println("Channel is " + (channelFlowOkBody.getActive()? "active" : "suspend"));
-
- channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, true);
- channelFlowOkBody = channel.flow(channelFlowBody);
- System.out.println("Channel is " + (channelFlowOkBody.getActive()? "active" : "suspend"));
- }
-
- public void createExchange() throws Exception
- {
- AMQPExchange exchange = _classFactory.createExchangeClass(_channel);
-
- ExchangeDeclareBody exchangeDeclareBody =
- ExchangeDeclareBody.createMethodBody(_major, _minor,
- null, // arguments
- false,//auto delete
- false,// durable
- new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),
- true, //internal
- false,// nowait
- false,// passive
- _ticket,
- new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS));
-
- AMQPCallBack cb = createCallBackWithMessage("Broker has created the exchange");
- exchange.declare(exchangeDeclareBody, cb);
- // Blocking for response
- while (!cb.isComplete()){}
- }
-
-
- public void createAndBindQueue()throws Exception
- {
- AMQPQueue queue = _classFactory.createQueueClass(_channel);
-
- QueueDeclareBody queueDeclareBody =
- QueueDeclareBody.createMethodBody(_major, _minor,
- null, //arguments
- false,//auto delete
- false,// durable
- false, //exclusive,
- false, //nowait,
- false, //passive,
- new AMQShortString("MyTestQueue"),
- 0);
-
- AMQPCallBack cb = new AMQPCallBack(){
-
- @Override
- public void brokerResponded(AMQMethodBody body)
- {
- QueueDeclareOkBody queueDeclareOkBody = (QueueDeclareOkBody)body;
- System.out.println("[Broker has created the queue, " +
- "message count " + queueDeclareOkBody.getMessageCount() +
- "consumer count " + queueDeclareOkBody.getConsumerCount() + "]\n");
- }
-
- @Override
- public void brokerRespondedWithError(AMQException e)
- {
- }
-
- };
-
- queue.declare(queueDeclareBody, cb);
- //Blocking for response
- while (!cb.isComplete()){}
-
- QueueBindBody queueBindBody =
- QueueBindBody.createMethodBody(_major, _minor,
- null, //arguments
- new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),//exchange
- false, //nowait
- new AMQShortString("MyTestQueue"), //queue
- new AMQShortString("RH"), //routingKey
- 0 //ticket
- );
-
- cb = createCallBackWithMessage("Broker has bound the queue");
- queue.bind(queueBindBody, cb);
- //Blocking for response
- while (!cb.isComplete()){}
- }
-
- public void purgeQueue()throws Exception
- {
- AMQPQueue queue = _classFactory.createQueueClass(_channel);
-
- QueuePurgeBody queuePurgeBody =
- QueuePurgeBody.createMethodBody(_major, _minor,
- false, //nowait
- new AMQShortString("MyTestQueue"), //queue
- 0 //ticket
- );
-
- AMQPCallBack cb = new AMQPCallBack(){
-
- @Override
- public void brokerResponded(AMQMethodBody body)
- {
- QueuePurgeOkBody queuePurgeOkBody = (QueuePurgeOkBody)body;
- System.out.println("[Broker has purged the queue, message count " + queuePurgeOkBody.getMessageCount() + "]\n");
- }
-
- @Override
- public void brokerRespondedWithError(AMQException e)
- {
- }
-
- };
-
- queue.purge(queuePurgeBody, cb);
- //Blocking for response
- while (!cb.isComplete()){}
-
- }
-
- public void deleteQueue()throws Exception
- {
- AMQPQueue queue = _classFactory.createQueueClass(_channel);
-
- QueueDeleteBody queueDeleteBody =
- QueueDeleteBody.createMethodBody(_major, _minor,
- false, //ifEmpty
- false, //ifUnused
- false, //nowait
- new AMQShortString("MyTestQueue"), //queue
- 0 //ticket
- );
-
- AMQPCallBack cb = new AMQPCallBack(){
-
- @Override
- public void brokerResponded(AMQMethodBody body)
- {
- QueueDeleteOkBody queueDeleteOkBody = (QueueDeleteOkBody)body;
- System.out.println("[Broker has deleted the queue, message count " + queueDeleteOkBody.getMessageCount() + "]\n");
- }
-
- @Override
- public void brokerRespondedWithError(AMQException e)
- {
- }
-
- };
-
- queue.delete(queueDeleteBody, cb);
- //Blocking for response
- while (!cb.isComplete()){}
-
- }
-
- public void publishAndSubscribe() throws Exception
- {
- AMQPMessage message = _classFactory.createMessageClass(_channel,new MessageHelper());
- MessageConsumeBody messageConsumeBody = MessageConsumeBody.createMethodBody(_major, _minor,
- new AMQShortString("myClient"),// destination
- false, //exclusive
- null, //filter
- false, //noAck,
- false, //noLocal,
- new AMQShortString("MyTestQueue"), //queue
- 0 //ticket
- );
-
- AMQPCallBack cb = createCallBackWithMessage("Broker has accepted the consume");
- message.consume(messageConsumeBody, cb);
- //Blocking for response
- while (!cb.isComplete()){}
-
- // Sending 5 messages serially
- for (int i=0; i<5; i++)
+
+ public void publishAndSubscribe() throws Exception
{
- cb = createCallBackWithMessage("Broker has accepted msg " + i);
- message.transfer(createMessages("Test" + i),cb);
- while (!cb.isComplete()){}
+ AMQPMessage message = _classFactory.createMessageClass(_channel, new MessageHelper());
+ MessageConsumeBody messageConsumeBody = MessageConsumeBody.createMethodBody(_major, _minor, new AMQShortString("myClient"),// destination
+ false, //exclusive
+ null, //filter
+ false, //noAck,
+ false, //noLocal,
+ new AMQShortString("MyTestQueue"), //queue
+ 0 //ticket
+ );
+
+ AMQPCallBack cb = createCallBackWithMessage("Broker has accepted the consume");
+ message.consume(messageConsumeBody, cb);
+ //Blocking for response
+ while (!cb.isComplete())
+ {
+ }
+
+ // Sending 5 messages serially
+ for (int i = 0; i < 5; i++)
+ {
+ cb = createCallBackWithMessage("Broker has accepted msg " + i);
+ message.transfer(createMessages("Test" + i), cb);
+ while (!cb.isComplete())
+ {
+ }
+ }
+
+ MessageCancelBody messageCancelBody = MessageCancelBody.createMethodBody(_major, _minor, new AMQShortString("myClient"));
+
+ AMQPCallBack cb2 = createCallBackWithMessage("Broker has accepted the consume cancel");
+ message.cancel(messageCancelBody, cb2);
+
}
-
- MessageCancelBody messageCancelBody = MessageCancelBody.createMethodBody(_major, _minor, new AMQShortString("myClient"));
-
- AMQPCallBack cb2 = createCallBackWithMessage("Broker has accepted the consume cancel");
- message.cancel(messageCancelBody, cb2);
-
- }
-
- private MessageTransferBody createMessages(String content) throws Exception
- {
- FieldTable headers = FieldTableFactory.newFieldTable();
- headers.setAsciiString(new AMQShortString("Test"), System.currentTimeMillis() + "");
-
- MessageTransferBody messageTransferBody =
- MessageTransferBody.createMethodBody(_major, _minor,
- new AMQShortString("testApp"), //appId
- headers, //applicationHeaders
- new Content(Content.TypeEnum.INLINE_T,content.getBytes()), //body
- new AMQShortString(""), //contentEncoding,
- new AMQShortString("text/plain"), //contentType
- new AMQShortString("testApp"), //correlationId
- (short)1, //deliveryMode non persistant
- new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination
- new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange
- 0l, //expiration
- false, //immediate
- false, //mandatory
- new AMQShortString(UUID.randomUUID().toString()), //messageId
- (short)0, //priority
- false, //redelivered
- new AMQShortString("RH"), //replyTo
- new AMQShortString("RH"), //routingKey,
- "abc".getBytes(), //securityToken
- 0, //ticket
- System.currentTimeMillis(), //timestamp
- new AMQShortString(""), //transactionId
- 0l, //ttl,
- new AMQShortString("Hello") //userId
- );
-
- return messageTransferBody;
-
- }
-
- public void publishAndGet() throws Exception
- {
- AMQPMessage message = _classFactory.createMessageClass(_channel,new MessageHelper());
- AMQPCallBack cb = createCallBackWithMessage("Broker has accepted msg 5");
-
- MessageGetBody messageGetBody =
- MessageGetBody.createMethodBody(_major, _minor,
- new AMQShortString("myClient"),
- false, //noAck
- new AMQShortString("MyTestQueue"), //queue
- 0 //ticket
- );
-
- //AMQPMessage message = _classFactory.createMessage(_channel,new MessageHelper());
- message.transfer(createMessages("Test"),cb);
- while(!cb.isComplete()){}
-
- cb = createCallBackWithMessage("Broker has accepted get");
- message.get(messageGetBody, cb);
- }
-
- // Creates a gneric call back and prints the given message
- private AMQPCallBack createCallBackWithMessage(final String msg)
- {
- AMQPCallBack cb = new AMQPCallBack(){
-
- @Override
- public void brokerResponded(AMQMethodBody body)
- {
- System.out.println(msg);
- }
-
- @Override
- public void brokerRespondedWithError(AMQException e)
- {
- }
-
- };
-
- return cb;
- }
-
- public static void main(String[] args)
- {
- TestClient test = new TestClient();
- try
+
+ private MessageTransferBody createMessages(String content) throws Exception
{
- AMQPConnection con = test.openConnection();
- test.handleConnectionNegotiation(con);
- test.handleChannelNegotiation();
- test.createExchange();
- test.createAndBindQueue();
- test.publishAndSubscribe();
- test.purgeQueue();
- test.publishAndGet();
- test.deleteQueue();
+ FieldTable headers = FieldTableFactory.newFieldTable();
+ headers.setAsciiString(new AMQShortString("Test"), System.currentTimeMillis() + "");
+
+ MessageTransferBody messageTransferBody = MessageTransferBody.createMethodBody(_major, _minor, new AMQShortString("testApp"), //appId
+ headers, //applicationHeaders
+ new Content(Content.TypeEnum.INLINE_T, content.getBytes()), //body
+ new AMQShortString(""), //contentEncoding,
+ new AMQShortString("text/plain"), //contentType
+ new AMQShortString("testApp"), //correlationId
+ (short) 1, //deliveryMode non persistant
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange
+ 0l, //expiration
+ false, //immediate
+ false, //mandatory
+ new AMQShortString(UUID.randomUUID().toString()), //messageId
+ (short) 0, //priority
+ false, //redelivered
+ new AMQShortString("RH"), //replyTo
+ new AMQShortString("RH"), //routingKey,
+ "abc".getBytes(), //securityToken
+ 0, //ticket
+ System.currentTimeMillis(), //timestamp
+ new AMQShortString(""), //transactionId
+ 0l, //ttl,
+ new AMQShortString("Hello") //userId
+ );
+
+ return messageTransferBody;
+
}
- catch (Exception e)
+
+ public void publishAndGet() throws Exception
+ {
+ AMQPMessage message = _classFactory.createMessageClass(_channel, new MessageHelper());
+ AMQPCallBack cb = createCallBackWithMessage("Broker has accepted msg 5");
+
+ MessageGetBody messageGetBody = MessageGetBody.createMethodBody(_major, _minor, new AMQShortString("myClient"), false, //noAck
+ new AMQShortString("MyTestQueue"), //queue
+ 0 //ticket
+ );
+
+ //AMQPMessage message = _classFactory.createMessage(_channel,new MessageHelper());
+ message.transfer(createMessages("Test"), cb);
+ while (!cb.isComplete())
+ {
+ }
+
+ cb = createCallBackWithMessage("Broker has accepted get");
+ message.get(messageGetBody, cb);
+ }
+
+ // Creates a gneric call back and prints the given message
+ private AMQPCallBack createCallBackWithMessage(final String msg)
+ {
+ AMQPCallBack cb = new AMQPCallBack()
+ {
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ System.out.println(msg);
+ }
+
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ }
+
+ };
+
+ return cb;
+ }
+
+ public static void main(String[] args)
{
- e.printStackTrace();
+ TestClient test = new TestClient();
+ try
+ {
+ AMQPConnection con = test.openConnection();
+ test.handleConnectionNegotiation(con);
+ test.handleChannelNegotiation();
+ test.createExchange();
+ test.createAndBindQueue();
+ test.publishAndSubscribe();
+ test.purgeQueue();
+ test.publishAndGet();
+ test.deleteQueue();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
}
- }
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
index 3555704c4f..061ec5a849 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
@@ -38,7 +38,8 @@ public class AMQPState
public String toString()
{
- return "AMQState: id = " + _id + " name: " + _name;
+ //return "AMQState: id = " + _id + " name: " + _name;
+ return _name; // looks better with loggin
}
// Connection state
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java
new file mode 100644
index 0000000000..506d267a9e
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.state;
+
+public class AMQPStateChangedEvent
+{
+ private AMQPState _oldState;
+
+ private AMQPState _newState;
+
+ private AMQPStateType _stateType;
+
+ public AMQPStateChangedEvent(AMQPState oldState, AMQPState newState, AMQPStateType stateType)
+ {
+ _oldState = oldState;
+ _newState = newState;
+ _stateType = stateType;
+ }
+
+ public AMQPState getNewState()
+ {
+ return _newState;
+ }
+
+ public void setNewState(AMQPState newState)
+ {
+ this._newState = newState;
+ }
+
+ public AMQPState getOldState()
+ {
+ return _oldState;
+ }
+
+ public void setOldState(AMQPState oldState)
+ {
+ this._oldState = oldState;
+ }
+
+ public AMQPStateType getStateType()
+ {
+ return _stateType;
+ }
+
+ public void setStateType(AMQPStateType stateType)
+ {
+ this._stateType = stateType;
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java
index 67f854caf9..974f707504 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java
@@ -4,5 +4,5 @@ import org.apache.qpid.nclient.core.AMQPException;
public interface AMQPStateListener
{
- public void stateChanged(AMQPState oldState, AMQPState newState) throws AMQPException;
+ public void stateChanged(AMQPStateChangedEvent event) throws AMQPException;
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java
index 2956a19e66..9bc60b658e 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java
@@ -1,11 +1,13 @@
package org.apache.qpid.nclient.amqp.state;
import org.apache.qpid.AMQException;
+import org.apache.qpid.nclient.core.AMQPException;
public interface AMQPStateManager
{
-
- public void addListener(AMQPStateListener l)throws AMQException;
+ public void addListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException;
+
+ public void removeListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException;
- public void removeListener(AMQPStateListener l)throws AMQException;
+ public void notifyStateChanged(AMQPStateChangedEvent event) throws AMQPException;
} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java
index cbae4aafee..e190d639f2 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java
@@ -40,7 +40,7 @@ public class AMQPStateType
public String toString()
{
- return "AMQState: id = " + _typeId + " name: " + _typeName;
+ return _typeName;
}
// Connection state
diff --git a/java/pom.xml b/java/pom.xml
index 2150e61861..62a49c2c40 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -130,6 +130,7 @@
<module>common</module>
<module>broker</module>
<module>client</module>
+ <module>newclient</module>
<module>cluster</module>
<module>systests</module>
<module>perftests</module>
@@ -382,7 +383,7 @@
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
- <version>1.2</version>
+ <version>1.3</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
@@ -475,6 +476,11 @@
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-newclient</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker</artifactId>
<version>${project.version}</version>
</dependency>