diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-05-30 17:19:51 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-05-30 17:19:51 +0000 |
commit | c21fe3e31285fe01b0f14f3d5b2c920edd6b2546 (patch) | |
tree | e1daa1e8a2bde10db285ffbb7a1d881e0336c746 | |
parent | 48f73ce3731366fd1b014faa3bbaa2820f41e8bb (diff) | |
download | qpid-python-c21fe3e31285fe01b0f14f3d5b2c920edd6b2546.tar.gz |
bug fixes and enchancements for Qpid java client
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/client_restructure@542880 13f79535-47bb-0310-9956-ffa450edef68
33 files changed, 623 insertions, 271 deletions
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java index f984e812c2..42a3d4bb3f 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java @@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.core.AMQPConstants; public abstract class AMQPCallBackSupport { @@ -52,14 +52,14 @@ public abstract class AMQPCallBackSupport { if(noWait) { - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,AMQPConstants.EMPTY_CORRELATION_ID); return msg; } else { // u only need to register if u are expecting a response long localCorrelationId = getNextCorrelationId(); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,AMQPConstants.EMPTY_CORRELATION_ID,localCorrelationId); _cbMap.put(localCorrelationId, cb); return msg; } @@ -68,7 +68,7 @@ public abstract class AMQPCallBackSupport protected AMQPMethodEvent handleAsynchronousCall(AMQMethodBody methodBody,AMQPCallBack cb) { long localCorrelationId = getNextCorrelationId(); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,AMQPConstants.EMPTY_CORRELATION_ID,localCorrelationId); _cbMap.put(localCorrelationId, cb); return msg; } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java index 6d0e83bb7e..094b94f5cb 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java @@ -22,13 +22,13 @@ package org.apache.qpid.nclient.amqp; import org.apache.qpid.nclient.config.ClientConfiguration; import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.core.AMQPConstants; public class AbstractAMQPClassFactory { public static AMQPClassFactory getFactoryInstance() throws AMQPException { - String className = ClientConfiguration.get().getString(QpidConstants.AMQP_CLASS_FACTORY); + String className = ClientConfiguration.get().getString(AMQPConstants.AMQP_CLASS_FACTORY); try { return (AMQPClassFactory)Class.forName(className).newInstance(); diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java index decb120796..0a019a3d28 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java @@ -46,7 +46,7 @@ import org.apache.qpid.nclient.amqp.state.AMQPStateType; import org.apache.qpid.nclient.config.ClientConfiguration; import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.Phase; -import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.core.AMQPConstants; import org.apache.qpid.nclient.util.AMQPValidator; /** @@ -106,7 +106,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe _phase = phase; _stateManager = stateManager; _currentState = AMQPState.CHANNEL_NOT_OPENED; - _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); + _serverTimeOut = ClientConfiguration.get().getLong(AMQPConstants.SERVER_TIMEOUT_IN_MILLISECONDS); } /** @@ -125,7 +125,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe { _channelOpenOkBody = null; checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, AMQPConstants.EMPTY_CORRELATION_ID); _phase.messageSent(msg); //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS); @@ -156,7 +156,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe { _channelCloseOkBody = null; checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, AMQPConstants.EMPTY_CORRELATION_ID); _phase.messageSent(msg); //_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); @@ -193,7 +193,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe { checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND); } - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, AMQPConstants.EMPTY_CORRELATION_ID); _phase.messageSent(msg); //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS); @@ -223,7 +223,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe { _channelOkBody = null; checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, AMQPConstants.EMPTY_CORRELATION_ID); _phase.messageSent(msg); //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS); diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java index a38469def5..653882d3c1 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java @@ -65,7 +65,7 @@ import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.DefaultPhaseContext; import org.apache.qpid.nclient.core.Phase; import org.apache.qpid.nclient.core.PhaseContext; -import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.core.AMQPConstants; import org.apache.qpid.nclient.transport.AMQPConnectionURL; import org.apache.qpid.nclient.transport.ConnectionURL; import org.apache.qpid.nclient.transport.TransportConnection; @@ -119,16 +119,16 @@ public class QpidAMQPClassFactory implements AMQPClassFactory if (_amqpConnection == null) { PhaseContext ctx = new DefaultPhaseContext(); - ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager); + ctx.setProperty(AMQPConstants.EVENT_MANAGER, _eventManager); TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type, ctx); _amqpConnection = new QpidAMQPConnection(conn, _stateManager); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class, _amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class, _amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class, _amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class, _amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class, _amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class, _amqpConnection); + _eventManager.addMethodEventListener(AMQPConstants.CHANNEL_ZERO, ConnectionStartBody.class, _amqpConnection); + _eventManager.addMethodEventListener(AMQPConstants.CHANNEL_ZERO, ConnectionSecureBody.class, _amqpConnection); + _eventManager.addMethodEventListener(AMQPConstants.CHANNEL_ZERO, ConnectionTuneBody.class, _amqpConnection); + _eventManager.addMethodEventListener(AMQPConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class, _amqpConnection); + _eventManager.addMethodEventListener(AMQPConstants.CHANNEL_ZERO, ConnectionCloseBody.class, _amqpConnection); + _eventManager.addMethodEventListener(AMQPConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class, _amqpConnection); } return _amqpConnection; } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java index 9b4d776cc5..3c5ca3a949 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java @@ -48,7 +48,7 @@ import org.apache.qpid.nclient.amqp.state.AMQPStateType; import org.apache.qpid.nclient.config.ClientConfiguration; import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.Phase; -import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.core.AMQPConstants; import org.apache.qpid.nclient.transport.TransportConnection; import org.apache.qpid.nclient.util.AMQPValidator; @@ -107,7 +107,7 @@ public class QpidAMQPConnection extends AMQPStateMachine implements AMQPMethodLi _connection = connection; _stateManager = stateManager; _currentState = AMQPState.CONNECTION_UNDEFINED; - _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); + _serverTimeOut = ClientConfiguration.get().getLong(AMQPConstants.SERVER_TIMEOUT_IN_MILLISECONDS); } /** @@ -159,7 +159,7 @@ public class QpidAMQPConnection extends AMQPStateMachine implements AMQPMethodLi { _connectionSecureBody = null; checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE); - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId); + AMQPMethodEvent msg = new AMQPMethodEvent(AMQPConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId); _phase.messageSent(msg); // _connectionNotSecure.await(_serverTimeOut,TimeUnit.MILLISECONDS); _connectionNotSecure.await(); @@ -205,7 +205,7 @@ public class QpidAMQPConnection extends AMQPStateMachine implements AMQPMethodLi checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED); _connectionSecureBody = null; // The server could send a fresh challenge - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId); + AMQPMethodEvent msg = new AMQPMethodEvent(AMQPConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId); _phase.messageSent(msg); //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS); @@ -249,7 +249,7 @@ public class QpidAMQPConnection extends AMQPStateMachine implements AMQPMethodLi { checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED); _connectionSecureBody = null; - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId); + AMQPMethodEvent msg = new AMQPMethodEvent(AMQPConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId); _phase.messageSent(msg); notifyState(AMQPState.CONNECTION_NOT_OPENED); _currentState = AMQPState.CONNECTION_NOT_OPENED; @@ -274,7 +274,7 @@ public class QpidAMQPConnection extends AMQPStateMachine implements AMQPMethodLi _connectionOpenOkBody = null; checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN); - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID); + AMQPMethodEvent msg = new AMQPMethodEvent(AMQPConstants.CHANNEL_ZERO, connectionOpenBody, AMQPConstants.EMPTY_CORRELATION_ID); _phase.messageSent(msg); //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS); @@ -306,7 +306,7 @@ public class QpidAMQPConnection extends AMQPStateMachine implements AMQPMethodLi { _connectionCloseOkBody = null; checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED); - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID); + AMQPMethodEvent msg = new AMQPMethodEvent(AMQPConstants.CHANNEL_ZERO, connectioncloseBody, AMQPConstants.EMPTY_CORRELATION_ID); _phase.messageSent(msg); _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time"); diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java index fd7b6f8ec6..9d21c84290 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java @@ -43,7 +43,7 @@ import org.apache.qpid.nclient.amqp.state.AMQPStateType; import org.apache.qpid.nclient.config.ClientConfiguration; import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.Phase; -import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.core.AMQPConstants; import org.apache.qpid.nclient.util.AMQPValidator; public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMethodListener, AMQPDtxDemarcation @@ -89,7 +89,7 @@ public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMeth _phase = phase; _stateManager = stateManager; _currentState = AMQPState.DTX_CHANNEL_NOT_SELECTED; - _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); + _serverTimeOut = ClientConfiguration.get().getLong(AMQPConstants.SERVER_TIMEOUT_IN_MILLISECONDS); } /** @@ -104,7 +104,7 @@ public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMeth { _dtxDemarcationSelectOkBody = null; checkIfValidStateTransition(AMQPState.DTX_CHANNEL_NOT_SELECTED, _currentState, AMQPState.DTX_NOT_STARTED); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, dtxDemarcationSelectBody, QpidConstants.EMPTY_CORRELATION_ID); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, dtxDemarcationSelectBody, AMQPConstants.EMPTY_CORRELATION_ID); _phase.messageSent(msg); //_dtxNotSelected.await(_serverTimeOut, TimeUnit.MILLISECONDS); @@ -131,7 +131,7 @@ public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMeth { _dtxDemarcationStartOkBody = null; checkIfValidStateTransition(_validStartStates, _currentState, AMQPState.DTX_STARTED); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationStartOkBody, QpidConstants.EMPTY_CORRELATION_ID); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationStartOkBody, AMQPConstants.EMPTY_CORRELATION_ID); _phase.messageSent(msg); //_dtxNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS); @@ -158,7 +158,7 @@ public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMeth { _dtxDemarcationEndOkBody = null; checkIfValidStateTransition(_validEndStates, _currentState, AMQPState.DTX_END); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationEndOkBody, QpidConstants.EMPTY_CORRELATION_ID); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationEndOkBody, AMQPConstants.EMPTY_CORRELATION_ID); _phase.messageSent(msg); //_dtxNotEnd.await(_serverTimeOut, TimeUnit.MILLISECONDS); diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java new file mode 100644 index 0000000000..16e79ca0de --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java @@ -0,0 +1,40 @@ +package org.apache.qpid.nclient.amqp.sample; + +import org.apache.qpid.nclient.api.QpidConnection; +import org.apache.qpid.nclient.api.QpidConstants; +import org.apache.qpid.nclient.api.QpidExchangeHelper; +import org.apache.qpid.nclient.api.QpidMessageProducer; +import org.apache.qpid.nclient.api.QpidQueueHelper; +import org.apache.qpid.nclient.api.QpidSession; +import org.apache.qpid.nclient.impl.QpidConnectionImpl; + +public class QpidTestClient +{ + public static void main(String[] args) + { + try + { + QpidConnection con = new QpidConnectionImpl(); + con.connect("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'"); + + QpidSession session = con.createSession(QpidConstants.SESSION_EXPIRY_TIED_TO_CHANNEL); + session.open(); + + QpidExchangeHelper exchangeHelper = session.getExchangeHelper(); + exchangeHelper.open(); + exchangeHelper.declareExchange(false, false, QpidConstants.DIRECT_EXCHANGE_NAME, false, false, false, QpidConstants.DIRECT_EXCHANGE_CLASS); + + QpidQueueHelper queueHelper = session.getQueueHelper(); + queueHelper.open(); + queueHelper.declareQueue(false, false, false, false, false, "myQueue"); + queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "myQueue", "RH"); + + QpidMessageProducer messageProducer = session.createProducer(); + } + catch(Exception e) + { + + } + + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConnection.java index 744b0aa89f..dccf3317ff 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConnection.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConnection.java @@ -22,9 +22,6 @@ package org.apache.qpid.nclient.api; public interface QpidConnection { - public static final int SESSION_EXPIRY_MAX_TIME = Integer.MAX_VALUE; - public static final int SESSION_EXPIRY_TIED_TO_CHANNEL = 0; - public void connect(String url) throws QpidException; public QpidSession createSession(int expiryInSeconds) throws QpidException; diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java new file mode 100644 index 0000000000..96103509cf --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java @@ -0,0 +1,29 @@ +package org.apache.qpid.nclient.api; + + +public class QpidConstants +{ + public static final int SESSION_EXPIRY_TIED_TO_CHANNEL = 0; + public static final int SESSION_EXPIRY_MAX_TIME = Integer.MAX_VALUE; + + public final static String TOPIC_EXCHANGE_NAME = "amq.topic"; + + public final static String TOPIC_EXCHANGE_CLASS = "topic"; + + public final static String DIRECT_EXCHANGE_NAME = "amq.direct"; + + public final static String DIRECT_EXCHANGE_CLASS = "direct"; + + public final static String HEADERS_EXCHANGE_NAME = "amq.match"; + + public final static String HEADERS_EXCHANGE_CLASS = "headers"; + + public final static String FANOUT_EXCHANGE_NAME = "amq.fanout"; + + public final static String FANOUT_EXCHANGE_CLASS = "fanout"; + + + public final static String SYSTEM_MANAGEMENT_EXCHANGE_NAME = "qpid.sysmgmt"; + + public final static String SYSTEM_MANAGEMENT_CLASS = "sysmmgmt"; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageConsumer.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageConsumer.java index 41adc23727..3053fc3ddc 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageConsumer.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageConsumer.java @@ -31,4 +31,10 @@ public interface QpidMessageConsumer public AMQPApplicationMessage receive()throws QpidException; public AMQPApplicationMessage receive(long timeout, TimeUnit tu)throws QpidException; + + public void messageArrived(AMQPApplicationMessage msg)throws QpidException; + + public void open() throws QpidException; + + public void close() throws QpidException; } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageHelper.java index ec50d16f9e..0e956d26a4 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageHelper.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageHelper.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.nclient.api; +import org.apache.qpid.nclient.amqp.AMQPMessage; + /** * used as a helper class to support the Session class * This reduces the clutter and makes the session class @@ -31,15 +33,7 @@ package org.apache.qpid.nclient.api; */ public interface QpidMessageHelper { - public void declareQueue(boolean autoDelete, boolean durable, boolean exclusive,boolean nowait,boolean passive,String queueName) throws QpidException; - - public void bindQueue(String exchangeName,boolean nowait,String queueName,String routingKey)throws QpidException; - - public void unbindQueue(String exchangeName,String queueName,String routingKey)throws QpidException; - - public void purgeQueue(boolean nowait,String queueName)throws QpidException; - - public void deleteQueue(boolean ifEmpty, boolean ifUnused, boolean nowait,String queueName)throws QpidException; + public AMQPMessage getMessageClass() throws QpidException; public void open() throws QpidException; diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageProducer.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageProducer.java index 684b53da42..d962d16af3 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageProducer.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageProducer.java @@ -25,4 +25,8 @@ import org.apache.qpid.nclient.message.AMQPApplicationMessage; public interface QpidMessageProducer { public void send(boolean disableMessageId,boolean inline,AMQPApplicationMessage msg)throws QpidException; + + public void open() throws QpidException; + + public void close() throws QpidException; } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidSession.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidSession.java index 7386cc4092..88f1cebbd9 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidSession.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidSession.java @@ -41,7 +41,7 @@ public interface QpidSession public QpidMessageProducer createProducer() throws QpidException; - public QpidMessageConsumer createConsumer() throws QpidException; + public QpidMessageConsumer createConsumer(String queueName, boolean noLocal, boolean exclusive) throws QpidException; public QpidMessageHelper getMessageHelper() throws QpidException; diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java b/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java index 7bc77e02c0..ca42085632 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java @@ -13,7 +13,7 @@ import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.SystemConfiguration; import org.apache.commons.configuration.XMLConfiguration; import org.apache.log4j.Logger; -import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.core.AMQPConstants; import org.apache.qpid.nclient.security.AMQPCallbackHandler; /** @@ -48,11 +48,11 @@ public class ClientConfiguration extends CombinedConfiguration { private InputStream getInputStream() { - if (System.getProperty(QpidConstants.CONFIG_FILE_PATH) != null) + if (System.getProperty(AMQPConstants.CONFIG_FILE_PATH) != null) { try { - return new FileInputStream((String)System.getProperty(QpidConstants.CONFIG_FILE_PATH)); + return new FileInputStream((String)System.getProperty(AMQPConstants.CONFIG_FILE_PATH)); } catch(Exception e) { @@ -68,9 +68,9 @@ public class ClientConfiguration extends CombinedConfiguration { public static void main(String[] args) { - String key = QpidConstants.AMQP_SECURITY + "." + - QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES + "." + - QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY; + String key = AMQPConstants.AMQP_SECURITY + "." + + AMQPConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES + "." + + AMQPConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY; TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister = new TreeMap<String, Class<? extends SaslClientFactory>>(); diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPConstants.java index 034fc28070..2ee1257782 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPConstants.java @@ -1,6 +1,6 @@ package org.apache.qpid.nclient.core; -public interface QpidConstants +public interface AMQPConstants { // Common properties diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java index 9542aab344..f14825ed31 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java @@ -24,7 +24,7 @@ public class PhaseFactory */ public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException { - String key = QpidConstants.PHASE_PIPE + "." + QpidConstants.PHASE; + String key = AMQPConstants.PHASE_PIPE + "." + AMQPConstants.PHASE; Map<Integer,Phase> phaseMap = new HashMap<Integer,Phase>(); List<String> list = ClientConfiguration.get().getList(key); int index = 0; @@ -33,7 +33,7 @@ public class PhaseFactory try { Phase temp = (Phase)Class.forName(s).newInstance(); - phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + QpidConstants.INDEX),temp) ; + phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + AMQPConstants.INDEX),temp) ; } catch(Exception e) { diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java index 1305500439..e31345aa69 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java @@ -13,7 +13,7 @@ import org.apache.qpid.framing.AMQResponseBody; import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.AbstractPhase; -import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.core.AMQPConstants; /** * Corressponds to the Layer 2 in AMQP. @@ -83,7 +83,7 @@ public class ExecutionPhase extends AbstractPhase public void messageSent(Object msg) throws AMQPException { AMQPMethodEvent evt = (AMQPMethodEvent) msg; - if (evt.getCorrelationId() == QpidConstants.EMPTY_CORRELATION_ID) + if (evt.getCorrelationId() == AMQPConstants.EMPTY_CORRELATION_ID) { // This is a request AMQFrame frame = handleRequest(evt); diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java index c5a75d242f..cf7c4e226f 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java @@ -33,7 +33,7 @@ import org.apache.qpid.framing.AMQResponseBody; import org.apache.qpid.framing.RequestResponseMappingException; import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; import org.apache.qpid.nclient.config.ClientConfiguration; -import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.core.AMQPConstants; public class ResponseManager { @@ -99,7 +99,7 @@ public class ResponseManager this.connectionId = connectionId; responseIdCount = 1L; lastReceivedRequestId = 0L; - maxAccumulatedResponses = ClientConfiguration.get().getInt(QpidConstants.MAX_ACCUMILATED_RESPONSES); + maxAccumulatedResponses = ClientConfiguration.get().getInt(AMQPConstants.MAX_ACCUMILATED_RESPONSES); responseMap = new ConcurrentHashMap<Long, ResponseStatus>(); } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java index d202bab843..d0a4ab79a5 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java @@ -12,6 +12,7 @@ import org.apache.qpid.nclient.core.AMQPException; public abstract class AbstractResource { private String _resourceName; + private boolean _closed = true; public AbstractResource(String resourceName) { @@ -20,12 +21,13 @@ public abstract class AbstractResource public void open() throws QpidException { + _closed = false; try { openResource(); } - catch(AMQPException e) + catch(Exception e) { throw new QpidException("Error creating " + _resourceName + " due to " + e.getMessage(),e); } @@ -33,7 +35,8 @@ public abstract class AbstractResource public void close() throws QpidException { - try + _closed = true; + try { closeResource(); @@ -45,7 +48,15 @@ public abstract class AbstractResource } - protected abstract void openResource() throws AMQPException; + protected abstract void openResource() throws AMQPException, QpidException; - protected abstract void closeResource() throws AMQPException; + protected abstract void closeResource() throws AMQPException, QpidException; + + public void checkClosed() throws QpidException + { + if(_closed) + { + throw new QpidException("The resource you are trying to access is closed"); + } + } } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java index 054eedfcee..e9809e4c83 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java @@ -23,6 +23,7 @@ package org.apache.qpid.nclient.impl; import java.util.Map; import java.util.StringTokenizer; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -48,7 +49,6 @@ import org.apache.qpid.nclient.amqp.AMQPChannel; import org.apache.qpid.nclient.amqp.AMQPClassFactory; import org.apache.qpid.nclient.amqp.AMQPConnection; import org.apache.qpid.nclient.amqp.AbstractAMQPClassFactory; -import org.apache.qpid.nclient.amqp.qpid.QpidAMQPChannel; import org.apache.qpid.nclient.amqp.state.AMQPState; import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; import org.apache.qpid.nclient.amqp.state.AMQPStateListener; @@ -66,7 +66,7 @@ import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionTy * replaced by the session methods. * */ -public class QpidConnectionImpl implements QpidConnection, AMQPStateListener +public class QpidConnectionImpl extends AbstractResource implements QpidConnection, AMQPStateListener { private static final Logger _logger = Logger.getLogger(QpidConnectionImpl.class); @@ -89,18 +89,70 @@ public class QpidConnectionImpl implements QpidConnection, AMQPStateListener private Lock _lock = new ReentrantLock(); - /** --------------------------------------------- - * Methods from o.a.qpid.client.Connection - * ---------------------------------------------- + private AtomicBoolean _closed; + + private AtomicBoolean _opened; + + public QpidConnectionImpl() + { + super("Connection"); + } + + /** + * ----------------------------------------------------- + * Methods introduced by AbstractResource + * ----------------------------------------------------- */ + protected void openResource() throws AMQPException, QpidException + { + throw new UnsupportedOperationException("open is not defined for this resource"); + } - public void close() + protected void closeResource() throws AMQPException, QpidException { - // handle failover + _classFactory = null; } + + @Override + public void checkClosed() throws QpidException + { + if(_closed.get()) + { + throw new QpidException("The resource you are trying to access is closed"); + } + } + + /** --------------------------------------------- + * Methods from o.a.qpid.client.Connection + * ---------------------------------------------- + */ - public void connect(String url) throws QpidException + @Override + public void close() throws QpidException { + if (!_closed.getAndSet(true)) + { + _lock.lock(); + try + { + super.close(); + _opened.set(false); + initiateFailover(); + } + finally + { + _lock.unlock(); + } + } + } + + public void connect(String url) throws QpidException + { + if (_opened.get()) + { + throw new QpidException("The connection is already opened"); + } + try { _classFactory = AbstractAMQPClassFactory.getFactoryInstance(); @@ -128,10 +180,13 @@ public class QpidConnectionImpl implements QpidConnection, AMQPStateListener { throw new QpidException("Connection negotiation failed due to " + e.getMessage(),e); } + + _opened.set(true); } public QpidSession createSession(int expiryInSeconds) throws QpidException { + checkClosed(); AMQPChannel channel = null; _lock.lock(); try @@ -165,20 +220,7 @@ public class QpidConnectionImpl implements QpidConnection, AMQPStateListener if(event.getNewState() == AMQPState.CONNECTION_CLOSED) { - //We need to notify the sessions that they need to - //kick in the fail over logic - for (Integer sessionId : _sessionMap.keySet()) - { - QpidSession session = _sessionMap.get(sessionId); - try - { - session.failover(); - } - catch(Exception e) - { - _logger.error("Error executing failover logic for session : " + sessionId, e); - } - } + initiateFailover(); } } @@ -188,7 +230,7 @@ public class QpidConnectionImpl implements QpidConnection, AMQPStateListener * ---------------------------------------------- */ - public void handleConnectionNegotiation() throws Exception + private void handleConnectionNegotiation() throws Exception { _classFactory.getStateManager().addListener(AMQPStateType.CONNECTION_STATE, this); @@ -239,4 +281,21 @@ public class QpidConnectionImpl implements QpidConnection, AMQPStateListener ConnectionOpenOkBody connectionOpenOkBody = _amqpConnection.open(connectionOpenBody); } + private void initiateFailover() + { + //We need to notify the sessions that they need to + //kick in the fail over logic + for (Integer sessionId : _sessionMap.keySet()) + { + QpidSession session = _sessionMap.get(sessionId); + try + { + session.failover(); + } + catch(Exception e) + { + _logger.error("Error executing failover logic for session : " + sessionId, e); + } + } + } } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidExchangeHelperImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidExchangeHelperImpl.java index 2643fe17e6..2ed54c1d5c 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidExchangeHelperImpl.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidExchangeHelperImpl.java @@ -32,6 +32,7 @@ public class QpidExchangeHelperImpl extends AbstractResource implements QpidExch */ public void declareExchange(boolean autoDelete, boolean durable, String exchangeName,boolean internal, boolean nowait, boolean passive,String exchangeClass) throws QpidException { + checkClosed(); final ExchangeDeclareBody exchangeDeclareBody = ExchangeDeclareBody.createMethodBody( _session.getMajor(), _session.getMinor(), @@ -61,6 +62,7 @@ public class QpidExchangeHelperImpl extends AbstractResource implements QpidExch public void deleteExchange(String exchangeName, boolean ifUnused, boolean nowait) throws QpidException { + checkClosed(); final ExchangeDeleteBody exchangeDeclareBody = ExchangeDeleteBody.createMethodBody( _session.getMajor(), _session.getMinor(), @@ -88,12 +90,12 @@ public class QpidExchangeHelperImpl extends AbstractResource implements QpidExch * Methods introduced by AbstractResource * ----------------------------------------------------- */ - protected void openResource() throws AMQPException + protected void openResource() throws AMQPException, QpidException { _exchange = _session.getClassFactory().createExchangeClass(_session.getChannel()); } - protected void closeResource() throws AMQPException + protected void closeResource() throws AMQPException, QpidException { _session.getClassFactory().destoryExchangeClass(_session.getChannel(), _exchange); } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java index 22000a8506..fe0231522c 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java @@ -24,35 +24,34 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import org.apache.qpid.framing.Content; -import org.apache.qpid.framing.MessageAppendBody; -import org.apache.qpid.framing.MessageCheckpointBody; -import org.apache.qpid.framing.MessageCloseBody; -import org.apache.qpid.framing.MessageOpenBody; -import org.apache.qpid.framing.MessageRecoverBody; -import org.apache.qpid.framing.MessageResumeBody; -import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.framing.MessageConsumeBody; import org.apache.qpid.nclient.amqp.AMQPMessage; -import org.apache.qpid.nclient.amqp.AMQPMessageCallBack; import org.apache.qpid.nclient.api.QpidException; import org.apache.qpid.nclient.api.QpidMessageConsumer; import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.message.AMQPApplicationMessage; -import org.apache.qpid.nclient.message.MessageHeaders; -import org.apache.qpid.nclient.message.MessageStore; -import org.apache.qpid.nclient.message.TransientMessageStore; -public class QpidMessageConsumerImpl extends AbstractResource implements QpidMessageConsumer, AMQPMessageCallBack +public class QpidMessageConsumerImpl extends AbstractResource implements QpidMessageConsumer { - private MessageStore _msgStore = new TransientMessageStore(); private final BlockingQueue<AMQPApplicationMessage> _queue = new LinkedBlockingQueue<AMQPApplicationMessage>(); private QpidSessionImpl _session; private AMQPMessage _amqpMessage; + private String _consumerTag; + private String _queueName; + private boolean _noLocal; + private boolean _exclusive; - protected QpidMessageConsumerImpl(QpidSessionImpl session) + protected QpidMessageConsumerImpl(QpidSessionImpl session,String consumerTag,String queueName,boolean noLocal,boolean exclusive) throws QpidException { - super("Message Class"); + super("Message Consuer"); _session = session; + _amqpMessage = session.getMessageHelper().getMessageClass(); + _consumerTag = consumerTag; + _queueName = queueName; + _noLocal = noLocal; + _exclusive = exclusive; } /** @@ -63,17 +62,20 @@ public class QpidMessageConsumerImpl extends AbstractResource implements QpidMes public AMQPApplicationMessage get() throws QpidException { + checkClosed(); // I want this to do a message.get return null; } public AMQPApplicationMessage receive()throws QpidException { + checkClosed(); return _queue.poll(); } public AMQPApplicationMessage receive(long timeout, TimeUnit tu)throws QpidException { + checkClosed(); try { return _queue.poll(timeout, tu); @@ -83,118 +85,104 @@ public class QpidMessageConsumerImpl extends AbstractResource implements QpidMes throw new QpidException("Error retrieving message from queue",e); } } + + public void messageArrived(AMQPApplicationMessage msg)throws QpidException + { + try + { + _queue.put(msg); + } + catch(Exception e) + { + throw new QpidException("Error queueing the messsage",e); + } + } /** * ----------------------------------------------- * Abstract methods from AbstractResource class * ----------------------------------------------- */ - protected void openResource() throws AMQPException + protected void openResource() throws AMQPException, QpidException { - _amqpMessage = _session.getClassFactory().createMessageClass(_session.getChannel(),null); + // Will wait till the dust settles on the message selectors + + final MessageConsumeBody messageConsumeBody = + MessageConsumeBody.createMethodBody( + _session.getMajor(), + _session.getMinor(), + new AMQShortString(_consumerTag),// destination/deliveryTag/consumerTag + _exclusive, //exclusive + null, //filter + false, //noAck, + _noLocal, //noLocal, + new AMQShortString(_queueName), //queue + _session.getAccessTicket() //ticket + ); + + final AMQPCallbackHelper cb = new AMQPCallbackHelper(); + HelperTemplate template = new HelperTemplate(){ + + public void amqpMethodCall() throws AMQPException + { + _amqpMessage.consume(messageConsumeBody, cb); + } + }; + + template.invokeAMQPMethodCall("Message consume failed due to"); + + template.evaulateResponse(cb); } - protected void closeResource() throws AMQPException + protected void closeResource() throws AMQPException, QpidException { - _session.getClassFactory().destoryMessageClass(_session.getChannel(), _amqpMessage); + ((QpidMessageHelperImpl)_session.getMessageHelper()).deregisterConsumer(_consumerTag); + + final MessageCancelBody messageCancelBody = + MessageCancelBody.createMethodBody( + _session.getMajor(), + _session.getMinor(), + new AMQShortString(_queueName)); + + final AMQPCallbackHelper cb = new AMQPCallbackHelper(); + HelperTemplate template = new HelperTemplate(){ + + public void amqpMethodCall() throws AMQPException + { + _amqpMessage.cancel(messageCancelBody, cb); + } + }; + + template.invokeAMQPMethodCall("Message cancel failed due to"); + + template.evaulateResponse(cb); } /** - * ----------------------------------------------- - * Methods from AMQPMessageCallback class - * ----------------------------------------------- + * ---------------------------------------------- + * Getters for Message Consumer properties + * No setters are allowed. Once these properties + * are set in the constructor they are not allowed + * to be modifed. + * ---------------------------------------------- */ - public void append(MessageAppendBody messageAppendBody, long correlationId) throws AMQPException - { - String reference = new String(messageAppendBody.getReference()); - AMQPApplicationMessage msg = _msgStore.getMessage(reference); - msg.addContent(messageAppendBody.getBytes()); - } - - public void checkpoint(MessageCheckpointBody messageCheckpointBody, long correlationId) throws AMQPException - { - // TODO Auto-generated method stub - } - - public void close(MessageCloseBody messageCloseBody, long correlationId) throws AMQPException + public String getConsumerTag() { - String reference = new String(messageCloseBody.getReference()); - AMQPApplicationMessage msg = _msgStore.getMessage(reference); - enQueue(msg); - _msgStore.removeMessage(reference); + return _consumerTag; } - public void open(MessageOpenBody messageOpenBody, long correlationId) throws AMQPException + public boolean isExclusive() { - String reference = new String(messageOpenBody.getReference()); - AMQPApplicationMessage msg = new AMQPApplicationMessage(_session.getChannel(), messageOpenBody.getReference()); - _msgStore.storeMessage(reference, msg); + return _exclusive; } - public void recover(MessageRecoverBody messageRecoverBody, long correlationId) throws AMQPException + public boolean isNoLocal() { - // TODO Auto-generated method stub - - } - - public void resume(MessageResumeBody messageResumeBody, long correlationId) throws AMQPException - { - // TODO Auto-generated method stub - - } - - public void transfer(MessageTransferBody messageTransferBody, long correlationId) throws AMQPException - { - MessageHeaders messageHeaders = new MessageHeaders(); - messageHeaders.setMessageId(messageTransferBody.getMessageId()); - messageHeaders.setAppId(messageTransferBody.getAppId()); - messageHeaders.setContentType(messageTransferBody.getContentType()); - messageHeaders.setEncoding(messageTransferBody.getContentEncoding()); - messageHeaders.setCorrelationId(messageTransferBody.getCorrelationId()); - messageHeaders.setDestination(messageTransferBody.getDestination()); - messageHeaders.setExchange(messageTransferBody.getExchange()); - messageHeaders.setExpiration(messageTransferBody.getExpiration()); - messageHeaders.setReplyTo(messageTransferBody.getReplyTo()); - messageHeaders.setRoutingKey(messageTransferBody.getRoutingKey()); - messageHeaders.setTransactionId(messageTransferBody.getTransactionId()); - messageHeaders.setUserId(messageTransferBody.getUserId()); - messageHeaders.setPriority(messageTransferBody.getPriority()); - messageHeaders.setDeliveryMode(messageTransferBody.getDeliveryMode()); - messageHeaders.setApplicationHeaders(messageTransferBody.getApplicationHeaders()); - - - - if (messageTransferBody.getBody().getContentType() == Content.TypeEnum.INLINE_T) - { - AMQPApplicationMessage msg = new AMQPApplicationMessage(_session.getChannel(), - correlationId, - messageHeaders, - messageTransferBody.getBody().getContentAsByteArray(), - messageTransferBody.getRedelivered()); - - enQueue(msg); - } - else - { - byte[] referenceId = messageTransferBody.getBody().getContentAsByteArray(); - AMQPApplicationMessage msg = new AMQPApplicationMessage(_session.getChannel(),referenceId); - msg.setMessageHeaders(messageHeaders); - msg.setRedeliveredFlag(messageTransferBody.getRedelivered()); - - _msgStore.storeMessage(new String(referenceId), msg); - } + return _noLocal; } - private void enQueue(AMQPApplicationMessage msg)throws AMQPException + public String getQueueName() { - try - { - _queue.put(msg); - } - catch(Exception e) - { - throw new AMQPException("Error queueing the messsage",e); - } + return _queueName; } - -} +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageHelperImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageHelperImpl.java new file mode 100644 index 0000000000..bd67a905a1 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageHelperImpl.java @@ -0,0 +1,186 @@ +package org.apache.qpid.nclient.impl; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.qpid.framing.Content; +import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.framing.MessageCheckpointBody; +import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.framing.MessageRecoverBody; +import org.apache.qpid.framing.MessageResumeBody; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.nclient.amqp.AMQPMessage; +import org.apache.qpid.nclient.amqp.AMQPMessageCallBack; +import org.apache.qpid.nclient.api.QpidException; +import org.apache.qpid.nclient.api.QpidMessageConsumer; +import org.apache.qpid.nclient.api.QpidMessageHelper; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.message.AMQPApplicationMessage; +import org.apache.qpid.nclient.message.MessageHeaders; +import org.apache.qpid.nclient.message.MessageStore; +import org.apache.qpid.nclient.message.TransientMessageStore; + +public class QpidMessageHelperImpl extends AbstractResource implements QpidMessageHelper, AMQPMessageCallBack +{ + private MessageStore _msgStore = new TransientMessageStore(); + private Map<String,QpidMessageConsumer> _consumers = new ConcurrentHashMap<String,QpidMessageConsumer>(); + private QpidSessionImpl _session; + private AMQPMessage _amqpMessage; + + protected QpidMessageHelperImpl(QpidSessionImpl session) + { + super("Message Class"); + _session = session; + } + + /** + * ----------------------------------------------- + * Abstract methods from AbstractResource class + * ----------------------------------------------- + */ + protected void openResource() throws AMQPException, QpidException + { + _amqpMessage = _session.getClassFactory().createMessageClass(_session.getChannel(),this); + } + + protected void closeResource() throws AMQPException, QpidException + { + _session.getClassFactory().destoryMessageClass(_session.getChannel(), _amqpMessage); + for (String consumerTag : _consumers.keySet()) + { + QpidMessageConsumer consumer = _consumers.get(consumerTag); + // The close method will deregister itself too. + consumer.close(); + } + } + + /** + * ----------------------------------------------- + * methods from QpidMessageHelper class + * ----------------------------------------------- + */ + public AMQPMessage getMessageClass() throws QpidException + { + return _amqpMessage; + } + + /** + * ----------------------------------------------- + * Methods from AMQPMessageCallback class + * ----------------------------------------------- + */ + public void append(MessageAppendBody messageAppendBody, long correlationId) throws AMQPException + { + String reference = new String(messageAppendBody.getReference()); + AMQPApplicationMessage msg = _msgStore.getMessage(reference); + msg.addContent(messageAppendBody.getBytes()); + } + + public void checkpoint(MessageCheckpointBody messageCheckpointBody, long correlationId) throws AMQPException + { + // TODO Auto-generated method stub + } + + public void close(MessageCloseBody messageCloseBody, long correlationId) throws AMQPException + { + String reference = new String(messageCloseBody.getReference()); + AMQPApplicationMessage msg = _msgStore.getMessage(reference); + notifyMessageArrival(msg); + _msgStore.removeMessage(reference); + } + + public void open(MessageOpenBody messageOpenBody, long correlationId) throws AMQPException + { + String reference = new String(messageOpenBody.getReference()); + AMQPApplicationMessage msg = new AMQPApplicationMessage(_session.getChannel(), messageOpenBody.getReference()); + _msgStore.storeMessage(reference, msg); + } + + public void recover(MessageRecoverBody messageRecoverBody, long correlationId) throws AMQPException + { + // TODO Auto-generated method stub + + } + + public void resume(MessageResumeBody messageResumeBody, long correlationId) throws AMQPException + { + // TODO Auto-generated method stub + + } + + public void transfer(MessageTransferBody messageTransferBody, long correlationId) throws AMQPException + { + MessageHeaders messageHeaders = new MessageHeaders(); + messageHeaders.setMessageId(messageTransferBody.getMessageId()); + messageHeaders.setAppId(messageTransferBody.getAppId()); + messageHeaders.setContentType(messageTransferBody.getContentType()); + messageHeaders.setEncoding(messageTransferBody.getContentEncoding()); + messageHeaders.setCorrelationId(messageTransferBody.getCorrelationId()); + messageHeaders.setDestination(messageTransferBody.getDestination()); + messageHeaders.setExchange(messageTransferBody.getExchange()); + messageHeaders.setExpiration(messageTransferBody.getExpiration()); + messageHeaders.setReplyTo(messageTransferBody.getReplyTo()); + messageHeaders.setRoutingKey(messageTransferBody.getRoutingKey()); + messageHeaders.setTransactionId(messageTransferBody.getTransactionId()); + messageHeaders.setUserId(messageTransferBody.getUserId()); + messageHeaders.setPriority(messageTransferBody.getPriority()); + messageHeaders.setDeliveryMode(messageTransferBody.getDeliveryMode()); + messageHeaders.setApplicationHeaders(messageTransferBody.getApplicationHeaders()); + + + + if (messageTransferBody.getBody().getContentType() == Content.TypeEnum.INLINE_T) + { + AMQPApplicationMessage msg = new AMQPApplicationMessage(_session.getChannel(), + messageTransferBody.getDestination().asString(), + messageHeaders, + messageTransferBody.getBody().getContentAsByteArray(), + messageTransferBody.getRedelivered()); + + notifyMessageArrival(msg); + } + else + { + byte[] referenceId = messageTransferBody.getBody().getContentAsByteArray(); + AMQPApplicationMessage msg = new AMQPApplicationMessage(_session.getChannel(),referenceId); + msg.setMessageHeaders(messageHeaders); + msg.setRedeliveredFlag(messageTransferBody.getRedelivered()); + + _msgStore.storeMessage(new String(referenceId), msg); + } + } + + /** ----------------------------------------- + * Methods defined by this class + * ------------------------------------------ + */ + + public void registerConsumer(String consumerTag,QpidMessageConsumer messageConsumer) + { + _consumers.put(consumerTag, messageConsumer); + } + + public void deregisterConsumer(String consumerTag) + { + if(_consumers.containsKey(consumerTag)) + { + _consumers.remove(consumerTag); + } + // If it's not their no need to worry (or raise an exception) + } + + private void notifyMessageArrival(AMQPApplicationMessage msg) + { + QpidMessageConsumer consumer = _consumers.get(msg.getDeliveryTag()); + try + { + consumer.messageArrived(msg); + } + catch(QpidException e) + { + // maybe retry and then reject the message + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java index f553743ea6..91298dcc57 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java @@ -18,10 +18,11 @@ public class QpidMessageProducerImpl extends AbstractResource implements QpidMes private QpidSessionImpl _session; private AMQPMessage _amqpMessage; - protected QpidMessageProducerImpl(QpidSessionImpl session) + protected QpidMessageProducerImpl(QpidSessionImpl session) throws QpidException { - super("Message Class"); + super("Message Producer"); _session = session; + _amqpMessage = session.getMessageHelper().getMessageClass(); } /** @@ -31,6 +32,7 @@ public class QpidMessageProducerImpl extends AbstractResource implements QpidMes */ public void send(boolean disableMessageId,boolean inline,AMQPApplicationMessage msg)throws QpidException { + checkClosed(); // need to handle the inline and reference case final MessageTransferBody messageTransferBody = prepareTransfer(disableMessageId,msg); final AMQPCallbackHelper cb = new AMQPCallbackHelper(); @@ -52,14 +54,14 @@ public class QpidMessageProducerImpl extends AbstractResource implements QpidMes * Methods introduced by AbstractResource * ----------------------------------------------------- */ - protected void openResource() throws AMQPException + protected void openResource() throws AMQPException, QpidException { - _amqpMessage = _session.getClassFactory().createMessageClass(_session.getChannel(),null); + } - protected void closeResource() throws AMQPException + protected void closeResource() throws AMQPException, QpidException { - _session.getClassFactory().destoryMessageClass(_session.getChannel(), _amqpMessage); + } /** diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidQueueHelperImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidQueueHelperImpl.java index 33b3c1177e..b95f7c3210 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidQueueHelperImpl.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidQueueHelperImpl.java @@ -29,6 +29,7 @@ public class QpidQueueHelperImpl extends AbstractResource implements QpidQueueHe */ public void bindQueue(String exchangeName, boolean nowait, String queueName, String routingKey) throws QpidException { + checkClosed(); final QueueBindBody queueBindBody = QueueBindBody.createMethodBody( _session.getMajor(), _session.getMinor(), @@ -57,6 +58,7 @@ public class QpidQueueHelperImpl extends AbstractResource implements QpidQueueHe public void declareQueue(boolean autoDelete, boolean durable, boolean exclusive, boolean nowait, boolean passive, String queueName) throws QpidException { + checkClosed(); final QueueDeclareBody queueDeclareBody = QueueDeclareBody.createMethodBody( _session.getMajor(), _session.getMinor(), @@ -86,6 +88,7 @@ public class QpidQueueHelperImpl extends AbstractResource implements QpidQueueHe public void deleteQueue(boolean ifEmpty, boolean ifUnused, boolean nowait, String queueName) throws QpidException { + checkClosed(); final QueueDeleteBody queueDeleteBody = QueueDeleteBody.createMethodBody( _session.getMajor(), _session.getMinor(), @@ -112,6 +115,7 @@ public class QpidQueueHelperImpl extends AbstractResource implements QpidQueueHe public void purgeQueue(boolean nowait, String queueName) throws QpidException { + checkClosed(); final QueuePurgeBody queuePurgeBody = QueuePurgeBody.createMethodBody( _session.getMajor(), _session.getMinor(), @@ -136,6 +140,7 @@ public class QpidQueueHelperImpl extends AbstractResource implements QpidQueueHe public void unbindQueue(String exchangeName, String queueName, String routingKey) throws QpidException { + checkClosed(); final QueueUnbindBody queueUnbindBody = QueueUnbindBody.createMethodBody( _session.getMajor(), _session.getMinor(), @@ -166,12 +171,12 @@ public class QpidQueueHelperImpl extends AbstractResource implements QpidQueueHe * Methods introduced by AbstractResource * ----------------------------------------------------- */ - protected void openResource() throws AMQPException + protected void openResource() throws AMQPException, QpidException { _queueClass = _session.getClassFactory().createQueueClass(_session.getChannel()); } - protected void closeResource() throws AMQPException + protected void closeResource() throws AMQPException, QpidException { _session.getClassFactory().destroyQueueClass(_session.getChannel(), _queueClass); } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java index 7d3cf5f861..c7c62c24d8 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.nclient.impl; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -31,6 +34,7 @@ import org.apache.qpid.framing.ChannelOpenOkBody; import org.apache.qpid.nclient.amqp.AMQPChannel; import org.apache.qpid.nclient.amqp.AMQPClassFactory; import org.apache.qpid.nclient.api.QpidConnection; +import org.apache.qpid.nclient.api.QpidConstants; import org.apache.qpid.nclient.api.QpidExchangeHelper; import org.apache.qpid.nclient.api.QpidMessageConsumer; import org.apache.qpid.nclient.api.QpidMessageHelper; @@ -45,7 +49,7 @@ import org.apache.qpid.protocol.AMQConstant; /** * According to the 0-9 spec, the session is built on channels(1-1 map) and when a channel is closed * the session should be closed. However with the introdution of the session class in 0-10 - * this may change. Therefore I will not implement that logic yet. + * this will change. Therefore I will not implement failover logic yet. * * Once the dust settles there will be a Failover Helper that will manage the sessions * failover logic. @@ -60,13 +64,14 @@ public class QpidSessionImpl extends AbstractResource implements QpidSession private int _ticket = 0; //currently useless private QpidExchangeHelperImpl _qpidExchangeHelper; private QpidQueueHelperImpl _qpidQueueHelper; - private QpidMessageConsumerImpl _qpidMessageConsumer; - private QpidMessageProducerImpl _qpidMessageProducer; + private QpidMessageHelperImpl _qpidMessageHelper; + private List<QpidMessageProducerImpl> _producers = new ArrayList<QpidMessageProducerImpl>(); private AtomicBoolean _closed; + private AtomicInteger _consumerTag; private Lock _sessionCloseLock = new ReentrantLock(); - + // this will be used as soon as Session class is finalized - private int _expiryInSeconds = QpidConnection.SESSION_EXPIRY_TIED_TO_CHANNEL; + private int _expiryInSeconds = QpidConstants.SESSION_EXPIRY_TIED_TO_CHANNEL; private QpidConnection _qpidConnection; public QpidSessionImpl(AMQPClassFactory classFactory,AMQPChannel channelClass,int channel,byte major, byte minor) @@ -85,13 +90,23 @@ public class QpidSessionImpl extends AbstractResource implements QpidSession * Methods introduced by AbstractResource * ----------------------------------------------------- */ - protected void openResource() throws AMQPException + protected void openResource() throws AMQPException, QpidException { // These methods will be changed to session methods openChannel(); + + //initialize method helper + _qpidMessageHelper = new QpidMessageHelperImpl(this); + _qpidMessageHelper.open(); + + _qpidExchangeHelper = new QpidExchangeHelperImpl(this); + _qpidExchangeHelper.open(); + + _qpidQueueHelper = new QpidQueueHelperImpl(this); + _qpidQueueHelper.open(); } - protected void closeResource() throws AMQPException + protected void closeResource() throws AMQPException, QpidException { ChannelCloseBody channelCloseBody = ChannelCloseBody.createMethodBody(_major, _minor, 0, //classId @@ -110,13 +125,23 @@ public class QpidSessionImpl extends AbstractResource implements QpidSession { _qpidExchangeHelper.closeResource(); } - if(_qpidMessageConsumer != null) + if(_qpidMessageHelper != null) { - _qpidMessageConsumer.closeResource(); + // The MessageHelper will close the Message Consumers too + _qpidMessageHelper.closeResource(); + } + for (QpidMessageProducerImpl producer: _producers) + { + producer.close(); } - if(_qpidMessageProducer != null) + } + + @Override + public void checkClosed() throws QpidException + { + if(_closed.get()) { - _qpidMessageProducer.closeResource(); + throw new QpidException("The resource you are trying to access is closed"); } } @@ -125,6 +150,7 @@ public class QpidSessionImpl extends AbstractResource implements QpidSession * Methods introduced by QpidSession * ----------------------------------------------------- */ + @Override public void close() throws QpidException { if (!_closed.getAndSet(true)) @@ -139,11 +165,22 @@ public class QpidSessionImpl extends AbstractResource implements QpidSession _sessionCloseLock.unlock(); } } - } + } public void resume() throws QpidException { - + if (!_closed.getAndSet(false)) + { + _sessionCloseLock.lock(); + try + { + super.open(); + } + finally + { + _sessionCloseLock.unlock(); + } + } } // not intended to be used at the jms layer @@ -154,7 +191,7 @@ public class QpidSessionImpl extends AbstractResource implements QpidSession public void failover() throws QpidException { - if(_expiryInSeconds == QpidConnection.SESSION_EXPIRY_TIED_TO_CHANNEL) + if(_expiryInSeconds == QpidConstants.SESSION_EXPIRY_TIED_TO_CHANNEL) { // then close the session } @@ -164,24 +201,23 @@ public class QpidSessionImpl extends AbstractResource implements QpidSession } } - public QpidMessageConsumer createConsumer() throws QpidException + public QpidMessageConsumer createConsumer(String queueName, boolean noLocal, boolean exclusive) throws QpidException { - if (_qpidMessageConsumer == null) - { - _qpidMessageConsumer = new QpidMessageConsumerImpl(this); - _qpidMessageConsumer.open(); - } - return _qpidMessageConsumer; + checkClosed(); + String consumerTag = String.valueOf(_consumerTag.incrementAndGet()); + QpidMessageConsumerImpl qpidMessageConsumer = new QpidMessageConsumerImpl(this,consumerTag,queueName,noLocal,exclusive); + _qpidMessageHelper.registerConsumer(consumerTag, qpidMessageConsumer); + + return qpidMessageConsumer; } public QpidMessageProducer createProducer() throws QpidException { - if (_qpidMessageProducer == null) - { - _qpidMessageProducer = new QpidMessageProducerImpl(this); - _qpidMessageProducer.open(); - } - return _qpidMessageProducer; + checkClosed(); + QpidMessageProducerImpl qpidMessageProducer = new QpidMessageProducerImpl(this); + _producers.add(qpidMessageProducer); + + return qpidMessageProducer; } /** ------------------------------------------ @@ -192,32 +228,25 @@ public class QpidSessionImpl extends AbstractResource implements QpidSession */ public QpidExchangeHelper getExchangeHelper() throws QpidException { - if (_qpidExchangeHelper == null) - { - _qpidExchangeHelper = new QpidExchangeHelperImpl(this); - _qpidExchangeHelper.open(); - } + checkClosed(); return _qpidExchangeHelper; } public QpidMessageHelper getMessageHelper() throws QpidException { - // TODO Auto-generated method stub - return null; + checkClosed(); + return _qpidMessageHelper; } public QpidQueueHelper getQueueHelper() throws QpidException { - if (_qpidQueueHelper == null) - { - _qpidQueueHelper = new QpidQueueHelperImpl(this); - _qpidQueueHelper.open(); - } + checkClosed(); return _qpidQueueHelper; } public QpidTransactionHelper getTransactionHelper()throws QpidException { + checkClosed(); return null; } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java index 79302540be..29e61f623f 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java @@ -31,7 +31,7 @@ public class AMQPApplicationMessage { private int channelId; private byte[] referenceId; private List<byte[]> contents = new LinkedList<byte[]>(); - private long deliveryTag; + private String deliveryTag; private boolean redeliveredFlag; private MessageHeaders messageHeaders; @@ -41,7 +41,7 @@ public class AMQPApplicationMessage { this.referenceId = referenceId; } - public AMQPApplicationMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, boolean redeliveredFlag) + public AMQPApplicationMessage(int channelId, String deliveryTag, MessageHeaders messageHeaders, boolean redeliveredFlag) { this.channelId = channelId; this.deliveryTag = deliveryTag; @@ -49,7 +49,7 @@ public class AMQPApplicationMessage { this.redeliveredFlag = redeliveredFlag; } - public AMQPApplicationMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, byte[] content, boolean redeliveredFlag) + public AMQPApplicationMessage(int channelId, String deliveryTag, MessageHeaders messageHeaders, byte[] content, boolean redeliveredFlag) { this.channelId = channelId; this.deliveryTag = deliveryTag; @@ -95,7 +95,7 @@ public class AMQPApplicationMessage { return buf.array(); } - public long getDeliveryTag() + public String getDeliveryTag() { return deliveryTag; } @@ -117,7 +117,7 @@ public class AMQPApplicationMessage { new String(contents.get(0)); } - public void setDeliveryTag(long deliveryTag) + public void setDeliveryTag(String deliveryTag) { this.deliveryTag = deliveryTag; } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java index d845059ee7..02e3117202 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java @@ -11,7 +11,7 @@ import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.AbstractPhase; import org.apache.qpid.nclient.core.Phase; import org.apache.qpid.nclient.core.PhaseContext; -import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.core.AMQPConstants; /** * This Phase handles Layer 3 functionality of the AMQP spec. @@ -59,7 +59,7 @@ public class ModelPhase extends AbstractPhase { public void notifyMethodListerners(AMQPMethodEvent event) throws AMQPException { - AMQPEventManager eventManager = (AMQPEventManager)_ctx.getProperty(QpidConstants.EVENT_MANAGER); + AMQPEventManager eventManager = (AMQPEventManager)_ctx.getProperty(AMQPConstants.EVENT_MANAGER); eventManager.notifyEvent(event); } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java index 428cd6753d..919b668662 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java @@ -26,7 +26,7 @@ import java.util.Map; import org.apache.log4j.Logger; import org.apache.qpid.nclient.config.ClientConfiguration; -import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.core.AMQPConstants; public class CallbackHandlerRegistry { @@ -62,9 +62,9 @@ public class CallbackHandlerRegistry private void parseProperties() { - String key = QpidConstants.AMQP_SECURITY + "." + - QpidConstants.AMQP_SECURITY_MECHANISMS + "." + - QpidConstants.AMQP_SECURITY_MECHANISM_HANDLER; + String key = AMQPConstants.AMQP_SECURITY + "." + + AMQPConstants.AMQP_SECURITY_MECHANISMS + "." + + AMQPConstants.AMQP_SECURITY_MECHANISM_HANDLER; int index = ClientConfiguration.get().getMaxIndex(key); diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java index 958c6c4782..e4f2cb163d 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java @@ -29,7 +29,7 @@ import javax.security.sasl.SaslClientFactory; import org.apache.log4j.Logger; import org.apache.qpid.nclient.config.ClientConfiguration; -import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.core.AMQPConstants; public class DynamicSaslRegistrar { @@ -47,12 +47,12 @@ public class DynamicSaslRegistrar private static Map<String, Class<? extends SaslClientFactory>> parseProperties() { - List<String> mechanisms = ClientConfiguration.get().getList(QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES); + List<String> mechanisms = ClientConfiguration.get().getList(AMQPConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES); TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister = new TreeMap<String, Class<? extends SaslClientFactory>>(); for (String mechanism: mechanisms) { - String className = ClientConfiguration.get().getString(QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY + "_" + mechanism); + String className = ClientConfiguration.get().getString(AMQPConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY + "_" + mechanism); try { Class<?> clazz = Class.forName(className); diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java index 734aa68a9d..2ce03d7577 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java @@ -13,7 +13,7 @@ import org.apache.qpid.nclient.core.DefaultPhaseContext; import org.apache.qpid.nclient.core.Phase; import org.apache.qpid.nclient.core.PhaseContext; import org.apache.qpid.nclient.core.PhaseFactory; -import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.core.AMQPConstants; import org.apache.qpid.pool.ReadWriteThreadModel; public class TCPConnection implements TransportConnection @@ -29,11 +29,11 @@ public class TCPConnection implements TransportConnection _brokerDetails = url.getBrokerDetails(0); _ctx = ctx; - ByteBuffer.setUseDirectBuffers(ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_DIRECT_BUFFERS)); + ByteBuffer.setUseDirectBuffers(ClientConfiguration.get().getBoolean(AMQPConstants.ENABLE_DIRECT_BUFFERS)); // the MINA default is currently to use the pooled allocator although this may change in future // once more testing of the performance of the simple allocator has been done - if (ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_POOLED_ALLOCATOR)) + if (ClientConfiguration.get().getBoolean(AMQPConstants.ENABLE_POOLED_ALLOCATOR)) { // Not sure what the original code wanted use :) } @@ -48,22 +48,22 @@ public class TCPConnection implements TransportConnection // if we do not use our own thread model we get the MINA default which is to use // its own leader-follower model - if (ClientConfiguration.get().getBoolean(QpidConstants.USE_SHARED_READ_WRITE_POOL)) + if (ClientConfiguration.get().getBoolean(AMQPConstants.USE_SHARED_READ_WRITE_POOL)) { cfg.setThreadModel(ReadWriteThreadModel.getInstance()); } SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); - scfg.setTcpNoDelay(ClientConfiguration.get().getBoolean(QpidConstants.TCP_NO_DELAY)); - scfg.setSendBufferSize(ClientConfiguration.get().getInt(QpidConstants.SEND_BUFFER_SIZE_IN_KB)*1024); - scfg.setReceiveBufferSize(ClientConfiguration.get().getInt(QpidConstants.RECEIVE_BUFFER_SIZE_IN_KB)*1024); + scfg.setTcpNoDelay(ClientConfiguration.get().getBoolean(AMQPConstants.TCP_NO_DELAY)); + scfg.setSendBufferSize(ClientConfiguration.get().getInt(AMQPConstants.SEND_BUFFER_SIZE_IN_KB)*1024); + scfg.setReceiveBufferSize(ClientConfiguration.get().getInt(AMQPConstants.RECEIVE_BUFFER_SIZE_IN_KB)*1024); } // Returns the phase pipe public Phase connect() throws AMQPException { - _ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails); - _ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector); + _ctx.setProperty(AMQPConstants.AMQP_BROKER_DETAILS,_brokerDetails); + _ctx.setProperty(AMQPConstants.MINA_IO_CONNECTOR,_ioConnector); _phase = PhaseFactory.createPhasePipe(_ctx); _phase.start(); diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java index 911e855d4f..2c618cfe81 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java @@ -44,7 +44,7 @@ import org.apache.qpid.framing.ProtocolVersionList; import org.apache.qpid.nclient.config.ClientConfiguration; import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.AbstractPhase; -import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.core.AMQPConstants; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.ssl.BogusSSLContextFactory; @@ -72,8 +72,8 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol public void start()throws AMQPException { - _brokerDetails = (BrokerDetails)_ctx.getProperty(QpidConstants.AMQP_BROKER_DETAILS); - IoConnector ioConnector = (IoConnector)_ctx.getProperty(QpidConstants.MINA_IO_CONNECTOR); + _brokerDetails = (BrokerDetails)_ctx.getProperty(AMQPConstants.AMQP_BROKER_DETAILS); + IoConnector ioConnector = (IoConnector)_ctx.getProperty(AMQPConstants.MINA_IO_CONNECTOR); final SocketAddress address; if (ioConnector instanceof VmPipeConnector) @@ -179,7 +179,7 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol new AMQCodecFactory(false)); if (ClientConfiguration.get().getBoolean( - QpidConstants.USE_SHARED_READ_WRITE_POOL)) + AMQPConstants.USE_SHARED_READ_WRITE_POOL)) { session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf); diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java index ba38848149..db38fdb528 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java @@ -15,7 +15,7 @@ import org.apache.qpid.nclient.core.DefaultPhaseContext; import org.apache.qpid.nclient.core.Phase; import org.apache.qpid.nclient.core.PhaseContext; import org.apache.qpid.nclient.core.PhaseFactory; -import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.core.AMQPConstants; import org.apache.qpid.pool.PoolingFilter; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.pool.ReferenceCountingExecutorService; @@ -48,8 +48,8 @@ public class VMConnection implements TransportConnection { createVMBroker(); - _ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails); - _ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector); + _ctx.setProperty(AMQPConstants.AMQP_BROKER_DETAILS,_brokerDetails); + _ctx.setProperty(AMQPConstants.MINA_IO_CONNECTOR,_ioConnector); _phase = PhaseFactory.createPhasePipe(_ctx); _phase.start(); @@ -86,7 +86,7 @@ public class VMConnection implements TransportConnection private IoHandlerAdapter createBrokerInstance(int port) throws AMQPException { - String protocolProviderClass = ClientConfiguration.get().getString(QpidConstants.QPID_VM_BROKER_CLASS); + String protocolProviderClass = ClientConfiguration.get().getString(AMQPConstants.QPID_VM_BROKER_CLASS); _logger.info("Creating Qpid protocol provider: " + protocolProviderClass); // can't use introspection to get Provider as it is a server class. |