summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-04-19 23:08:19 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-04-19 23:08:19 +0000
commit92b10f932cabb33924d2249e1e2270246d56cca0 (patch)
treeb10ee72c32b283d6782a86eff445fc3d63881725
parent327b9217006cef5d9f0d4736ba1f55ea6e13ebe2 (diff)
downloadqpid-python-92b10f932cabb33924d2249e1e2270246d56cca0.tar.gz
added cordination support
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/client_restructure@530586 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/pom.xml5
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java21
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java121
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java118
4 files changed, 242 insertions, 23 deletions
diff --git a/java/client/pom.xml b/java/client/pom.xml
index 854428fb39..6d6bb7cae2 100644
--- a/java/client/pom.xml
+++ b/java/client/pom.xml
@@ -53,6 +53,11 @@
</dependency>
<dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jta_1.1_spec</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java
index 7cef9f1edd..53e11c48fb 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java
@@ -21,32 +21,27 @@
package org.apache.qpid.nclient.amqp;
import org.apache.qpid.framing.DtxCoordinationCommitBody;
-import org.apache.qpid.framing.DtxCoordinationCommitOkBody;
import org.apache.qpid.framing.DtxCoordinationForgetBody;
-import org.apache.qpid.framing.DtxCoordinationForgetOkBody;
import org.apache.qpid.framing.DtxCoordinationGetTimeoutBody;
-import org.apache.qpid.framing.DtxCoordinationGetTimeoutOkBody;
import org.apache.qpid.framing.DtxCoordinationPrepareBody;
-import org.apache.qpid.framing.DtxCoordinationPrepareOkBody;
import org.apache.qpid.framing.DtxCoordinationRecoverBody;
-import org.apache.qpid.framing.DtxCoordinationRecoverOkBody;
import org.apache.qpid.framing.DtxCoordinationRollbackBody;
-import org.apache.qpid.framing.DtxCoordinationRollbackOkBody;
+import org.apache.qpid.framing.DtxCoordinationSetTimeoutBody;
import org.apache.qpid.nclient.core.AMQPException;
public interface AMQPDtxCoordination
{
- public DtxCoordinationCommitOkBody commit(DtxCoordinationCommitBody dtxCoordinationCommitBody) throws AMQPException;
+ public void commit(DtxCoordinationCommitBody dtxCoordinationCommitBody,AMQPCallBack cb) throws AMQPException;
- public DtxCoordinationForgetOkBody forget(DtxCoordinationForgetBody dtxCoordinationForgetBody) throws AMQPException;
+ public void forget(DtxCoordinationForgetBody dtxCoordinationForgetBody,AMQPCallBack cb) throws AMQPException;
- public DtxCoordinationGetTimeoutOkBody getTimeOut(DtxCoordinationGetTimeoutBody dtxCoordinationGetTimeoutBody) throws AMQPException;
+ public void getTimeOut(DtxCoordinationGetTimeoutBody dtxCoordinationGetTimeoutBody,AMQPCallBack cb) throws AMQPException;
- public DtxCoordinationPrepareOkBody prepare(DtxCoordinationPrepareBody dtxCoordinationPrepareBody) throws AMQPException;
+ public void prepare(DtxCoordinationPrepareBody dtxCoordinationPrepareBody,AMQPCallBack cb) throws AMQPException;
- public DtxCoordinationRecoverOkBody recover(DtxCoordinationRecoverBody dtxCoordinationRecoverBody) throws AMQPException;
+ public void recover(DtxCoordinationRecoverBody dtxCoordinationRecoverBody,AMQPCallBack cb) throws AMQPException;
- public DtxCoordinationRollbackOkBody getTimeOut(DtxCoordinationRollbackBody dtxCoordinationRollbackBody) throws AMQPException;
+ public void rollback(DtxCoordinationRollbackBody dtxCoordinationRollbackBody,AMQPCallBack cb) throws AMQPException;
- //public DtxCoordinationSetTimeoutOkBody getTimeOut(DtxCoordinationSetTimeoutBody dtxCoordinationSetTimeoutBody) throws AMQPException;
+ public void setTimeOut(DtxCoordinationSetTimeoutBody dtxCoordinationSetTimeoutBody,AMQPCallBack cb) throws AMQPException;
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java
new file mode 100644
index 0000000000..70ea6bd27d
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.nclient.amqp.qpid;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.DtxCoordinationCommitBody;
+import org.apache.qpid.framing.DtxCoordinationCommitOkBody;
+import org.apache.qpid.framing.DtxCoordinationForgetBody;
+import org.apache.qpid.framing.DtxCoordinationForgetOkBody;
+import org.apache.qpid.framing.DtxCoordinationGetTimeoutBody;
+import org.apache.qpid.framing.DtxCoordinationGetTimeoutOkBody;
+import org.apache.qpid.framing.DtxCoordinationPrepareBody;
+import org.apache.qpid.framing.DtxCoordinationPrepareOkBody;
+import org.apache.qpid.framing.DtxCoordinationRecoverBody;
+import org.apache.qpid.framing.DtxCoordinationRecoverOkBody;
+import org.apache.qpid.framing.DtxCoordinationRollbackBody;
+import org.apache.qpid.framing.DtxCoordinationSetTimeoutBody;
+import org.apache.qpid.nclient.amqp.AMQPCallBack;
+import org.apache.qpid.nclient.amqp.AMQPCallBackSupport;
+import org.apache.qpid.nclient.amqp.AMQPDtxCoordination;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+
+public class QpidAMQPDtxCoordination extends AMQPCallBackSupport implements AMQPMethodListener, AMQPDtxCoordination
+{
+ private Phase _phase;
+
+ protected QpidAMQPDtxCoordination(int channelId,Phase phase)
+ {
+ super(channelId);
+ _phase = phase;
+ }
+
+ public void commit(DtxCoordinationCommitBody dtxCoordinationCommitBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationCommitBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void forget(DtxCoordinationForgetBody dtxCoordinationForgetBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationForgetBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void getTimeOut(DtxCoordinationGetTimeoutBody dtxCoordinationGetTimeoutBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationGetTimeoutBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void rollback(DtxCoordinationRollbackBody dtxCoordinationRollbackBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationRollbackBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void prepare(DtxCoordinationPrepareBody dtxCoordinationPrepareBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationPrepareBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void recover(DtxCoordinationRecoverBody dtxCoordinationRecoverBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationRecoverBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void setTimeOut(DtxCoordinationSetTimeoutBody dtxCoordinationSetTimeoutBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationSetTimeoutBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /**-------------------------------------------
+ * AMQPMethodListener methods
+ *--------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ long localCorrelationId = evt.getLocalCorrelationId();
+ AMQMethodBody methodBody = evt.getMethod();
+ if ( methodBody instanceof DtxCoordinationCommitOkBody ||
+ methodBody instanceof DtxCoordinationForgetOkBody ||
+ methodBody instanceof DtxCoordinationGetTimeoutOkBody ||
+ methodBody instanceof DtxCoordinationPrepareOkBody ||
+ methodBody instanceof DtxCoordinationRecoverOkBody
+ )
+ {
+ invokeCallBack(localCorrelationId,methodBody);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
index eb2cdb4d01..fd7b6f8ec6 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.nclient.amqp.qpid;
import java.util.concurrent.locks.Condition;
@@ -52,10 +72,17 @@ public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMeth
private final Condition _dtxNotSelected = _lock.newCondition();
- private final Condition _channelNotClosed = _lock.newCondition();
+ private final Condition _dtxNotStarted = _lock.newCondition();
+
+ // maybe it needs a better name
+ private final Condition _dtxNotEnd = _lock.newCondition();
private DtxDemarcationSelectOkBody _dtxDemarcationSelectOkBody;
+ private DtxDemarcationStartOkBody _dtxDemarcationStartOkBody;
+
+ private DtxDemarcationEndOkBody _dtxDemarcationEndOkBody;
+
protected QpidAMQPDtxDemarcation(int channelId, Phase phase, AMQPStateManager stateManager)
{
_channelId = channelId;
@@ -77,14 +104,14 @@ public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMeth
{
_dtxDemarcationSelectOkBody = null;
checkIfValidStateTransition(AMQPState.DTX_CHANNEL_NOT_SELECTED, _currentState, AMQPState.DTX_NOT_STARTED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationSelectOkBody, QpidConstants.EMPTY_CORRELATION_ID);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, dtxDemarcationSelectBody, QpidConstants.EMPTY_CORRELATION_ID);
_phase.messageSent(msg);
- //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ //_dtxNotSelected.await(_serverTimeOut, TimeUnit.MILLISECONDS);
_dtxNotSelected.await();
AMQPValidator.throwExceptionOnNull(_dtxDemarcationSelectOkBody, "The broker didn't send the DtxDemarcationSelectOkBody in time");
- notifyState(AMQPState.CHANNEL_OPENED);
- _currentState = AMQPState.CHANNEL_OPENED;
+ notifyState(AMQPState.DTX_NOT_STARTED);
+ _currentState = AMQPState.DTX_NOT_STARTED;
return _dtxDemarcationSelectOkBody;
}
catch (Exception e)
@@ -99,14 +126,56 @@ public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMeth
public DtxDemarcationStartOkBody start(DtxDemarcationStartBody dtxDemarcationStartBody) throws AMQPException
{
- // TODO Auto-generated method stub
- return null;
+ _lock.lock();
+ try
+ {
+ _dtxDemarcationStartOkBody = null;
+ checkIfValidStateTransition(_validStartStates, _currentState, AMQPState.DTX_STARTED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationStartOkBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_dtxNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _dtxNotStarted.await();
+ AMQPValidator.throwExceptionOnNull(_dtxDemarcationStartOkBody, "The broker didn't send the DtxDemarcationStartOkBody in time");
+ notifyState(AMQPState.DTX_STARTED);
+ _currentState = AMQPState.DTX_STARTED;
+ return _dtxDemarcationStartOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in dtx.start", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException
{
- // TODO Auto-generated method stub
- return null;
+ _lock.lock();
+ try
+ {
+ _dtxDemarcationEndOkBody = null;
+ checkIfValidStateTransition(_validEndStates, _currentState, AMQPState.DTX_END);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationEndOkBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_dtxNotEnd.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _dtxNotEnd.await();
+ AMQPValidator.throwExceptionOnNull(_dtxDemarcationEndOkBody, "The broker didn't send the DtxDemarcationEndOkBody in time");
+ notifyState(AMQPState.DTX_END);
+ _currentState = AMQPState.DTX_END;
+ return _dtxDemarcationEndOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in dtx.start", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
/**
@@ -116,7 +185,36 @@ public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMeth
*/
public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
{
- return true;
+ _lock.lock();
+ try
+ {
+ if (evt.getMethod() instanceof DtxDemarcationSelectOkBody)
+ {
+ _dtxDemarcationEndOkBody = (DtxDemarcationEndOkBody) evt.getMethod();
+ _dtxNotSelected.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof DtxDemarcationStartOkBody)
+ {
+ _dtxDemarcationStartOkBody = (DtxDemarcationStartOkBody) evt.getMethod();
+ _dtxNotStarted.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof DtxDemarcationEndOkBody)
+ {
+ _dtxDemarcationEndOkBody = (DtxDemarcationEndOkBody) evt.getMethod();
+ _dtxNotEnd.signal();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
private void notifyState(AMQPState newState) throws AMQPException