summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-04-18 22:07:01 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-04-18 22:07:01 +0000
commit327b9217006cef5d9f0d4736ba1f55ea6e13ebe2 (patch)
tree5d3f4ec53eb3ec064379e49b5500539c5fcd03c7
parent6bad3035d6b23cc239b88bd71410cb627055b794 (diff)
downloadqpid-python-327b9217006cef5d9f0d4736ba1f55ea6e13ebe2.tar.gz
added state support for distributed transactions
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/client_restructure@530180 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java8
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java88
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java6
3 files changed, 86 insertions, 16 deletions
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java
index 928780353d..decb120796 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java
@@ -130,7 +130,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe
//_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
_channelNotOpend.await();
- checkIfConnectionClosed();
+ checkIfChannelClosed();
AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time");
notifyState(AMQPState.CHANNEL_OPENED);
_currentState = AMQPState.CHANNEL_OPENED;
@@ -198,7 +198,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe
//_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
_channelFlowNotResponded.await();
- checkIfConnectionClosed();
+ checkIfChannelClosed();
AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time");
handleChannelFlowState(_channelFlowOkBody.active);
return _channelFlowOkBody;
@@ -228,7 +228,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe
//_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
_channelNotResumed.await();
- checkIfConnectionClosed();
+ checkIfChannelClosed();
AMQPValidator.throwExceptionOnNull(_channelOkBody,
"The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time");
notifyState(AMQPState.CHANNEL_OPENED);
@@ -330,7 +330,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe
}
}
- private void checkIfConnectionClosed() throws AMQPException
+ private void checkIfChannelClosed() throws AMQPException
{
if (_channelCloseBody != null)
{
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
index a418de7ff6..eb2cdb4d01 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
@@ -1,9 +1,11 @@
package org.apache.qpid.nclient.amqp.qpid;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.DtxDemarcationEndBody;
import org.apache.qpid.framing.DtxDemarcationEndOkBody;
import org.apache.qpid.framing.DtxDemarcationSelectBody;
@@ -11,16 +13,24 @@ import org.apache.qpid.framing.DtxDemarcationSelectOkBody;
import org.apache.qpid.framing.DtxDemarcationStartBody;
import org.apache.qpid.framing.DtxDemarcationStartOkBody;
import org.apache.qpid.nclient.amqp.AMQPDtxDemarcation;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
import org.apache.qpid.nclient.amqp.state.AMQPState;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
+import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
+import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.util.AMQPValidator;
-public class QpidAMQPDtxDemarcation implements AMQPDtxDemarcation
+public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMethodListener, AMQPDtxDemarcation
{
private static final Logger _logger = Logger.getLogger(QpidAMQPDtxDemarcation.class);
- // the channelId assigned for this channel
+ // the channelId that will be used for transactions
private int _channelId;
private Phase _phase;
@@ -29,28 +39,62 @@ public class QpidAMQPDtxDemarcation implements AMQPDtxDemarcation
private AMQPStateManager _stateManager;
- private final AMQPState[] _validCloseStates = new AMQPState[]
- { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND };
+ private final AMQPState[] _validEndStates = new AMQPState[]
+ { AMQPState.DTX_STARTED };
- private final AMQPState[] _validResumeStates = new AMQPState[]
- { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED };
+ private final AMQPState[] _validStartStates = new AMQPState[]
+ { AMQPState.DTX_NOT_STARTED, AMQPState.DTX_END };
// The wait period until a server sends a respond
private long _serverTimeOut = 1000;
private final Lock _lock = new ReentrantLock();
+ private final Condition _dtxNotSelected = _lock.newCondition();
+
+ private final Condition _channelNotClosed = _lock.newCondition();
- public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException
+ private DtxDemarcationSelectOkBody _dtxDemarcationSelectOkBody;
+
+ protected QpidAMQPDtxDemarcation(int channelId, Phase phase, AMQPStateManager stateManager)
{
- // TODO Auto-generated method stub
- return null;
- }
+ _channelId = channelId;
+ _phase = phase;
+ _stateManager = stateManager;
+ _currentState = AMQPState.DTX_CHANNEL_NOT_SELECTED;
+ _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+ }
+ /**
+ * -------------------------------------------
+ * API Methods
+ * --------------------------------------------
+ */
public DtxDemarcationSelectOkBody select(DtxDemarcationSelectBody dtxDemarcationSelectBody) throws AMQPException
{
- // TODO Auto-generated method stub
- return null;
+ _lock.lock();
+ try
+ {
+ _dtxDemarcationSelectOkBody = null;
+ checkIfValidStateTransition(AMQPState.DTX_CHANNEL_NOT_SELECTED, _currentState, AMQPState.DTX_NOT_STARTED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationSelectOkBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _dtxNotSelected.await();
+ AMQPValidator.throwExceptionOnNull(_dtxDemarcationSelectOkBody, "The broker didn't send the DtxDemarcationSelectOkBody in time");
+ notifyState(AMQPState.CHANNEL_OPENED);
+ _currentState = AMQPState.CHANNEL_OPENED;
+ return _dtxDemarcationSelectOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in dtx.select", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
public DtxDemarcationStartOkBody start(DtxDemarcationStartBody dtxDemarcationStartBody) throws AMQPException
@@ -58,5 +102,25 @@ public class QpidAMQPDtxDemarcation implements AMQPDtxDemarcation
// TODO Auto-generated method stub
return null;
}
+
+ public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /**
+ * -------------------------------------------
+ * AMQPMethodListener methods
+ * --------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ return true;
+ }
+ private void notifyState(AMQPState newState) throws AMQPException
+ {
+ _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE));
+ }
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
index 061ec5a849..18219abc46 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
@@ -57,4 +57,10 @@ public class AMQPState
public static final AMQPState CHANNEL_OPENED = new AMQPState(11, "CHANNEL_OPENED");
public static final AMQPState CHANNEL_CLOSED = new AMQPState(11, "CHANNEL_CLOSED");
public static final AMQPState CHANNEL_SUSPEND = new AMQPState(11, "CHANNEL_SUSPEND");
+
+ // Distributed Transaction state
+ public static final AMQPState DTX_CHANNEL_NOT_SELECTED = new AMQPState(10, "DTX_CHANNEL_NOT_SELECTED");
+ public static final AMQPState DTX_NOT_STARTED = new AMQPState(10, "DTX_NOT_STARTED");
+ public static final AMQPState DTX_STARTED = new AMQPState(10, "DTX_STARTED");
+ public static final AMQPState DTX_END = new AMQPState(10, "DTX_END");
}