summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-05-30 17:19:51 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-05-30 17:19:51 +0000
commitc21fe3e31285fe01b0f14f3d5b2c920edd6b2546 (patch)
treee1daa1e8a2bde10db285ffbb7a1d881e0336c746
parent48f73ce3731366fd1b014faa3bbaa2820f41e8bb (diff)
downloadqpid-python-c21fe3e31285fe01b0f14f3d5b2c920edd6b2546.tar.gz
bug fixes and enchancements for Qpid java client
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/client_restructure@542880 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java8
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java4
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java12
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java16
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java14
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java10
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java40
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConnection.java3
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java29
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageConsumer.java6
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageHelper.java12
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageProducer.java4
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidSession.java2
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java12
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPConstants.java (renamed from java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java)2
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java4
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java4
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java4
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java19
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java105
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidExchangeHelperImpl.java6
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java200
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageHelperImpl.java186
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java14
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidQueueHelperImpl.java9
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java107
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java10
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java4
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java8
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java6
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java18
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java8
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java8
33 files changed, 623 insertions, 271 deletions
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java
index f984e812c2..42a3d4bb3f 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java
@@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.core.AMQPConstants;
public abstract class AMQPCallBackSupport
{
@@ -52,14 +52,14 @@ public abstract class AMQPCallBackSupport
{
if(noWait)
{
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,AMQPConstants.EMPTY_CORRELATION_ID);
return msg;
}
else
{
// u only need to register if u are expecting a response
long localCorrelationId = getNextCorrelationId();
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,AMQPConstants.EMPTY_CORRELATION_ID,localCorrelationId);
_cbMap.put(localCorrelationId, cb);
return msg;
}
@@ -68,7 +68,7 @@ public abstract class AMQPCallBackSupport
protected AMQPMethodEvent handleAsynchronousCall(AMQMethodBody methodBody,AMQPCallBack cb)
{
long localCorrelationId = getNextCorrelationId();
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,AMQPConstants.EMPTY_CORRELATION_ID,localCorrelationId);
_cbMap.put(localCorrelationId, cb);
return msg;
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java
index 6d0e83bb7e..094b94f5cb 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java
@@ -22,13 +22,13 @@ package org.apache.qpid.nclient.amqp;
import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.core.AMQPConstants;
public class AbstractAMQPClassFactory
{
public static AMQPClassFactory getFactoryInstance() throws AMQPException
{
- String className = ClientConfiguration.get().getString(QpidConstants.AMQP_CLASS_FACTORY);
+ String className = ClientConfiguration.get().getString(AMQPConstants.AMQP_CLASS_FACTORY);
try
{
return (AMQPClassFactory)Class.forName(className).newInstance();
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java
index decb120796..0a019a3d28 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java
@@ -46,7 +46,7 @@ import org.apache.qpid.nclient.amqp.state.AMQPStateType;
import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.core.AMQPConstants;
import org.apache.qpid.nclient.util.AMQPValidator;
/**
@@ -106,7 +106,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe
_phase = phase;
_stateManager = stateManager;
_currentState = AMQPState.CHANNEL_NOT_OPENED;
- _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+ _serverTimeOut = ClientConfiguration.get().getLong(AMQPConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
}
/**
@@ -125,7 +125,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe
{
_channelOpenOkBody = null;
checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, AMQPConstants.EMPTY_CORRELATION_ID);
_phase.messageSent(msg);
//_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
@@ -156,7 +156,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe
{
_channelCloseOkBody = null;
checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, AMQPConstants.EMPTY_CORRELATION_ID);
_phase.messageSent(msg);
//_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
@@ -193,7 +193,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe
{
checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND);
}
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, AMQPConstants.EMPTY_CORRELATION_ID);
_phase.messageSent(msg);
//_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
@@ -223,7 +223,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe
{
_channelOkBody = null;
checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, AMQPConstants.EMPTY_CORRELATION_ID);
_phase.messageSent(msg);
//_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java
index a38469def5..653882d3c1 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java
@@ -65,7 +65,7 @@ import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.DefaultPhaseContext;
import org.apache.qpid.nclient.core.Phase;
import org.apache.qpid.nclient.core.PhaseContext;
-import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.core.AMQPConstants;
import org.apache.qpid.nclient.transport.AMQPConnectionURL;
import org.apache.qpid.nclient.transport.ConnectionURL;
import org.apache.qpid.nclient.transport.TransportConnection;
@@ -119,16 +119,16 @@ public class QpidAMQPClassFactory implements AMQPClassFactory
if (_amqpConnection == null)
{
PhaseContext ctx = new DefaultPhaseContext();
- ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager);
+ ctx.setProperty(AMQPConstants.EVENT_MANAGER, _eventManager);
TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type, ctx);
_amqpConnection = new QpidAMQPConnection(conn, _stateManager);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class, _amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class, _amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class, _amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class, _amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class, _amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(AMQPConstants.CHANNEL_ZERO, ConnectionStartBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(AMQPConstants.CHANNEL_ZERO, ConnectionSecureBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(AMQPConstants.CHANNEL_ZERO, ConnectionTuneBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(AMQPConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(AMQPConstants.CHANNEL_ZERO, ConnectionCloseBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(AMQPConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class, _amqpConnection);
}
return _amqpConnection;
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java
index 9b4d776cc5..3c5ca3a949 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java
@@ -48,7 +48,7 @@ import org.apache.qpid.nclient.amqp.state.AMQPStateType;
import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.core.AMQPConstants;
import org.apache.qpid.nclient.transport.TransportConnection;
import org.apache.qpid.nclient.util.AMQPValidator;
@@ -107,7 +107,7 @@ public class QpidAMQPConnection extends AMQPStateMachine implements AMQPMethodLi
_connection = connection;
_stateManager = stateManager;
_currentState = AMQPState.CONNECTION_UNDEFINED;
- _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+ _serverTimeOut = ClientConfiguration.get().getLong(AMQPConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
}
/**
@@ -159,7 +159,7 @@ public class QpidAMQPConnection extends AMQPStateMachine implements AMQPMethodLi
{
_connectionSecureBody = null;
checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId);
+ AMQPMethodEvent msg = new AMQPMethodEvent(AMQPConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId);
_phase.messageSent(msg);
// _connectionNotSecure.await(_serverTimeOut,TimeUnit.MILLISECONDS);
_connectionNotSecure.await();
@@ -205,7 +205,7 @@ public class QpidAMQPConnection extends AMQPStateMachine implements AMQPMethodLi
checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED);
_connectionSecureBody = null; // The server could send a fresh challenge
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId);
+ AMQPMethodEvent msg = new AMQPMethodEvent(AMQPConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId);
_phase.messageSent(msg);
//_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS);
@@ -249,7 +249,7 @@ public class QpidAMQPConnection extends AMQPStateMachine implements AMQPMethodLi
{
checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED);
_connectionSecureBody = null;
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId);
+ AMQPMethodEvent msg = new AMQPMethodEvent(AMQPConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId);
_phase.messageSent(msg);
notifyState(AMQPState.CONNECTION_NOT_OPENED);
_currentState = AMQPState.CONNECTION_NOT_OPENED;
@@ -274,7 +274,7 @@ public class QpidAMQPConnection extends AMQPStateMachine implements AMQPMethodLi
_connectionOpenOkBody = null;
checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
+ AMQPMethodEvent msg = new AMQPMethodEvent(AMQPConstants.CHANNEL_ZERO, connectionOpenBody, AMQPConstants.EMPTY_CORRELATION_ID);
_phase.messageSent(msg);
//_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS);
@@ -306,7 +306,7 @@ public class QpidAMQPConnection extends AMQPStateMachine implements AMQPMethodLi
{
_connectionCloseOkBody = null;
checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID);
+ AMQPMethodEvent msg = new AMQPMethodEvent(AMQPConstants.CHANNEL_ZERO, connectioncloseBody, AMQPConstants.EMPTY_CORRELATION_ID);
_phase.messageSent(msg);
_connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time");
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
index fd7b6f8ec6..9d21c84290 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
@@ -43,7 +43,7 @@ import org.apache.qpid.nclient.amqp.state.AMQPStateType;
import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.core.AMQPConstants;
import org.apache.qpid.nclient.util.AMQPValidator;
public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMethodListener, AMQPDtxDemarcation
@@ -89,7 +89,7 @@ public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMeth
_phase = phase;
_stateManager = stateManager;
_currentState = AMQPState.DTX_CHANNEL_NOT_SELECTED;
- _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+ _serverTimeOut = ClientConfiguration.get().getLong(AMQPConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
}
/**
@@ -104,7 +104,7 @@ public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMeth
{
_dtxDemarcationSelectOkBody = null;
checkIfValidStateTransition(AMQPState.DTX_CHANNEL_NOT_SELECTED, _currentState, AMQPState.DTX_NOT_STARTED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, dtxDemarcationSelectBody, QpidConstants.EMPTY_CORRELATION_ID);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, dtxDemarcationSelectBody, AMQPConstants.EMPTY_CORRELATION_ID);
_phase.messageSent(msg);
//_dtxNotSelected.await(_serverTimeOut, TimeUnit.MILLISECONDS);
@@ -131,7 +131,7 @@ public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMeth
{
_dtxDemarcationStartOkBody = null;
checkIfValidStateTransition(_validStartStates, _currentState, AMQPState.DTX_STARTED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationStartOkBody, QpidConstants.EMPTY_CORRELATION_ID);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationStartOkBody, AMQPConstants.EMPTY_CORRELATION_ID);
_phase.messageSent(msg);
//_dtxNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
@@ -158,7 +158,7 @@ public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMeth
{
_dtxDemarcationEndOkBody = null;
checkIfValidStateTransition(_validEndStates, _currentState, AMQPState.DTX_END);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationEndOkBody, QpidConstants.EMPTY_CORRELATION_ID);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationEndOkBody, AMQPConstants.EMPTY_CORRELATION_ID);
_phase.messageSent(msg);
//_dtxNotEnd.await(_serverTimeOut, TimeUnit.MILLISECONDS);
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
new file mode 100644
index 0000000000..16e79ca0de
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
@@ -0,0 +1,40 @@
+package org.apache.qpid.nclient.amqp.sample;
+
+import org.apache.qpid.nclient.api.QpidConnection;
+import org.apache.qpid.nclient.api.QpidConstants;
+import org.apache.qpid.nclient.api.QpidExchangeHelper;
+import org.apache.qpid.nclient.api.QpidMessageProducer;
+import org.apache.qpid.nclient.api.QpidQueueHelper;
+import org.apache.qpid.nclient.api.QpidSession;
+import org.apache.qpid.nclient.impl.QpidConnectionImpl;
+
+public class QpidTestClient
+{
+ public static void main(String[] args)
+ {
+ try
+ {
+ QpidConnection con = new QpidConnectionImpl();
+ con.connect("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'");
+
+ QpidSession session = con.createSession(QpidConstants.SESSION_EXPIRY_TIED_TO_CHANNEL);
+ session.open();
+
+ QpidExchangeHelper exchangeHelper = session.getExchangeHelper();
+ exchangeHelper.open();
+ exchangeHelper.declareExchange(false, false, QpidConstants.DIRECT_EXCHANGE_NAME, false, false, false, QpidConstants.DIRECT_EXCHANGE_CLASS);
+
+ QpidQueueHelper queueHelper = session.getQueueHelper();
+ queueHelper.open();
+ queueHelper.declareQueue(false, false, false, false, false, "myQueue");
+ queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "myQueue", "RH");
+
+ QpidMessageProducer messageProducer = session.createProducer();
+ }
+ catch(Exception e)
+ {
+
+ }
+
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConnection.java
index 744b0aa89f..dccf3317ff 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConnection.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConnection.java
@@ -22,9 +22,6 @@ package org.apache.qpid.nclient.api;
public interface QpidConnection
{
- public static final int SESSION_EXPIRY_MAX_TIME = Integer.MAX_VALUE;
- public static final int SESSION_EXPIRY_TIED_TO_CHANNEL = 0;
-
public void connect(String url) throws QpidException;
public QpidSession createSession(int expiryInSeconds) throws QpidException;
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java
new file mode 100644
index 0000000000..96103509cf
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java
@@ -0,0 +1,29 @@
+package org.apache.qpid.nclient.api;
+
+
+public class QpidConstants
+{
+ public static final int SESSION_EXPIRY_TIED_TO_CHANNEL = 0;
+ public static final int SESSION_EXPIRY_MAX_TIME = Integer.MAX_VALUE;
+
+ public final static String TOPIC_EXCHANGE_NAME = "amq.topic";
+
+ public final static String TOPIC_EXCHANGE_CLASS = "topic";
+
+ public final static String DIRECT_EXCHANGE_NAME = "amq.direct";
+
+ public final static String DIRECT_EXCHANGE_CLASS = "direct";
+
+ public final static String HEADERS_EXCHANGE_NAME = "amq.match";
+
+ public final static String HEADERS_EXCHANGE_CLASS = "headers";
+
+ public final static String FANOUT_EXCHANGE_NAME = "amq.fanout";
+
+ public final static String FANOUT_EXCHANGE_CLASS = "fanout";
+
+
+ public final static String SYSTEM_MANAGEMENT_EXCHANGE_NAME = "qpid.sysmgmt";
+
+ public final static String SYSTEM_MANAGEMENT_CLASS = "sysmmgmt";
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageConsumer.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageConsumer.java
index 41adc23727..3053fc3ddc 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageConsumer.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageConsumer.java
@@ -31,4 +31,10 @@ public interface QpidMessageConsumer
public AMQPApplicationMessage receive()throws QpidException;
public AMQPApplicationMessage receive(long timeout, TimeUnit tu)throws QpidException;
+
+ public void messageArrived(AMQPApplicationMessage msg)throws QpidException;
+
+ public void open() throws QpidException;
+
+ public void close() throws QpidException;
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageHelper.java
index ec50d16f9e..0e956d26a4 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageHelper.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageHelper.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.nclient.api;
+import org.apache.qpid.nclient.amqp.AMQPMessage;
+
/**
* used as a helper class to support the Session class
* This reduces the clutter and makes the session class
@@ -31,15 +33,7 @@ package org.apache.qpid.nclient.api;
*/
public interface QpidMessageHelper
{
- public void declareQueue(boolean autoDelete, boolean durable, boolean exclusive,boolean nowait,boolean passive,String queueName) throws QpidException;
-
- public void bindQueue(String exchangeName,boolean nowait,String queueName,String routingKey)throws QpidException;
-
- public void unbindQueue(String exchangeName,String queueName,String routingKey)throws QpidException;
-
- public void purgeQueue(boolean nowait,String queueName)throws QpidException;
-
- public void deleteQueue(boolean ifEmpty, boolean ifUnused, boolean nowait,String queueName)throws QpidException;
+ public AMQPMessage getMessageClass() throws QpidException;
public void open() throws QpidException;
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageProducer.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageProducer.java
index 684b53da42..d962d16af3 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageProducer.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageProducer.java
@@ -25,4 +25,8 @@ import org.apache.qpid.nclient.message.AMQPApplicationMessage;
public interface QpidMessageProducer
{
public void send(boolean disableMessageId,boolean inline,AMQPApplicationMessage msg)throws QpidException;
+
+ public void open() throws QpidException;
+
+ public void close() throws QpidException;
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidSession.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidSession.java
index 7386cc4092..88f1cebbd9 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidSession.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidSession.java
@@ -41,7 +41,7 @@ public interface QpidSession
public QpidMessageProducer createProducer() throws QpidException;
- public QpidMessageConsumer createConsumer() throws QpidException;
+ public QpidMessageConsumer createConsumer(String queueName, boolean noLocal, boolean exclusive) throws QpidException;
public QpidMessageHelper getMessageHelper() throws QpidException;
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java b/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java
index 7bc77e02c0..ca42085632 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java
@@ -13,7 +13,7 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.SystemConfiguration;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.log4j.Logger;
-import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.core.AMQPConstants;
import org.apache.qpid.nclient.security.AMQPCallbackHandler;
/**
@@ -48,11 +48,11 @@ public class ClientConfiguration extends CombinedConfiguration {
private InputStream getInputStream()
{
- if (System.getProperty(QpidConstants.CONFIG_FILE_PATH) != null)
+ if (System.getProperty(AMQPConstants.CONFIG_FILE_PATH) != null)
{
try
{
- return new FileInputStream((String)System.getProperty(QpidConstants.CONFIG_FILE_PATH));
+ return new FileInputStream((String)System.getProperty(AMQPConstants.CONFIG_FILE_PATH));
}
catch(Exception e)
{
@@ -68,9 +68,9 @@ public class ClientConfiguration extends CombinedConfiguration {
public static void main(String[] args)
{
- String key = QpidConstants.AMQP_SECURITY + "." +
- QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES + "." +
- QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY;
+ String key = AMQPConstants.AMQP_SECURITY + "." +
+ AMQPConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES + "." +
+ AMQPConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY;
TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister =
new TreeMap<String, Class<? extends SaslClientFactory>>();
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPConstants.java
index 034fc28070..2ee1257782 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPConstants.java
@@ -1,6 +1,6 @@
package org.apache.qpid.nclient.core;
-public interface QpidConstants
+public interface AMQPConstants
{
// Common properties
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
index 9542aab344..f14825ed31 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
@@ -24,7 +24,7 @@ public class PhaseFactory
*/
public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException
{
- String key = QpidConstants.PHASE_PIPE + "." + QpidConstants.PHASE;
+ String key = AMQPConstants.PHASE_PIPE + "." + AMQPConstants.PHASE;
Map<Integer,Phase> phaseMap = new HashMap<Integer,Phase>();
List<String> list = ClientConfiguration.get().getList(key);
int index = 0;
@@ -33,7 +33,7 @@ public class PhaseFactory
try
{
Phase temp = (Phase)Class.forName(s).newInstance();
- phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + QpidConstants.INDEX),temp) ;
+ phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + AMQPConstants.INDEX),temp) ;
}
catch(Exception e)
{
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java
index 1305500439..e31345aa69 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java
@@ -13,7 +13,7 @@ import org.apache.qpid.framing.AMQResponseBody;
import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.AbstractPhase;
-import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.core.AMQPConstants;
/**
* Corressponds to the Layer 2 in AMQP.
@@ -83,7 +83,7 @@ public class ExecutionPhase extends AbstractPhase
public void messageSent(Object msg) throws AMQPException
{
AMQPMethodEvent evt = (AMQPMethodEvent) msg;
- if (evt.getCorrelationId() == QpidConstants.EMPTY_CORRELATION_ID)
+ if (evt.getCorrelationId() == AMQPConstants.EMPTY_CORRELATION_ID)
{
// This is a request
AMQFrame frame = handleRequest(evt);
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java
index c5a75d242f..cf7c4e226f 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java
@@ -33,7 +33,7 @@ import org.apache.qpid.framing.AMQResponseBody;
import org.apache.qpid.framing.RequestResponseMappingException;
import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
import org.apache.qpid.nclient.config.ClientConfiguration;
-import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.core.AMQPConstants;
public class ResponseManager
{
@@ -99,7 +99,7 @@ public class ResponseManager
this.connectionId = connectionId;
responseIdCount = 1L;
lastReceivedRequestId = 0L;
- maxAccumulatedResponses = ClientConfiguration.get().getInt(QpidConstants.MAX_ACCUMILATED_RESPONSES);
+ maxAccumulatedResponses = ClientConfiguration.get().getInt(AMQPConstants.MAX_ACCUMILATED_RESPONSES);
responseMap = new ConcurrentHashMap<Long, ResponseStatus>();
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java
index d202bab843..d0a4ab79a5 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java
@@ -12,6 +12,7 @@ import org.apache.qpid.nclient.core.AMQPException;
public abstract class AbstractResource
{
private String _resourceName;
+ private boolean _closed = true;
public AbstractResource(String resourceName)
{
@@ -20,12 +21,13 @@ public abstract class AbstractResource
public void open() throws QpidException
{
+ _closed = false;
try
{
openResource();
}
- catch(AMQPException e)
+ catch(Exception e)
{
throw new QpidException("Error creating " + _resourceName + " due to " + e.getMessage(),e);
}
@@ -33,7 +35,8 @@ public abstract class AbstractResource
public void close() throws QpidException
{
- try
+ _closed = true;
+ try
{
closeResource();
@@ -45,7 +48,15 @@ public abstract class AbstractResource
}
- protected abstract void openResource() throws AMQPException;
+ protected abstract void openResource() throws AMQPException, QpidException;
- protected abstract void closeResource() throws AMQPException;
+ protected abstract void closeResource() throws AMQPException, QpidException;
+
+ public void checkClosed() throws QpidException
+ {
+ if(_closed)
+ {
+ throw new QpidException("The resource you are trying to access is closed");
+ }
+ }
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java
index 054eedfcee..e9809e4c83 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java
@@ -23,6 +23,7 @@ package org.apache.qpid.nclient.impl;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -48,7 +49,6 @@ import org.apache.qpid.nclient.amqp.AMQPChannel;
import org.apache.qpid.nclient.amqp.AMQPClassFactory;
import org.apache.qpid.nclient.amqp.AMQPConnection;
import org.apache.qpid.nclient.amqp.AbstractAMQPClassFactory;
-import org.apache.qpid.nclient.amqp.qpid.QpidAMQPChannel;
import org.apache.qpid.nclient.amqp.state.AMQPState;
import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
import org.apache.qpid.nclient.amqp.state.AMQPStateListener;
@@ -66,7 +66,7 @@ import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionTy
* replaced by the session methods.
*
*/
-public class QpidConnectionImpl implements QpidConnection, AMQPStateListener
+public class QpidConnectionImpl extends AbstractResource implements QpidConnection, AMQPStateListener
{
private static final Logger _logger = Logger.getLogger(QpidConnectionImpl.class);
@@ -89,18 +89,70 @@ public class QpidConnectionImpl implements QpidConnection, AMQPStateListener
private Lock _lock = new ReentrantLock();
- /** ---------------------------------------------
- * Methods from o.a.qpid.client.Connection
- * ----------------------------------------------
+ private AtomicBoolean _closed;
+
+ private AtomicBoolean _opened;
+
+ public QpidConnectionImpl()
+ {
+ super("Connection");
+ }
+
+ /**
+ * -----------------------------------------------------
+ * Methods introduced by AbstractResource
+ * -----------------------------------------------------
*/
+ protected void openResource() throws AMQPException, QpidException
+ {
+ throw new UnsupportedOperationException("open is not defined for this resource");
+ }
- public void close()
+ protected void closeResource() throws AMQPException, QpidException
{
- // handle failover
+ _classFactory = null;
}
+
+ @Override
+ public void checkClosed() throws QpidException
+ {
+ if(_closed.get())
+ {
+ throw new QpidException("The resource you are trying to access is closed");
+ }
+ }
+
+ /** ---------------------------------------------
+ * Methods from o.a.qpid.client.Connection
+ * ----------------------------------------------
+ */
- public void connect(String url) throws QpidException
+ @Override
+ public void close() throws QpidException
{
+ if (!_closed.getAndSet(true))
+ {
+ _lock.lock();
+ try
+ {
+ super.close();
+ _opened.set(false);
+ initiateFailover();
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+ }
+
+ public void connect(String url) throws QpidException
+ {
+ if (_opened.get())
+ {
+ throw new QpidException("The connection is already opened");
+ }
+
try
{
_classFactory = AbstractAMQPClassFactory.getFactoryInstance();
@@ -128,10 +180,13 @@ public class QpidConnectionImpl implements QpidConnection, AMQPStateListener
{
throw new QpidException("Connection negotiation failed due to " + e.getMessage(),e);
}
+
+ _opened.set(true);
}
public QpidSession createSession(int expiryInSeconds) throws QpidException
{
+ checkClosed();
AMQPChannel channel = null;
_lock.lock();
try
@@ -165,20 +220,7 @@ public class QpidConnectionImpl implements QpidConnection, AMQPStateListener
if(event.getNewState() == AMQPState.CONNECTION_CLOSED)
{
- //We need to notify the sessions that they need to
- //kick in the fail over logic
- for (Integer sessionId : _sessionMap.keySet())
- {
- QpidSession session = _sessionMap.get(sessionId);
- try
- {
- session.failover();
- }
- catch(Exception e)
- {
- _logger.error("Error executing failover logic for session : " + sessionId, e);
- }
- }
+ initiateFailover();
}
}
@@ -188,7 +230,7 @@ public class QpidConnectionImpl implements QpidConnection, AMQPStateListener
* ----------------------------------------------
*/
- public void handleConnectionNegotiation() throws Exception
+ private void handleConnectionNegotiation() throws Exception
{
_classFactory.getStateManager().addListener(AMQPStateType.CONNECTION_STATE, this);
@@ -239,4 +281,21 @@ public class QpidConnectionImpl implements QpidConnection, AMQPStateListener
ConnectionOpenOkBody connectionOpenOkBody = _amqpConnection.open(connectionOpenBody);
}
+ private void initiateFailover()
+ {
+ //We need to notify the sessions that they need to
+ //kick in the fail over logic
+ for (Integer sessionId : _sessionMap.keySet())
+ {
+ QpidSession session = _sessionMap.get(sessionId);
+ try
+ {
+ session.failover();
+ }
+ catch(Exception e)
+ {
+ _logger.error("Error executing failover logic for session : " + sessionId, e);
+ }
+ }
+ }
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidExchangeHelperImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidExchangeHelperImpl.java
index 2643fe17e6..2ed54c1d5c 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidExchangeHelperImpl.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidExchangeHelperImpl.java
@@ -32,6 +32,7 @@ public class QpidExchangeHelperImpl extends AbstractResource implements QpidExch
*/
public void declareExchange(boolean autoDelete, boolean durable, String exchangeName,boolean internal, boolean nowait, boolean passive,String exchangeClass) throws QpidException
{
+ checkClosed();
final ExchangeDeclareBody exchangeDeclareBody = ExchangeDeclareBody.createMethodBody(
_session.getMajor(),
_session.getMinor(),
@@ -61,6 +62,7 @@ public class QpidExchangeHelperImpl extends AbstractResource implements QpidExch
public void deleteExchange(String exchangeName, boolean ifUnused, boolean nowait) throws QpidException
{
+ checkClosed();
final ExchangeDeleteBody exchangeDeclareBody = ExchangeDeleteBody.createMethodBody(
_session.getMajor(),
_session.getMinor(),
@@ -88,12 +90,12 @@ public class QpidExchangeHelperImpl extends AbstractResource implements QpidExch
* Methods introduced by AbstractResource
* -----------------------------------------------------
*/
- protected void openResource() throws AMQPException
+ protected void openResource() throws AMQPException, QpidException
{
_exchange = _session.getClassFactory().createExchangeClass(_session.getChannel());
}
- protected void closeResource() throws AMQPException
+ protected void closeResource() throws AMQPException, QpidException
{
_session.getClassFactory().destoryExchangeClass(_session.getChannel(), _exchange);
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java
index 22000a8506..fe0231522c 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java
@@ -24,35 +24,34 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import org.apache.qpid.framing.Content;
-import org.apache.qpid.framing.MessageAppendBody;
-import org.apache.qpid.framing.MessageCheckpointBody;
-import org.apache.qpid.framing.MessageCloseBody;
-import org.apache.qpid.framing.MessageOpenBody;
-import org.apache.qpid.framing.MessageRecoverBody;
-import org.apache.qpid.framing.MessageResumeBody;
-import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageConsumeBody;
import org.apache.qpid.nclient.amqp.AMQPMessage;
-import org.apache.qpid.nclient.amqp.AMQPMessageCallBack;
import org.apache.qpid.nclient.api.QpidException;
import org.apache.qpid.nclient.api.QpidMessageConsumer;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.message.AMQPApplicationMessage;
-import org.apache.qpid.nclient.message.MessageHeaders;
-import org.apache.qpid.nclient.message.MessageStore;
-import org.apache.qpid.nclient.message.TransientMessageStore;
-public class QpidMessageConsumerImpl extends AbstractResource implements QpidMessageConsumer, AMQPMessageCallBack
+public class QpidMessageConsumerImpl extends AbstractResource implements QpidMessageConsumer
{
- private MessageStore _msgStore = new TransientMessageStore();
private final BlockingQueue<AMQPApplicationMessage> _queue = new LinkedBlockingQueue<AMQPApplicationMessage>();
private QpidSessionImpl _session;
private AMQPMessage _amqpMessage;
+ private String _consumerTag;
+ private String _queueName;
+ private boolean _noLocal;
+ private boolean _exclusive;
- protected QpidMessageConsumerImpl(QpidSessionImpl session)
+ protected QpidMessageConsumerImpl(QpidSessionImpl session,String consumerTag,String queueName,boolean noLocal,boolean exclusive) throws QpidException
{
- super("Message Class");
+ super("Message Consuer");
_session = session;
+ _amqpMessage = session.getMessageHelper().getMessageClass();
+ _consumerTag = consumerTag;
+ _queueName = queueName;
+ _noLocal = noLocal;
+ _exclusive = exclusive;
}
/**
@@ -63,17 +62,20 @@ public class QpidMessageConsumerImpl extends AbstractResource implements QpidMes
public AMQPApplicationMessage get() throws QpidException
{
+ checkClosed();
// I want this to do a message.get
return null;
}
public AMQPApplicationMessage receive()throws QpidException
{
+ checkClosed();
return _queue.poll();
}
public AMQPApplicationMessage receive(long timeout, TimeUnit tu)throws QpidException
{
+ checkClosed();
try
{
return _queue.poll(timeout, tu);
@@ -83,118 +85,104 @@ public class QpidMessageConsumerImpl extends AbstractResource implements QpidMes
throw new QpidException("Error retrieving message from queue",e);
}
}
+
+ public void messageArrived(AMQPApplicationMessage msg)throws QpidException
+ {
+ try
+ {
+ _queue.put(msg);
+ }
+ catch(Exception e)
+ {
+ throw new QpidException("Error queueing the messsage",e);
+ }
+ }
/**
* -----------------------------------------------
* Abstract methods from AbstractResource class
* -----------------------------------------------
*/
- protected void openResource() throws AMQPException
+ protected void openResource() throws AMQPException, QpidException
{
- _amqpMessage = _session.getClassFactory().createMessageClass(_session.getChannel(),null);
+ // Will wait till the dust settles on the message selectors
+
+ final MessageConsumeBody messageConsumeBody =
+ MessageConsumeBody.createMethodBody(
+ _session.getMajor(),
+ _session.getMinor(),
+ new AMQShortString(_consumerTag),// destination/deliveryTag/consumerTag
+ _exclusive, //exclusive
+ null, //filter
+ false, //noAck,
+ _noLocal, //noLocal,
+ new AMQShortString(_queueName), //queue
+ _session.getAccessTicket() //ticket
+ );
+
+ final AMQPCallbackHelper cb = new AMQPCallbackHelper();
+ HelperTemplate template = new HelperTemplate(){
+
+ public void amqpMethodCall() throws AMQPException
+ {
+ _amqpMessage.consume(messageConsumeBody, cb);
+ }
+ };
+
+ template.invokeAMQPMethodCall("Message consume failed due to");
+
+ template.evaulateResponse(cb);
}
- protected void closeResource() throws AMQPException
+ protected void closeResource() throws AMQPException, QpidException
{
- _session.getClassFactory().destoryMessageClass(_session.getChannel(), _amqpMessage);
+ ((QpidMessageHelperImpl)_session.getMessageHelper()).deregisterConsumer(_consumerTag);
+
+ final MessageCancelBody messageCancelBody =
+ MessageCancelBody.createMethodBody(
+ _session.getMajor(),
+ _session.getMinor(),
+ new AMQShortString(_queueName));
+
+ final AMQPCallbackHelper cb = new AMQPCallbackHelper();
+ HelperTemplate template = new HelperTemplate(){
+
+ public void amqpMethodCall() throws AMQPException
+ {
+ _amqpMessage.cancel(messageCancelBody, cb);
+ }
+ };
+
+ template.invokeAMQPMethodCall("Message cancel failed due to");
+
+ template.evaulateResponse(cb);
}
/**
- * -----------------------------------------------
- * Methods from AMQPMessageCallback class
- * -----------------------------------------------
+ * ----------------------------------------------
+ * Getters for Message Consumer properties
+ * No setters are allowed. Once these properties
+ * are set in the constructor they are not allowed
+ * to be modifed.
+ * ----------------------------------------------
*/
- public void append(MessageAppendBody messageAppendBody, long correlationId) throws AMQPException
- {
- String reference = new String(messageAppendBody.getReference());
- AMQPApplicationMessage msg = _msgStore.getMessage(reference);
- msg.addContent(messageAppendBody.getBytes());
- }
-
- public void checkpoint(MessageCheckpointBody messageCheckpointBody, long correlationId) throws AMQPException
- {
- // TODO Auto-generated method stub
- }
-
- public void close(MessageCloseBody messageCloseBody, long correlationId) throws AMQPException
+ public String getConsumerTag()
{
- String reference = new String(messageCloseBody.getReference());
- AMQPApplicationMessage msg = _msgStore.getMessage(reference);
- enQueue(msg);
- _msgStore.removeMessage(reference);
+ return _consumerTag;
}
- public void open(MessageOpenBody messageOpenBody, long correlationId) throws AMQPException
+ public boolean isExclusive()
{
- String reference = new String(messageOpenBody.getReference());
- AMQPApplicationMessage msg = new AMQPApplicationMessage(_session.getChannel(), messageOpenBody.getReference());
- _msgStore.storeMessage(reference, msg);
+ return _exclusive;
}
- public void recover(MessageRecoverBody messageRecoverBody, long correlationId) throws AMQPException
+ public boolean isNoLocal()
{
- // TODO Auto-generated method stub
-
- }
-
- public void resume(MessageResumeBody messageResumeBody, long correlationId) throws AMQPException
- {
- // TODO Auto-generated method stub
-
- }
-
- public void transfer(MessageTransferBody messageTransferBody, long correlationId) throws AMQPException
- {
- MessageHeaders messageHeaders = new MessageHeaders();
- messageHeaders.setMessageId(messageTransferBody.getMessageId());
- messageHeaders.setAppId(messageTransferBody.getAppId());
- messageHeaders.setContentType(messageTransferBody.getContentType());
- messageHeaders.setEncoding(messageTransferBody.getContentEncoding());
- messageHeaders.setCorrelationId(messageTransferBody.getCorrelationId());
- messageHeaders.setDestination(messageTransferBody.getDestination());
- messageHeaders.setExchange(messageTransferBody.getExchange());
- messageHeaders.setExpiration(messageTransferBody.getExpiration());
- messageHeaders.setReplyTo(messageTransferBody.getReplyTo());
- messageHeaders.setRoutingKey(messageTransferBody.getRoutingKey());
- messageHeaders.setTransactionId(messageTransferBody.getTransactionId());
- messageHeaders.setUserId(messageTransferBody.getUserId());
- messageHeaders.setPriority(messageTransferBody.getPriority());
- messageHeaders.setDeliveryMode(messageTransferBody.getDeliveryMode());
- messageHeaders.setApplicationHeaders(messageTransferBody.getApplicationHeaders());
-
-
-
- if (messageTransferBody.getBody().getContentType() == Content.TypeEnum.INLINE_T)
- {
- AMQPApplicationMessage msg = new AMQPApplicationMessage(_session.getChannel(),
- correlationId,
- messageHeaders,
- messageTransferBody.getBody().getContentAsByteArray(),
- messageTransferBody.getRedelivered());
-
- enQueue(msg);
- }
- else
- {
- byte[] referenceId = messageTransferBody.getBody().getContentAsByteArray();
- AMQPApplicationMessage msg = new AMQPApplicationMessage(_session.getChannel(),referenceId);
- msg.setMessageHeaders(messageHeaders);
- msg.setRedeliveredFlag(messageTransferBody.getRedelivered());
-
- _msgStore.storeMessage(new String(referenceId), msg);
- }
+ return _noLocal;
}
- private void enQueue(AMQPApplicationMessage msg)throws AMQPException
+ public String getQueueName()
{
- try
- {
- _queue.put(msg);
- }
- catch(Exception e)
- {
- throw new AMQPException("Error queueing the messsage",e);
- }
+ return _queueName;
}
-
-}
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageHelperImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageHelperImpl.java
new file mode 100644
index 0000000000..bd67a905a1
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageHelperImpl.java
@@ -0,0 +1,186 @@
+package org.apache.qpid.nclient.impl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.nclient.amqp.AMQPMessage;
+import org.apache.qpid.nclient.amqp.AMQPMessageCallBack;
+import org.apache.qpid.nclient.api.QpidException;
+import org.apache.qpid.nclient.api.QpidMessageConsumer;
+import org.apache.qpid.nclient.api.QpidMessageHelper;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.message.AMQPApplicationMessage;
+import org.apache.qpid.nclient.message.MessageHeaders;
+import org.apache.qpid.nclient.message.MessageStore;
+import org.apache.qpid.nclient.message.TransientMessageStore;
+
+public class QpidMessageHelperImpl extends AbstractResource implements QpidMessageHelper, AMQPMessageCallBack
+{
+ private MessageStore _msgStore = new TransientMessageStore();
+ private Map<String,QpidMessageConsumer> _consumers = new ConcurrentHashMap<String,QpidMessageConsumer>();
+ private QpidSessionImpl _session;
+ private AMQPMessage _amqpMessage;
+
+ protected QpidMessageHelperImpl(QpidSessionImpl session)
+ {
+ super("Message Class");
+ _session = session;
+ }
+
+ /**
+ * -----------------------------------------------
+ * Abstract methods from AbstractResource class
+ * -----------------------------------------------
+ */
+ protected void openResource() throws AMQPException, QpidException
+ {
+ _amqpMessage = _session.getClassFactory().createMessageClass(_session.getChannel(),this);
+ }
+
+ protected void closeResource() throws AMQPException, QpidException
+ {
+ _session.getClassFactory().destoryMessageClass(_session.getChannel(), _amqpMessage);
+ for (String consumerTag : _consumers.keySet())
+ {
+ QpidMessageConsumer consumer = _consumers.get(consumerTag);
+ // The close method will deregister itself too.
+ consumer.close();
+ }
+ }
+
+ /**
+ * -----------------------------------------------
+ * methods from QpidMessageHelper class
+ * -----------------------------------------------
+ */
+ public AMQPMessage getMessageClass() throws QpidException
+ {
+ return _amqpMessage;
+ }
+
+ /**
+ * -----------------------------------------------
+ * Methods from AMQPMessageCallback class
+ * -----------------------------------------------
+ */
+ public void append(MessageAppendBody messageAppendBody, long correlationId) throws AMQPException
+ {
+ String reference = new String(messageAppendBody.getReference());
+ AMQPApplicationMessage msg = _msgStore.getMessage(reference);
+ msg.addContent(messageAppendBody.getBytes());
+ }
+
+ public void checkpoint(MessageCheckpointBody messageCheckpointBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+ }
+
+ public void close(MessageCloseBody messageCloseBody, long correlationId) throws AMQPException
+ {
+ String reference = new String(messageCloseBody.getReference());
+ AMQPApplicationMessage msg = _msgStore.getMessage(reference);
+ notifyMessageArrival(msg);
+ _msgStore.removeMessage(reference);
+ }
+
+ public void open(MessageOpenBody messageOpenBody, long correlationId) throws AMQPException
+ {
+ String reference = new String(messageOpenBody.getReference());
+ AMQPApplicationMessage msg = new AMQPApplicationMessage(_session.getChannel(), messageOpenBody.getReference());
+ _msgStore.storeMessage(reference, msg);
+ }
+
+ public void recover(MessageRecoverBody messageRecoverBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void resume(MessageResumeBody messageResumeBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void transfer(MessageTransferBody messageTransferBody, long correlationId) throws AMQPException
+ {
+ MessageHeaders messageHeaders = new MessageHeaders();
+ messageHeaders.setMessageId(messageTransferBody.getMessageId());
+ messageHeaders.setAppId(messageTransferBody.getAppId());
+ messageHeaders.setContentType(messageTransferBody.getContentType());
+ messageHeaders.setEncoding(messageTransferBody.getContentEncoding());
+ messageHeaders.setCorrelationId(messageTransferBody.getCorrelationId());
+ messageHeaders.setDestination(messageTransferBody.getDestination());
+ messageHeaders.setExchange(messageTransferBody.getExchange());
+ messageHeaders.setExpiration(messageTransferBody.getExpiration());
+ messageHeaders.setReplyTo(messageTransferBody.getReplyTo());
+ messageHeaders.setRoutingKey(messageTransferBody.getRoutingKey());
+ messageHeaders.setTransactionId(messageTransferBody.getTransactionId());
+ messageHeaders.setUserId(messageTransferBody.getUserId());
+ messageHeaders.setPriority(messageTransferBody.getPriority());
+ messageHeaders.setDeliveryMode(messageTransferBody.getDeliveryMode());
+ messageHeaders.setApplicationHeaders(messageTransferBody.getApplicationHeaders());
+
+
+
+ if (messageTransferBody.getBody().getContentType() == Content.TypeEnum.INLINE_T)
+ {
+ AMQPApplicationMessage msg = new AMQPApplicationMessage(_session.getChannel(),
+ messageTransferBody.getDestination().asString(),
+ messageHeaders,
+ messageTransferBody.getBody().getContentAsByteArray(),
+ messageTransferBody.getRedelivered());
+
+ notifyMessageArrival(msg);
+ }
+ else
+ {
+ byte[] referenceId = messageTransferBody.getBody().getContentAsByteArray();
+ AMQPApplicationMessage msg = new AMQPApplicationMessage(_session.getChannel(),referenceId);
+ msg.setMessageHeaders(messageHeaders);
+ msg.setRedeliveredFlag(messageTransferBody.getRedelivered());
+
+ _msgStore.storeMessage(new String(referenceId), msg);
+ }
+ }
+
+ /** -----------------------------------------
+ * Methods defined by this class
+ * ------------------------------------------
+ */
+
+ public void registerConsumer(String consumerTag,QpidMessageConsumer messageConsumer)
+ {
+ _consumers.put(consumerTag, messageConsumer);
+ }
+
+ public void deregisterConsumer(String consumerTag)
+ {
+ if(_consumers.containsKey(consumerTag))
+ {
+ _consumers.remove(consumerTag);
+ }
+ // If it's not their no need to worry (or raise an exception)
+ }
+
+ private void notifyMessageArrival(AMQPApplicationMessage msg)
+ {
+ QpidMessageConsumer consumer = _consumers.get(msg.getDeliveryTag());
+ try
+ {
+ consumer.messageArrived(msg);
+ }
+ catch(QpidException e)
+ {
+ // maybe retry and then reject the message
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java
index f553743ea6..91298dcc57 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java
@@ -18,10 +18,11 @@ public class QpidMessageProducerImpl extends AbstractResource implements QpidMes
private QpidSessionImpl _session;
private AMQPMessage _amqpMessage;
- protected QpidMessageProducerImpl(QpidSessionImpl session)
+ protected QpidMessageProducerImpl(QpidSessionImpl session) throws QpidException
{
- super("Message Class");
+ super("Message Producer");
_session = session;
+ _amqpMessage = session.getMessageHelper().getMessageClass();
}
/**
@@ -31,6 +32,7 @@ public class QpidMessageProducerImpl extends AbstractResource implements QpidMes
*/
public void send(boolean disableMessageId,boolean inline,AMQPApplicationMessage msg)throws QpidException
{
+ checkClosed();
// need to handle the inline and reference case
final MessageTransferBody messageTransferBody = prepareTransfer(disableMessageId,msg);
final AMQPCallbackHelper cb = new AMQPCallbackHelper();
@@ -52,14 +54,14 @@ public class QpidMessageProducerImpl extends AbstractResource implements QpidMes
* Methods introduced by AbstractResource
* -----------------------------------------------------
*/
- protected void openResource() throws AMQPException
+ protected void openResource() throws AMQPException, QpidException
{
- _amqpMessage = _session.getClassFactory().createMessageClass(_session.getChannel(),null);
+
}
- protected void closeResource() throws AMQPException
+ protected void closeResource() throws AMQPException, QpidException
{
- _session.getClassFactory().destoryMessageClass(_session.getChannel(), _amqpMessage);
+
}
/**
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidQueueHelperImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidQueueHelperImpl.java
index 33b3c1177e..b95f7c3210 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidQueueHelperImpl.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidQueueHelperImpl.java
@@ -29,6 +29,7 @@ public class QpidQueueHelperImpl extends AbstractResource implements QpidQueueHe
*/
public void bindQueue(String exchangeName, boolean nowait, String queueName, String routingKey) throws QpidException
{
+ checkClosed();
final QueueBindBody queueBindBody = QueueBindBody.createMethodBody(
_session.getMajor(),
_session.getMinor(),
@@ -57,6 +58,7 @@ public class QpidQueueHelperImpl extends AbstractResource implements QpidQueueHe
public void declareQueue(boolean autoDelete, boolean durable, boolean exclusive, boolean nowait, boolean passive, String queueName)
throws QpidException
{
+ checkClosed();
final QueueDeclareBody queueDeclareBody = QueueDeclareBody.createMethodBody(
_session.getMajor(),
_session.getMinor(),
@@ -86,6 +88,7 @@ public class QpidQueueHelperImpl extends AbstractResource implements QpidQueueHe
public void deleteQueue(boolean ifEmpty, boolean ifUnused, boolean nowait, String queueName) throws QpidException
{
+ checkClosed();
final QueueDeleteBody queueDeleteBody = QueueDeleteBody.createMethodBody(
_session.getMajor(),
_session.getMinor(),
@@ -112,6 +115,7 @@ public class QpidQueueHelperImpl extends AbstractResource implements QpidQueueHe
public void purgeQueue(boolean nowait, String queueName) throws QpidException
{
+ checkClosed();
final QueuePurgeBody queuePurgeBody = QueuePurgeBody.createMethodBody(
_session.getMajor(),
_session.getMinor(),
@@ -136,6 +140,7 @@ public class QpidQueueHelperImpl extends AbstractResource implements QpidQueueHe
public void unbindQueue(String exchangeName, String queueName, String routingKey) throws QpidException
{
+ checkClosed();
final QueueUnbindBody queueUnbindBody = QueueUnbindBody.createMethodBody(
_session.getMajor(),
_session.getMinor(),
@@ -166,12 +171,12 @@ public class QpidQueueHelperImpl extends AbstractResource implements QpidQueueHe
* Methods introduced by AbstractResource
* -----------------------------------------------------
*/
- protected void openResource() throws AMQPException
+ protected void openResource() throws AMQPException, QpidException
{
_queueClass = _session.getClassFactory().createQueueClass(_session.getChannel());
}
- protected void closeResource() throws AMQPException
+ protected void closeResource() throws AMQPException, QpidException
{
_session.getClassFactory().destroyQueueClass(_session.getChannel(), _queueClass);
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java
index 7d3cf5f861..c7c62c24d8 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java
@@ -20,7 +20,10 @@
*/
package org.apache.qpid.nclient.impl;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -31,6 +34,7 @@ import org.apache.qpid.framing.ChannelOpenOkBody;
import org.apache.qpid.nclient.amqp.AMQPChannel;
import org.apache.qpid.nclient.amqp.AMQPClassFactory;
import org.apache.qpid.nclient.api.QpidConnection;
+import org.apache.qpid.nclient.api.QpidConstants;
import org.apache.qpid.nclient.api.QpidExchangeHelper;
import org.apache.qpid.nclient.api.QpidMessageConsumer;
import org.apache.qpid.nclient.api.QpidMessageHelper;
@@ -45,7 +49,7 @@ import org.apache.qpid.protocol.AMQConstant;
/**
* According to the 0-9 spec, the session is built on channels(1-1 map) and when a channel is closed
* the session should be closed. However with the introdution of the session class in 0-10
- * this may change. Therefore I will not implement that logic yet.
+ * this will change. Therefore I will not implement failover logic yet.
*
* Once the dust settles there will be a Failover Helper that will manage the sessions
* failover logic.
@@ -60,13 +64,14 @@ public class QpidSessionImpl extends AbstractResource implements QpidSession
private int _ticket = 0; //currently useless
private QpidExchangeHelperImpl _qpidExchangeHelper;
private QpidQueueHelperImpl _qpidQueueHelper;
- private QpidMessageConsumerImpl _qpidMessageConsumer;
- private QpidMessageProducerImpl _qpidMessageProducer;
+ private QpidMessageHelperImpl _qpidMessageHelper;
+ private List<QpidMessageProducerImpl> _producers = new ArrayList<QpidMessageProducerImpl>();
private AtomicBoolean _closed;
+ private AtomicInteger _consumerTag;
private Lock _sessionCloseLock = new ReentrantLock();
-
+
// this will be used as soon as Session class is finalized
- private int _expiryInSeconds = QpidConnection.SESSION_EXPIRY_TIED_TO_CHANNEL;
+ private int _expiryInSeconds = QpidConstants.SESSION_EXPIRY_TIED_TO_CHANNEL;
private QpidConnection _qpidConnection;
public QpidSessionImpl(AMQPClassFactory classFactory,AMQPChannel channelClass,int channel,byte major, byte minor)
@@ -85,13 +90,23 @@ public class QpidSessionImpl extends AbstractResource implements QpidSession
* Methods introduced by AbstractResource
* -----------------------------------------------------
*/
- protected void openResource() throws AMQPException
+ protected void openResource() throws AMQPException, QpidException
{
// These methods will be changed to session methods
openChannel();
+
+ //initialize method helper
+ _qpidMessageHelper = new QpidMessageHelperImpl(this);
+ _qpidMessageHelper.open();
+
+ _qpidExchangeHelper = new QpidExchangeHelperImpl(this);
+ _qpidExchangeHelper.open();
+
+ _qpidQueueHelper = new QpidQueueHelperImpl(this);
+ _qpidQueueHelper.open();
}
- protected void closeResource() throws AMQPException
+ protected void closeResource() throws AMQPException, QpidException
{
ChannelCloseBody channelCloseBody = ChannelCloseBody.createMethodBody(_major, _minor,
0, //classId
@@ -110,13 +125,23 @@ public class QpidSessionImpl extends AbstractResource implements QpidSession
{
_qpidExchangeHelper.closeResource();
}
- if(_qpidMessageConsumer != null)
+ if(_qpidMessageHelper != null)
{
- _qpidMessageConsumer.closeResource();
+ // The MessageHelper will close the Message Consumers too
+ _qpidMessageHelper.closeResource();
+ }
+ for (QpidMessageProducerImpl producer: _producers)
+ {
+ producer.close();
}
- if(_qpidMessageProducer != null)
+ }
+
+ @Override
+ public void checkClosed() throws QpidException
+ {
+ if(_closed.get())
{
- _qpidMessageProducer.closeResource();
+ throw new QpidException("The resource you are trying to access is closed");
}
}
@@ -125,6 +150,7 @@ public class QpidSessionImpl extends AbstractResource implements QpidSession
* Methods introduced by QpidSession
* -----------------------------------------------------
*/
+ @Override
public void close() throws QpidException
{
if (!_closed.getAndSet(true))
@@ -139,11 +165,22 @@ public class QpidSessionImpl extends AbstractResource implements QpidSession
_sessionCloseLock.unlock();
}
}
- }
+ }
public void resume() throws QpidException
{
-
+ if (!_closed.getAndSet(false))
+ {
+ _sessionCloseLock.lock();
+ try
+ {
+ super.open();
+ }
+ finally
+ {
+ _sessionCloseLock.unlock();
+ }
+ }
}
// not intended to be used at the jms layer
@@ -154,7 +191,7 @@ public class QpidSessionImpl extends AbstractResource implements QpidSession
public void failover() throws QpidException
{
- if(_expiryInSeconds == QpidConnection.SESSION_EXPIRY_TIED_TO_CHANNEL)
+ if(_expiryInSeconds == QpidConstants.SESSION_EXPIRY_TIED_TO_CHANNEL)
{
// then close the session
}
@@ -164,24 +201,23 @@ public class QpidSessionImpl extends AbstractResource implements QpidSession
}
}
- public QpidMessageConsumer createConsumer() throws QpidException
+ public QpidMessageConsumer createConsumer(String queueName, boolean noLocal, boolean exclusive) throws QpidException
{
- if (_qpidMessageConsumer == null)
- {
- _qpidMessageConsumer = new QpidMessageConsumerImpl(this);
- _qpidMessageConsumer.open();
- }
- return _qpidMessageConsumer;
+ checkClosed();
+ String consumerTag = String.valueOf(_consumerTag.incrementAndGet());
+ QpidMessageConsumerImpl qpidMessageConsumer = new QpidMessageConsumerImpl(this,consumerTag,queueName,noLocal,exclusive);
+ _qpidMessageHelper.registerConsumer(consumerTag, qpidMessageConsumer);
+
+ return qpidMessageConsumer;
}
public QpidMessageProducer createProducer() throws QpidException
{
- if (_qpidMessageProducer == null)
- {
- _qpidMessageProducer = new QpidMessageProducerImpl(this);
- _qpidMessageProducer.open();
- }
- return _qpidMessageProducer;
+ checkClosed();
+ QpidMessageProducerImpl qpidMessageProducer = new QpidMessageProducerImpl(this);
+ _producers.add(qpidMessageProducer);
+
+ return qpidMessageProducer;
}
/** ------------------------------------------
@@ -192,32 +228,25 @@ public class QpidSessionImpl extends AbstractResource implements QpidSession
*/
public QpidExchangeHelper getExchangeHelper() throws QpidException
{
- if (_qpidExchangeHelper == null)
- {
- _qpidExchangeHelper = new QpidExchangeHelperImpl(this);
- _qpidExchangeHelper.open();
- }
+ checkClosed();
return _qpidExchangeHelper;
}
public QpidMessageHelper getMessageHelper() throws QpidException
{
- // TODO Auto-generated method stub
- return null;
+ checkClosed();
+ return _qpidMessageHelper;
}
public QpidQueueHelper getQueueHelper() throws QpidException
{
- if (_qpidQueueHelper == null)
- {
- _qpidQueueHelper = new QpidQueueHelperImpl(this);
- _qpidQueueHelper.open();
- }
+ checkClosed();
return _qpidQueueHelper;
}
public QpidTransactionHelper getTransactionHelper()throws QpidException
{
+ checkClosed();
return null;
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
index 79302540be..29e61f623f 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
@@ -31,7 +31,7 @@ public class AMQPApplicationMessage {
private int channelId;
private byte[] referenceId;
private List<byte[]> contents = new LinkedList<byte[]>();
- private long deliveryTag;
+ private String deliveryTag;
private boolean redeliveredFlag;
private MessageHeaders messageHeaders;
@@ -41,7 +41,7 @@ public class AMQPApplicationMessage {
this.referenceId = referenceId;
}
- public AMQPApplicationMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, boolean redeliveredFlag)
+ public AMQPApplicationMessage(int channelId, String deliveryTag, MessageHeaders messageHeaders, boolean redeliveredFlag)
{
this.channelId = channelId;
this.deliveryTag = deliveryTag;
@@ -49,7 +49,7 @@ public class AMQPApplicationMessage {
this.redeliveredFlag = redeliveredFlag;
}
- public AMQPApplicationMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, byte[] content, boolean redeliveredFlag)
+ public AMQPApplicationMessage(int channelId, String deliveryTag, MessageHeaders messageHeaders, byte[] content, boolean redeliveredFlag)
{
this.channelId = channelId;
this.deliveryTag = deliveryTag;
@@ -95,7 +95,7 @@ public class AMQPApplicationMessage {
return buf.array();
}
- public long getDeliveryTag()
+ public String getDeliveryTag()
{
return deliveryTag;
}
@@ -117,7 +117,7 @@ public class AMQPApplicationMessage {
new String(contents.get(0));
}
- public void setDeliveryTag(long deliveryTag)
+ public void setDeliveryTag(String deliveryTag)
{
this.deliveryTag = deliveryTag;
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java
index d845059ee7..02e3117202 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java
@@ -11,7 +11,7 @@ import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.AbstractPhase;
import org.apache.qpid.nclient.core.Phase;
import org.apache.qpid.nclient.core.PhaseContext;
-import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.core.AMQPConstants;
/**
* This Phase handles Layer 3 functionality of the AMQP spec.
@@ -59,7 +59,7 @@ public class ModelPhase extends AbstractPhase {
public void notifyMethodListerners(AMQPMethodEvent event) throws AMQPException
{
- AMQPEventManager eventManager = (AMQPEventManager)_ctx.getProperty(QpidConstants.EVENT_MANAGER);
+ AMQPEventManager eventManager = (AMQPEventManager)_ctx.getProperty(AMQPConstants.EVENT_MANAGER);
eventManager.notifyEvent(event);
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java
index 428cd6753d..919b668662 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java
@@ -26,7 +26,7 @@ import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.qpid.nclient.config.ClientConfiguration;
-import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.core.AMQPConstants;
public class CallbackHandlerRegistry
{
@@ -62,9 +62,9 @@ public class CallbackHandlerRegistry
private void parseProperties()
{
- String key = QpidConstants.AMQP_SECURITY + "." +
- QpidConstants.AMQP_SECURITY_MECHANISMS + "." +
- QpidConstants.AMQP_SECURITY_MECHANISM_HANDLER;
+ String key = AMQPConstants.AMQP_SECURITY + "." +
+ AMQPConstants.AMQP_SECURITY_MECHANISMS + "." +
+ AMQPConstants.AMQP_SECURITY_MECHANISM_HANDLER;
int index = ClientConfiguration.get().getMaxIndex(key);
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java
index 958c6c4782..e4f2cb163d 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java
@@ -29,7 +29,7 @@ import javax.security.sasl.SaslClientFactory;
import org.apache.log4j.Logger;
import org.apache.qpid.nclient.config.ClientConfiguration;
-import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.core.AMQPConstants;
public class DynamicSaslRegistrar
{
@@ -47,12 +47,12 @@ public class DynamicSaslRegistrar
private static Map<String, Class<? extends SaslClientFactory>> parseProperties()
{
- List<String> mechanisms = ClientConfiguration.get().getList(QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES);
+ List<String> mechanisms = ClientConfiguration.get().getList(AMQPConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES);
TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister =
new TreeMap<String, Class<? extends SaslClientFactory>>();
for (String mechanism: mechanisms)
{
- String className = ClientConfiguration.get().getString(QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY + "_" + mechanism);
+ String className = ClientConfiguration.get().getString(AMQPConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY + "_" + mechanism);
try
{
Class<?> clazz = Class.forName(className);
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java
index 734aa68a9d..2ce03d7577 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java
@@ -13,7 +13,7 @@ import org.apache.qpid.nclient.core.DefaultPhaseContext;
import org.apache.qpid.nclient.core.Phase;
import org.apache.qpid.nclient.core.PhaseContext;
import org.apache.qpid.nclient.core.PhaseFactory;
-import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.core.AMQPConstants;
import org.apache.qpid.pool.ReadWriteThreadModel;
public class TCPConnection implements TransportConnection
@@ -29,11 +29,11 @@ public class TCPConnection implements TransportConnection
_brokerDetails = url.getBrokerDetails(0);
_ctx = ctx;
- ByteBuffer.setUseDirectBuffers(ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_DIRECT_BUFFERS));
+ ByteBuffer.setUseDirectBuffers(ClientConfiguration.get().getBoolean(AMQPConstants.ENABLE_DIRECT_BUFFERS));
// the MINA default is currently to use the pooled allocator although this may change in future
// once more testing of the performance of the simple allocator has been done
- if (ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_POOLED_ALLOCATOR))
+ if (ClientConfiguration.get().getBoolean(AMQPConstants.ENABLE_POOLED_ALLOCATOR))
{
// Not sure what the original code wanted use :)
}
@@ -48,22 +48,22 @@ public class TCPConnection implements TransportConnection
// if we do not use our own thread model we get the MINA default which is to use
// its own leader-follower model
- if (ClientConfiguration.get().getBoolean(QpidConstants.USE_SHARED_READ_WRITE_POOL))
+ if (ClientConfiguration.get().getBoolean(AMQPConstants.USE_SHARED_READ_WRITE_POOL))
{
cfg.setThreadModel(ReadWriteThreadModel.getInstance());
}
SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
- scfg.setTcpNoDelay(ClientConfiguration.get().getBoolean(QpidConstants.TCP_NO_DELAY));
- scfg.setSendBufferSize(ClientConfiguration.get().getInt(QpidConstants.SEND_BUFFER_SIZE_IN_KB)*1024);
- scfg.setReceiveBufferSize(ClientConfiguration.get().getInt(QpidConstants.RECEIVE_BUFFER_SIZE_IN_KB)*1024);
+ scfg.setTcpNoDelay(ClientConfiguration.get().getBoolean(AMQPConstants.TCP_NO_DELAY));
+ scfg.setSendBufferSize(ClientConfiguration.get().getInt(AMQPConstants.SEND_BUFFER_SIZE_IN_KB)*1024);
+ scfg.setReceiveBufferSize(ClientConfiguration.get().getInt(AMQPConstants.RECEIVE_BUFFER_SIZE_IN_KB)*1024);
}
// Returns the phase pipe
public Phase connect() throws AMQPException
{
- _ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails);
- _ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector);
+ _ctx.setProperty(AMQPConstants.AMQP_BROKER_DETAILS,_brokerDetails);
+ _ctx.setProperty(AMQPConstants.MINA_IO_CONNECTOR,_ioConnector);
_phase = PhaseFactory.createPhasePipe(_ctx);
_phase.start();
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java
index 911e855d4f..2c618cfe81 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java
@@ -44,7 +44,7 @@ import org.apache.qpid.framing.ProtocolVersionList;
import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.AbstractPhase;
-import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.core.AMQPConstants;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.ssl.BogusSSLContextFactory;
@@ -72,8 +72,8 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol
public void start()throws AMQPException
{
- _brokerDetails = (BrokerDetails)_ctx.getProperty(QpidConstants.AMQP_BROKER_DETAILS);
- IoConnector ioConnector = (IoConnector)_ctx.getProperty(QpidConstants.MINA_IO_CONNECTOR);
+ _brokerDetails = (BrokerDetails)_ctx.getProperty(AMQPConstants.AMQP_BROKER_DETAILS);
+ IoConnector ioConnector = (IoConnector)_ctx.getProperty(AMQPConstants.MINA_IO_CONNECTOR);
final SocketAddress address;
if (ioConnector instanceof VmPipeConnector)
@@ -179,7 +179,7 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol
new AMQCodecFactory(false));
if (ClientConfiguration.get().getBoolean(
- QpidConstants.USE_SHARED_READ_WRITE_POOL))
+ AMQPConstants.USE_SHARED_READ_WRITE_POOL))
{
session.getFilterChain().addBefore("AsynchronousWriteFilter",
"protocolFilter", pcf);
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java
index ba38848149..db38fdb528 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java
@@ -15,7 +15,7 @@ import org.apache.qpid.nclient.core.DefaultPhaseContext;
import org.apache.qpid.nclient.core.Phase;
import org.apache.qpid.nclient.core.PhaseContext;
import org.apache.qpid.nclient.core.PhaseFactory;
-import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.core.AMQPConstants;
import org.apache.qpid.pool.PoolingFilter;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
@@ -48,8 +48,8 @@ public class VMConnection implements TransportConnection
{
createVMBroker();
- _ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails);
- _ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector);
+ _ctx.setProperty(AMQPConstants.AMQP_BROKER_DETAILS,_brokerDetails);
+ _ctx.setProperty(AMQPConstants.MINA_IO_CONNECTOR,_ioConnector);
_phase = PhaseFactory.createPhasePipe(_ctx);
_phase.start();
@@ -86,7 +86,7 @@ public class VMConnection implements TransportConnection
private IoHandlerAdapter createBrokerInstance(int port) throws AMQPException
{
- String protocolProviderClass = ClientConfiguration.get().getString(QpidConstants.QPID_VM_BROKER_CLASS);
+ String protocolProviderClass = ClientConfiguration.get().getString(AMQPConstants.QPID_VM_BROKER_CLASS);
_logger.info("Creating Qpid protocol provider: " + protocolProviderClass);
// can't use introspection to get Provider as it is a server class.