diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-04-18 22:07:01 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-04-18 22:07:01 +0000 |
commit | 327b9217006cef5d9f0d4736ba1f55ea6e13ebe2 (patch) | |
tree | 5d3f4ec53eb3ec064379e49b5500539c5fcd03c7 | |
parent | 6bad3035d6b23cc239b88bd71410cb627055b794 (diff) | |
download | qpid-python-327b9217006cef5d9f0d4736ba1f55ea6e13ebe2.tar.gz |
added state support for distributed transactions
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/client_restructure@530180 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 86 insertions, 16 deletions
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java index 928780353d..decb120796 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java @@ -130,7 +130,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS); _channelNotOpend.await(); - checkIfConnectionClosed(); + checkIfChannelClosed(); AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time"); notifyState(AMQPState.CHANNEL_OPENED); _currentState = AMQPState.CHANNEL_OPENED; @@ -198,7 +198,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS); _channelFlowNotResponded.await(); - checkIfConnectionClosed(); + checkIfChannelClosed(); AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time"); handleChannelFlowState(_channelFlowOkBody.active); return _channelFlowOkBody; @@ -228,7 +228,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS); _channelNotResumed.await(); - checkIfConnectionClosed(); + checkIfChannelClosed(); AMQPValidator.throwExceptionOnNull(_channelOkBody, "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time"); notifyState(AMQPState.CHANNEL_OPENED); @@ -330,7 +330,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe } } - private void checkIfConnectionClosed() throws AMQPException + private void checkIfChannelClosed() throws AMQPException { if (_channelCloseBody != null) { diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java index a418de7ff6..eb2cdb4d01 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java @@ -1,9 +1,11 @@ package org.apache.qpid.nclient.amqp.qpid; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.DtxDemarcationEndBody; import org.apache.qpid.framing.DtxDemarcationEndOkBody; import org.apache.qpid.framing.DtxDemarcationSelectBody; @@ -11,16 +13,24 @@ import org.apache.qpid.framing.DtxDemarcationSelectOkBody; import org.apache.qpid.framing.DtxDemarcationStartBody; import org.apache.qpid.framing.DtxDemarcationStartOkBody; import org.apache.qpid.nclient.amqp.AMQPDtxDemarcation; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; import org.apache.qpid.nclient.amqp.state.AMQPState; +import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; +import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; import org.apache.qpid.nclient.amqp.state.AMQPStateManager; +import org.apache.qpid.nclient.amqp.state.AMQPStateType; +import org.apache.qpid.nclient.config.ClientConfiguration; import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.util.AMQPValidator; -public class QpidAMQPDtxDemarcation implements AMQPDtxDemarcation +public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMethodListener, AMQPDtxDemarcation { private static final Logger _logger = Logger.getLogger(QpidAMQPDtxDemarcation.class); - // the channelId assigned for this channel + // the channelId that will be used for transactions private int _channelId; private Phase _phase; @@ -29,28 +39,62 @@ public class QpidAMQPDtxDemarcation implements AMQPDtxDemarcation private AMQPStateManager _stateManager; - private final AMQPState[] _validCloseStates = new AMQPState[] - { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND }; + private final AMQPState[] _validEndStates = new AMQPState[] + { AMQPState.DTX_STARTED }; - private final AMQPState[] _validResumeStates = new AMQPState[] - { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED }; + private final AMQPState[] _validStartStates = new AMQPState[] + { AMQPState.DTX_NOT_STARTED, AMQPState.DTX_END }; // The wait period until a server sends a respond private long _serverTimeOut = 1000; private final Lock _lock = new ReentrantLock(); + private final Condition _dtxNotSelected = _lock.newCondition(); + + private final Condition _channelNotClosed = _lock.newCondition(); - public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException + private DtxDemarcationSelectOkBody _dtxDemarcationSelectOkBody; + + protected QpidAMQPDtxDemarcation(int channelId, Phase phase, AMQPStateManager stateManager) { - // TODO Auto-generated method stub - return null; - } + _channelId = channelId; + _phase = phase; + _stateManager = stateManager; + _currentState = AMQPState.DTX_CHANNEL_NOT_SELECTED; + _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); + } + /** + * ------------------------------------------- + * API Methods + * -------------------------------------------- + */ public DtxDemarcationSelectOkBody select(DtxDemarcationSelectBody dtxDemarcationSelectBody) throws AMQPException { - // TODO Auto-generated method stub - return null; + _lock.lock(); + try + { + _dtxDemarcationSelectOkBody = null; + checkIfValidStateTransition(AMQPState.DTX_CHANNEL_NOT_SELECTED, _currentState, AMQPState.DTX_NOT_STARTED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationSelectOkBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _dtxNotSelected.await(); + AMQPValidator.throwExceptionOnNull(_dtxDemarcationSelectOkBody, "The broker didn't send the DtxDemarcationSelectOkBody in time"); + notifyState(AMQPState.CHANNEL_OPENED); + _currentState = AMQPState.CHANNEL_OPENED; + return _dtxDemarcationSelectOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in dtx.select", e); + } + finally + { + _lock.unlock(); + } } public DtxDemarcationStartOkBody start(DtxDemarcationStartBody dtxDemarcationStartBody) throws AMQPException @@ -58,5 +102,25 @@ public class QpidAMQPDtxDemarcation implements AMQPDtxDemarcation // TODO Auto-generated method stub return null; } + + public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException + { + // TODO Auto-generated method stub + return null; + } + + /** + * ------------------------------------------- + * AMQPMethodListener methods + * -------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + return true; + } + private void notifyState(AMQPState newState) throws AMQPException + { + _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE)); + } } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java index 061ec5a849..18219abc46 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java @@ -57,4 +57,10 @@ public class AMQPState public static final AMQPState CHANNEL_OPENED = new AMQPState(11, "CHANNEL_OPENED"); public static final AMQPState CHANNEL_CLOSED = new AMQPState(11, "CHANNEL_CLOSED"); public static final AMQPState CHANNEL_SUSPEND = new AMQPState(11, "CHANNEL_SUSPEND"); + + // Distributed Transaction state + public static final AMQPState DTX_CHANNEL_NOT_SELECTED = new AMQPState(10, "DTX_CHANNEL_NOT_SELECTED"); + public static final AMQPState DTX_NOT_STARTED = new AMQPState(10, "DTX_NOT_STARTED"); + public static final AMQPState DTX_STARTED = new AMQPState(10, "DTX_STARTED"); + public static final AMQPState DTX_END = new AMQPState(10, "DTX_END"); } |