diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-04-19 23:08:19 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-04-19 23:08:19 +0000 |
commit | 92b10f932cabb33924d2249e1e2270246d56cca0 (patch) | |
tree | b10ee72c32b283d6782a86eff445fc3d63881725 | |
parent | 327b9217006cef5d9f0d4736ba1f55ea6e13ebe2 (diff) | |
download | qpid-python-92b10f932cabb33924d2249e1e2270246d56cca0.tar.gz |
added cordination support
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/client_restructure@530586 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 242 insertions, 23 deletions
diff --git a/java/client/pom.xml b/java/client/pom.xml index 854428fb39..6d6bb7cae2 100644 --- a/java/client/pom.xml +++ b/java/client/pom.xml @@ -53,6 +53,11 @@ </dependency> <dependency> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jta_1.1_spec</artifactId> + </dependency> + + <dependency> <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> </dependency> diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java index 7cef9f1edd..53e11c48fb 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java @@ -21,32 +21,27 @@ package org.apache.qpid.nclient.amqp; import org.apache.qpid.framing.DtxCoordinationCommitBody; -import org.apache.qpid.framing.DtxCoordinationCommitOkBody; import org.apache.qpid.framing.DtxCoordinationForgetBody; -import org.apache.qpid.framing.DtxCoordinationForgetOkBody; import org.apache.qpid.framing.DtxCoordinationGetTimeoutBody; -import org.apache.qpid.framing.DtxCoordinationGetTimeoutOkBody; import org.apache.qpid.framing.DtxCoordinationPrepareBody; -import org.apache.qpid.framing.DtxCoordinationPrepareOkBody; import org.apache.qpid.framing.DtxCoordinationRecoverBody; -import org.apache.qpid.framing.DtxCoordinationRecoverOkBody; import org.apache.qpid.framing.DtxCoordinationRollbackBody; -import org.apache.qpid.framing.DtxCoordinationRollbackOkBody; +import org.apache.qpid.framing.DtxCoordinationSetTimeoutBody; import org.apache.qpid.nclient.core.AMQPException; public interface AMQPDtxCoordination { - public DtxCoordinationCommitOkBody commit(DtxCoordinationCommitBody dtxCoordinationCommitBody) throws AMQPException; + public void commit(DtxCoordinationCommitBody dtxCoordinationCommitBody,AMQPCallBack cb) throws AMQPException; - public DtxCoordinationForgetOkBody forget(DtxCoordinationForgetBody dtxCoordinationForgetBody) throws AMQPException; + public void forget(DtxCoordinationForgetBody dtxCoordinationForgetBody,AMQPCallBack cb) throws AMQPException; - public DtxCoordinationGetTimeoutOkBody getTimeOut(DtxCoordinationGetTimeoutBody dtxCoordinationGetTimeoutBody) throws AMQPException; + public void getTimeOut(DtxCoordinationGetTimeoutBody dtxCoordinationGetTimeoutBody,AMQPCallBack cb) throws AMQPException; - public DtxCoordinationPrepareOkBody prepare(DtxCoordinationPrepareBody dtxCoordinationPrepareBody) throws AMQPException; + public void prepare(DtxCoordinationPrepareBody dtxCoordinationPrepareBody,AMQPCallBack cb) throws AMQPException; - public DtxCoordinationRecoverOkBody recover(DtxCoordinationRecoverBody dtxCoordinationRecoverBody) throws AMQPException; + public void recover(DtxCoordinationRecoverBody dtxCoordinationRecoverBody,AMQPCallBack cb) throws AMQPException; - public DtxCoordinationRollbackOkBody getTimeOut(DtxCoordinationRollbackBody dtxCoordinationRollbackBody) throws AMQPException; + public void rollback(DtxCoordinationRollbackBody dtxCoordinationRollbackBody,AMQPCallBack cb) throws AMQPException; - //public DtxCoordinationSetTimeoutOkBody getTimeOut(DtxCoordinationSetTimeoutBody dtxCoordinationSetTimeoutBody) throws AMQPException; + public void setTimeOut(DtxCoordinationSetTimeoutBody dtxCoordinationSetTimeoutBody,AMQPCallBack cb) throws AMQPException; } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java new file mode 100644 index 0000000000..70ea6bd27d --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java @@ -0,0 +1,121 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.nclient.amqp.qpid; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.DtxCoordinationCommitBody; +import org.apache.qpid.framing.DtxCoordinationCommitOkBody; +import org.apache.qpid.framing.DtxCoordinationForgetBody; +import org.apache.qpid.framing.DtxCoordinationForgetOkBody; +import org.apache.qpid.framing.DtxCoordinationGetTimeoutBody; +import org.apache.qpid.framing.DtxCoordinationGetTimeoutOkBody; +import org.apache.qpid.framing.DtxCoordinationPrepareBody; +import org.apache.qpid.framing.DtxCoordinationPrepareOkBody; +import org.apache.qpid.framing.DtxCoordinationRecoverBody; +import org.apache.qpid.framing.DtxCoordinationRecoverOkBody; +import org.apache.qpid.framing.DtxCoordinationRollbackBody; +import org.apache.qpid.framing.DtxCoordinationSetTimeoutBody; +import org.apache.qpid.nclient.amqp.AMQPCallBack; +import org.apache.qpid.nclient.amqp.AMQPCallBackSupport; +import org.apache.qpid.nclient.amqp.AMQPDtxCoordination; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; + +public class QpidAMQPDtxCoordination extends AMQPCallBackSupport implements AMQPMethodListener, AMQPDtxCoordination +{ + private Phase _phase; + + protected QpidAMQPDtxCoordination(int channelId,Phase phase) + { + super(channelId); + _phase = phase; + } + + public void commit(DtxCoordinationCommitBody dtxCoordinationCommitBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationCommitBody,cb); + _phase.messageSent(msg); + } + + public void forget(DtxCoordinationForgetBody dtxCoordinationForgetBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationForgetBody,cb); + _phase.messageSent(msg); + } + + public void getTimeOut(DtxCoordinationGetTimeoutBody dtxCoordinationGetTimeoutBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationGetTimeoutBody,cb); + _phase.messageSent(msg); + } + + public void rollback(DtxCoordinationRollbackBody dtxCoordinationRollbackBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationRollbackBody,cb); + _phase.messageSent(msg); + } + + public void prepare(DtxCoordinationPrepareBody dtxCoordinationPrepareBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationPrepareBody,cb); + _phase.messageSent(msg); + } + + public void recover(DtxCoordinationRecoverBody dtxCoordinationRecoverBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationRecoverBody,cb); + _phase.messageSent(msg); + } + + public void setTimeOut(DtxCoordinationSetTimeoutBody dtxCoordinationSetTimeoutBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationSetTimeoutBody,cb); + _phase.messageSent(msg); + } + + /**------------------------------------------- + * AMQPMethodListener methods + *-------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + long localCorrelationId = evt.getLocalCorrelationId(); + AMQMethodBody methodBody = evt.getMethod(); + if ( methodBody instanceof DtxCoordinationCommitOkBody || + methodBody instanceof DtxCoordinationForgetOkBody || + methodBody instanceof DtxCoordinationGetTimeoutOkBody || + methodBody instanceof DtxCoordinationPrepareOkBody || + methodBody instanceof DtxCoordinationRecoverOkBody + ) + { + invokeCallBack(localCorrelationId,methodBody); + return true; + } + else + { + return false; + } + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java index eb2cdb4d01..fd7b6f8ec6 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.nclient.amqp.qpid; import java.util.concurrent.locks.Condition; @@ -52,10 +72,17 @@ public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMeth private final Condition _dtxNotSelected = _lock.newCondition(); - private final Condition _channelNotClosed = _lock.newCondition(); + private final Condition _dtxNotStarted = _lock.newCondition(); + + // maybe it needs a better name + private final Condition _dtxNotEnd = _lock.newCondition(); private DtxDemarcationSelectOkBody _dtxDemarcationSelectOkBody; + private DtxDemarcationStartOkBody _dtxDemarcationStartOkBody; + + private DtxDemarcationEndOkBody _dtxDemarcationEndOkBody; + protected QpidAMQPDtxDemarcation(int channelId, Phase phase, AMQPStateManager stateManager) { _channelId = channelId; @@ -77,14 +104,14 @@ public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMeth { _dtxDemarcationSelectOkBody = null; checkIfValidStateTransition(AMQPState.DTX_CHANNEL_NOT_SELECTED, _currentState, AMQPState.DTX_NOT_STARTED); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationSelectOkBody, QpidConstants.EMPTY_CORRELATION_ID); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, dtxDemarcationSelectBody, QpidConstants.EMPTY_CORRELATION_ID); _phase.messageSent(msg); - //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS); + //_dtxNotSelected.await(_serverTimeOut, TimeUnit.MILLISECONDS); _dtxNotSelected.await(); AMQPValidator.throwExceptionOnNull(_dtxDemarcationSelectOkBody, "The broker didn't send the DtxDemarcationSelectOkBody in time"); - notifyState(AMQPState.CHANNEL_OPENED); - _currentState = AMQPState.CHANNEL_OPENED; + notifyState(AMQPState.DTX_NOT_STARTED); + _currentState = AMQPState.DTX_NOT_STARTED; return _dtxDemarcationSelectOkBody; } catch (Exception e) @@ -99,14 +126,56 @@ public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMeth public DtxDemarcationStartOkBody start(DtxDemarcationStartBody dtxDemarcationStartBody) throws AMQPException { - // TODO Auto-generated method stub - return null; + _lock.lock(); + try + { + _dtxDemarcationStartOkBody = null; + checkIfValidStateTransition(_validStartStates, _currentState, AMQPState.DTX_STARTED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationStartOkBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_dtxNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _dtxNotStarted.await(); + AMQPValidator.throwExceptionOnNull(_dtxDemarcationStartOkBody, "The broker didn't send the DtxDemarcationStartOkBody in time"); + notifyState(AMQPState.DTX_STARTED); + _currentState = AMQPState.DTX_STARTED; + return _dtxDemarcationStartOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in dtx.start", e); + } + finally + { + _lock.unlock(); + } } public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException { - // TODO Auto-generated method stub - return null; + _lock.lock(); + try + { + _dtxDemarcationEndOkBody = null; + checkIfValidStateTransition(_validEndStates, _currentState, AMQPState.DTX_END); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationEndOkBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_dtxNotEnd.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _dtxNotEnd.await(); + AMQPValidator.throwExceptionOnNull(_dtxDemarcationEndOkBody, "The broker didn't send the DtxDemarcationEndOkBody in time"); + notifyState(AMQPState.DTX_END); + _currentState = AMQPState.DTX_END; + return _dtxDemarcationEndOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in dtx.start", e); + } + finally + { + _lock.unlock(); + } } /** @@ -116,7 +185,36 @@ public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMeth */ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException { - return true; + _lock.lock(); + try + { + if (evt.getMethod() instanceof DtxDemarcationSelectOkBody) + { + _dtxDemarcationEndOkBody = (DtxDemarcationEndOkBody) evt.getMethod(); + _dtxNotSelected.signal(); + return true; + } + else if (evt.getMethod() instanceof DtxDemarcationStartOkBody) + { + _dtxDemarcationStartOkBody = (DtxDemarcationStartOkBody) evt.getMethod(); + _dtxNotStarted.signal(); + return true; + } + else if (evt.getMethod() instanceof DtxDemarcationEndOkBody) + { + _dtxDemarcationEndOkBody = (DtxDemarcationEndOkBody) evt.getMethod(); + _dtxNotEnd.signal(); + return true; + } + else + { + return false; + } + } + finally + { + _lock.unlock(); + } } private void notifyState(AMQPState newState) throws AMQPException |