diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-03-30 16:54:25 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-03-30 16:54:25 +0000 |
commit | e97c9492b410c791f13d29e8cfec2103df164e3d (patch) | |
tree | 23c06cc5c55442d7561a5560825c21655daa0aaf | |
parent | f50a093a9423f12b69d82996ec432b9198f90f27 (diff) | |
download | qpid-python-e97c9492b410c791f13d29e8cfec2103df164e3d.tar.gz |
extracted interfaces for protocol level classes
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/client_restructure@524171 13f79535-47bb-0310-9956-ffa450edef68
15 files changed, 1668 insertions, 1316 deletions
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java index e480f38b93..3c92af9619 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java @@ -1,33 +1,5 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - package org.apache.qpid.nclient.amqp; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.log4j.Logger; -import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.framing.ChannelFlowBody; @@ -36,334 +8,29 @@ import org.apache.qpid.framing.ChannelOkBody; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; import org.apache.qpid.framing.ChannelResumeBody; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; -import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; -import org.apache.qpid.nclient.amqp.state.AMQPState; -import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; -import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; -import org.apache.qpid.nclient.amqp.state.AMQPStateManager; -import org.apache.qpid.nclient.amqp.state.AMQPStateType; -import org.apache.qpid.nclient.config.ClientConfiguration; import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.Phase; -import org.apache.qpid.nclient.core.QpidConstants; -import org.apache.qpid.nclient.util.AMQPValidator; - -/** - * This represents the Channel class defined in the AMQP protocol. This class is a finite state machine and is thread - * safe by design. Only valid state changes are allowed or else an IllegalStateTransitionException will be thrown. Only - * one thread can enter the methods that change state, at a given time. The AMQP protocol recommends one thread per - * channel by design. - * - * A JMS Session can wrap an instance of this class. - */ -public class AMQPChannel extends AMQPStateMachine implements AMQPMethodListener +public interface AMQPChannel { - private static final Logger _logger = Logger.getLogger(AMQPChannel.class); - - // the channelId assigned for this channel - private int _channelId; - - private Phase _phase; - - private AMQPState _currentState; - - private AMQPStateManager _stateManager; - - private final AMQPState[] _validCloseStates = new AMQPState[] - { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND }; - - private final AMQPState[] _validResumeStates = new AMQPState[] - { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED }; - - // The wait period until a server sends a respond - private long _serverTimeOut = 1000; - - private final Lock _lock = new ReentrantLock(); - - private final Condition _channelNotOpend = _lock.newCondition(); - - private final Condition _channelNotClosed = _lock.newCondition(); - - private final Condition _channelFlowNotResponded = _lock.newCondition(); - - private final Condition _channelNotResumed = _lock.newCondition(); - - private ChannelOpenOkBody _channelOpenOkBody; - - private ChannelCloseOkBody _channelCloseOkBody; - - private ChannelFlowOkBody _channelFlowOkBody; - - private ChannelOkBody _channelOkBody; - - private ChannelCloseBody _channelCloseBody; - - protected AMQPChannel(int channelId, Phase phase, AMQPStateManager stateManager) - { - _channelId = channelId; - _phase = phase; - _stateManager = stateManager; - _currentState = AMQPState.CHANNEL_NOT_OPENED; - _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); - } - - /** - * ------------------------------------------- - * API Methods - * -------------------------------------------- - */ /** * Opens the channel */ - public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException - { - _lock.lock(); - try - { - _channelOpenOkBody = null; - checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - - //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _channelNotOpend.await(); - checkIfConnectionClosed(); - AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time"); - notifyState(AMQPState.CHANNEL_OPENED); - _currentState = AMQPState.CHANNEL_OPENED; - return _channelOpenOkBody; - } - catch (Exception e) - { - throw new AMQPException("Error in channel.open", e); - } - finally - { - _lock.unlock(); - } - } + public abstract ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException; /** * Close the channel */ - public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException - { - _lock.lock(); - try - { - _channelCloseOkBody = null; - checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - - //_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _channelNotClosed.await(); - AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time"); - notifyState(AMQPState.CHANNEL_CLOSED); - _currentState = AMQPState.CHANNEL_CLOSED; - return _channelCloseOkBody; - } - catch (Exception e) - { - throw new AMQPException("Error in channel.close", e); - } - finally - { - _lock.unlock(); - } - } + public abstract ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException; /** * Channel Flow */ - public ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException - { - _lock.lock(); - try - { - _channelFlowOkBody = null; - if (channelFlowBody.active) - { - checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND, _currentState, AMQPState.CHANNEL_OPENED); - } - else - { - checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND); - } - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - - //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _channelFlowNotResponded.await(); - checkIfConnectionClosed(); - AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time"); - handleChannelFlowState(_channelFlowOkBody.active); - return _channelFlowOkBody; - } - catch (Exception e) - { - throw new AMQPException("Error in channel.flow", e); - } - finally - { - _lock.unlock(); - } - } + public abstract ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException; /** * Close the channel */ - public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException - { - _lock.lock(); - try - { - _channelOkBody = null; - checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - - //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _channelNotResumed.await(); - checkIfConnectionClosed(); - AMQPValidator.throwExceptionOnNull(_channelOkBody, - "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time"); - notifyState(AMQPState.CHANNEL_OPENED); - _currentState = AMQPState.CHANNEL_OPENED; - return _channelOkBody; - } - catch (Exception e) - { - throw new AMQPException("Error in channel.resume", e); - } - finally - { - _lock.unlock(); - } - } - - /** - * ------------------------------------------- - * AMQPMethodListener methods - * -------------------------------------------- - */ - public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException - { - _lock.lock(); - try - { - if (evt.getMethod() instanceof ChannelOpenOkBody) - { - _channelOpenOkBody = (ChannelOpenOkBody) evt.getMethod(); - _channelNotOpend.signal(); - return true; - } - else if (evt.getMethod() instanceof ChannelCloseOkBody) - { - _channelCloseOkBody = (ChannelCloseOkBody) evt.getMethod(); - _channelNotClosed.signal(); - return true; - } - else if (evt.getMethod() instanceof ChannelCloseBody) - { - _channelCloseBody = (ChannelCloseBody) evt.getMethod(); - // release the correct lock as u may have some conditions waiting. - // while an error occured and the broker has sent a close. - releaseLocks(); - handleChannelClose(_channelCloseBody); - return true; - } - else if (evt.getMethod() instanceof ChannelFlowOkBody) - { - _channelFlowOkBody = (ChannelFlowOkBody) evt.getMethod(); - _channelFlowNotResponded.signal(); - return true; - } - else if (evt.getMethod() instanceof ChannelFlowBody) - { - handleChannelFlow((ChannelFlowBody) evt.getMethod()); - return true; - } - else if (evt.getMethod() instanceof ChannelOkBody) - { - _channelOkBody = (ChannelOkBody) evt.getMethod(); - // In this case the only method expecting channel-ok is channel-resume - // haven't implemented ping and pong. - _channelNotResumed.signal(); - return true; - } - else - { - return false; - } - } - finally - { - _lock.unlock(); - } - } - - private void handleChannelClose(ChannelCloseBody channelCloseBody) throws AMQPException - { - notifyState(AMQPState.CHANNEL_CLOSED); - _currentState = AMQPState.CHANNEL_CLOSED; - // handle channel related cleanup - } - - private void releaseLocks() - { - if (_currentState == AMQPState.CHANNEL_NOT_OPENED) - { - _channelNotOpend.signal(); - _channelNotResumed.signal(); // It could be a channel.resume call - } - else if (_currentState == AMQPState.CHANNEL_OPENED || _currentState == AMQPState.CHANNEL_SUSPEND) - { - _channelFlowNotResponded.signal(); - } - else if (_currentState == AMQPState.CHANNEL_CLOSED) - { - _channelNotResumed.signal(); - } - } - - private void checkIfConnectionClosed() throws AMQPException - { - if (_channelCloseBody != null) - { - String error = "Broker has closed channel due to : " + _channelCloseBody.getReplyText() + " with reply code (" - + _channelCloseBody.getReplyCode() + ") " + "caused by class " + _channelCloseBody.getClassId() + " and method " - + _channelCloseBody.getMethod(); - - throw new AMQPException(error); - } - } - - private void handleChannelFlow(ChannelFlowBody channelFlowBody)throws AMQPException - { - _lock.lock(); - try - { - handleChannelFlowState(channelFlowBody.active); - } - finally - { - _lock.unlock(); - } - } + public abstract ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException; - private void handleChannelFlowState(boolean flow)throws AMQPException - { - notifyState((flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND); - _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND; - } - - private void notifyState(AMQPState newState) throws AMQPException - { - _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE)); - } -} +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java index f90a5231a2..4976daa4fa 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java @@ -20,216 +20,39 @@ */ package org.apache.qpid.nclient.amqp; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ChannelFlowBody; -import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.framing.ChannelOkBody; -import org.apache.qpid.framing.ChannelOpenOkBody; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionCloseOkBody; -import org.apache.qpid.framing.ConnectionOpenOkBody; -import org.apache.qpid.framing.ConnectionSecureBody; -import org.apache.qpid.framing.ConnectionStartBody; -import org.apache.qpid.framing.ConnectionTuneBody; -import org.apache.qpid.framing.ExchangeDeclareOkBody; -import org.apache.qpid.framing.ExchangeDeleteOkBody; -import org.apache.qpid.framing.MessageAppendBody; -import org.apache.qpid.framing.MessageCancelBody; -import org.apache.qpid.framing.MessageCheckpointBody; -import org.apache.qpid.framing.MessageCloseBody; -import org.apache.qpid.framing.MessageGetBody; -import org.apache.qpid.framing.MessageOffsetBody; -import org.apache.qpid.framing.MessageOkBody; -import org.apache.qpid.framing.MessageOpenBody; -import org.apache.qpid.framing.MessageQosBody; -import org.apache.qpid.framing.MessageRecoverBody; -import org.apache.qpid.framing.MessageRejectBody; -import org.apache.qpid.framing.MessageResumeBody; -import org.apache.qpid.framing.MessageTransferBody; -import org.apache.qpid.framing.QueueBindOkBody; -import org.apache.qpid.framing.QueueDeclareOkBody; -import org.apache.qpid.framing.QueueDeleteOkBody; -import org.apache.qpid.framing.QueuePurgeOkBody; -import org.apache.qpid.framing.QueueUnbindOkBody; import org.apache.qpid.nclient.amqp.event.AMQPEventManager; +import org.apache.qpid.nclient.amqp.qpid.QpidAMQPChannel; +import org.apache.qpid.nclient.amqp.qpid.QpidAMQPExchange; +import org.apache.qpid.nclient.amqp.qpid.QpidAMQPMessage; +import org.apache.qpid.nclient.amqp.qpid.QpidAMQPQueue; import org.apache.qpid.nclient.amqp.state.AMQPStateManager; import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.DefaultPhaseContext; -import org.apache.qpid.nclient.core.Phase; -import org.apache.qpid.nclient.core.PhaseContext; -import org.apache.qpid.nclient.core.QpidConstants; -import org.apache.qpid.nclient.transport.AMQPConnectionURL; import org.apache.qpid.nclient.transport.ConnectionURL; -import org.apache.qpid.nclient.transport.TransportConnection; -import org.apache.qpid.nclient.transport.TransportConnectionFactory; import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType; import org.apache.qpid.url.URLSyntaxException; -/** - * The Class Factory creates AMQP Class - * equivalents defined in the spec. - * - * There should one instance per connection. - * The factory class creates all the support - * classes and provides an instance of the - * AMQP class in ready-to-use state. - * - */ -public class AMQPClassFactory +public interface AMQPClassFactory { - //Need an event manager per connection - private AMQPEventManager _eventManager = new QpidEventManager(); - - // Need a state manager per connection - private AMQPStateManager _stateManager = new QpidStateManager(); - - //Need a phase pipe per connection - private Phase _phase; - - //One instance per connection - private AMQPConnection _amqpConnection; - - public AMQPClassFactory() - { - } + public abstract AMQPConnection createConnectionClass(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException; - public AMQPConnection createConnection(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException - { - AMQPConnectionURL url = new AMQPConnectionURL(urlStr); - return createConnectionClass(url, type); - } + public abstract AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException; - public AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException - { - if (_amqpConnection == null) - { - PhaseContext ctx = new DefaultPhaseContext(); - ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager); + public abstract AMQPChannel createChannelClass(int channel) throws AMQPException; - TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type, ctx); - _amqpConnection = new AMQPConnection(conn, _stateManager); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class, _amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class, _amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class, _amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class, _amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class, _amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class, _amqpConnection); - } - return _amqpConnection; - } + public abstract void destroyChannelClass(int channel, QpidAMQPChannel amqpChannel) throws AMQPException; - public AMQPChannel createChannelClass(int channel) throws AMQPException - { - checkIfConnectionStarted(); - AMQPChannel amqpChannel = new AMQPChannel(channel, _phase,_stateManager); - _eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel); - _eventManager.addMethodEventListener(channel, ChannelCloseBody.class, amqpChannel); - _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel); - _eventManager.addMethodEventListener(channel, ChannelFlowBody.class, amqpChannel); - _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel); - _eventManager.addMethodEventListener(channel, ChannelOkBody.class, amqpChannel); - return amqpChannel; - } + public abstract AMQPExchange createExchangeClass(int channel) throws AMQPException; - public void destroyChannelClass(int channel, AMQPChannel amqpChannel) throws AMQPException - { - _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class, amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class, amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelOkBody.class, amqpChannel); - } + public abstract void destoryExchangeClass(int channel, QpidAMQPExchange amqpExchange) throws AMQPException; - public AMQPExchange createExchangeClass(int channel) throws AMQPException - { - checkIfConnectionStarted(); - AMQPExchange amqpExchange = new AMQPExchange(channel, _phase); - _eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange); - _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange); - return amqpExchange; - } + public abstract AMQPQueue createQueueClass(int channel) throws AMQPException; - public void destoryExchangeClass(int channel, AMQPExchange amqpExchange) throws AMQPException - { - _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange); - _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange); - } + public abstract void destroyQueueClass(int channel, QpidAMQPQueue amqpQueue) throws AMQPException; - public AMQPQueue createQueueClass(int channel) throws AMQPException - { - checkIfConnectionStarted(); - AMQPQueue amqpQueue = new AMQPQueue(channel, _phase); - _eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue); - _eventManager.addMethodEventListener(channel, QueueBindOkBody.class, amqpQueue); - _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue); - _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue); - _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue); - return amqpQueue; - } + public abstract AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException; - public void destroyQueueClass(int channel, AMQPQueue amqpQueue) throws AMQPException - { - _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue); - _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class, amqpQueue); - _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue); - _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue); - _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue); - } - - public AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException - { - checkIfConnectionStarted(); - AMQPMessage amqpMessage = new AMQPMessage(channel, _phase, messageCb); - _eventManager.addMethodEventListener(channel, MessageAppendBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageCancelBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageCloseBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageGetBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageOffsetBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageOkBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageOpenBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageRecoverBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageRejectBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageResumeBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageQosBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageTransferBody.class, amqpMessage); - - return amqpMessage; - } - - public void destoryMessageClass(int channel, AMQPMessage amqpMessage) throws AMQPException - { - _eventManager.removeMethodEventListener(channel, MessageAppendBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageCancelBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageCloseBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageGetBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageOkBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageOpenBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageRejectBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageResumeBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageQosBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageTransferBody.class, amqpMessage); - } - - //This class should register as a state listener for AMQPConnection - private void checkIfConnectionStarted() throws AMQPException - { - if (_phase == null) - { - _phase = _amqpConnection.getPhasePipe(); - - if (_phase == null) - { - throw new AMQPException("Cannot create a channel until connection is ready"); - } - } - } + public abstract void destoryMessageClass(int channel, QpidAMQPMessage amqpMessage) throws AMQPException; /** * Extention point @@ -237,10 +60,7 @@ public class AMQPClassFactory * and add listeners to get notified of events * */ - public AMQPEventManager getEventManager() - { - return _eventManager; - } + public abstract AMQPEventManager getEventManager(); /** * Extention point @@ -248,8 +68,6 @@ public class AMQPClassFactory * and add listeners to get notified of state changes * */ - public AMQPStateManager getStateManager() - { - return _stateManager; - } -} + public abstract AMQPStateManager getStateManager(); + +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java index 1d6541ec3e..99eb690ede 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java @@ -1,440 +1,40 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ package org.apache.qpid.nclient.amqp; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.log4j.Logger; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionCloseOkBody; import org.apache.qpid.framing.ConnectionOpenBody; import org.apache.qpid.framing.ConnectionOpenOkBody; -import org.apache.qpid.framing.ConnectionSecureBody; import org.apache.qpid.framing.ConnectionSecureOkBody; import org.apache.qpid.framing.ConnectionStartBody; import org.apache.qpid.framing.ConnectionStartOkBody; -import org.apache.qpid.framing.ConnectionTuneBody; import org.apache.qpid.framing.ConnectionTuneOkBody; -import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; -import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; -import org.apache.qpid.nclient.amqp.state.AMQPState; -import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; -import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; -import org.apache.qpid.nclient.amqp.state.AMQPStateManager; -import org.apache.qpid.nclient.amqp.state.AMQPStateType; -import org.apache.qpid.nclient.config.ClientConfiguration; import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.Phase; -import org.apache.qpid.nclient.core.QpidConstants; -import org.apache.qpid.nclient.transport.TransportConnection; -import org.apache.qpid.nclient.util.AMQPValidator; -/** - * This maps directly to the Connection class defined in the AMQP protocol This class is a finite state machine and is - * thread safe by design A particular method (state changing) can only be invoked once and only in sequence or else an - * IllegalStateTransitionException will be thrown Also only one thread can enter those methods at a given time. - */ -public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListener +public interface AMQPConnection { - private static final Logger _logger = Logger.getLogger(AMQPConnection.class); - - private Phase _phase; - - private TransportConnection _connection; - - private long _correlationId; - - private AMQPState _currentState; - - private AMQPStateManager _stateManager; - - private final AMQPState[] _validCloseStates = new AMQPState[] - { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, - AMQPState.CONNECTION_OPEN, }; - - // The wait period until a server sends a respond - private long _serverTimeOut = 1000; - - private final Lock _lock = new ReentrantLock(); - - private final Condition _connectionNotStarted = _lock.newCondition(); - - private final Condition _connectionNotSecure = _lock.newCondition(); - - private final Condition _connectionNotTuned = _lock.newCondition(); - - private final Condition _connectionNotOpened = _lock.newCondition(); - - private final Condition _connectionNotClosed = _lock.newCondition(); - - private ConnectionStartBody _connectionStartBody; - - private ConnectionSecureBody _connectionSecureBody; - - private ConnectionTuneBody _connectionTuneBody; - - private ConnectionOpenOkBody _connectionOpenOkBody; - - private ConnectionCloseOkBody _connectionCloseOkBody; - - private ConnectionCloseBody _connectionCloseBody; - - protected AMQPConnection(TransportConnection connection, AMQPStateManager stateManager) - { - _connection = connection; - _stateManager = stateManager; - _currentState = AMQPState.CONNECTION_UNDEFINED; - _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); - } - - /** - * ------------------------------------------- - * API Methods - * -------------------------------------------- - */ /** * Opens the TCP connection and let the formalities begin. */ - public ConnectionStartBody openTCPConnection() throws AMQPException - { - _lock.lock(); - // open the TCP connection - try - { - _connectionStartBody = null; - checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED); - _phase = _connection.connect(); - - // waiting for ConnectionStartBody or error in connection - //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _connectionNotStarted.await(); - - checkIfConnectionClosed(); - AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time"); - notifyState(AMQPState.CONNECTION_NOT_STARTED); - _currentState = AMQPState.CONNECTION_NOT_STARTED; - return _connectionStartBody; - } - catch (InterruptedException e) - { - throw new AMQPException("Error opening connection to broker", e); - } - finally - { - _lock.unlock(); - } - } + public abstract ConnectionStartBody openTCPConnection() throws AMQPException; /** * The current java broker implementation can send a connection tune body * as a response to the startOk. Not sure if that is the correct behaviour. */ - public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException - { - _lock.lock(); - try - { - _connectionSecureBody = null; - checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE); - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId); - _phase.messageSent(msg); - // _connectionNotSecure.await(_serverTimeOut,TimeUnit.MILLISECONDS); - _connectionNotSecure.await(); - - checkIfConnectionClosed(); - if (_connectionTuneBody != null) - { - notifyState(AMQPState.CONNECTION_NOT_TUNED); - _currentState = AMQPState.CONNECTION_NOT_TUNED; - return _connectionTuneBody; - } - else if (_connectionSecureBody != null) - { // oops the server sent another challenge - notifyState(AMQPState.CONNECTION_NOT_SECURE); - _currentState = AMQPState.CONNECTION_NOT_SECURE; - return _connectionSecureBody; - } - else - { - throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); - } - } - catch (InterruptedException e) - { - throw new AMQPException("Error in connection.startOk", e); - } - finally - { - _lock.unlock(); - } - } + public abstract AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException; /** * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could * issue a new challenge */ - public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException - { - _lock.lock(); - try - { - _connectionTuneBody = null; - _connectionSecureBody = null; - checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED); - - _connectionSecureBody = null; // The server could send a fresh challenge - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId); - _phase.messageSent(msg); - - //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _connectionNotTuned.await(); - checkIfConnectionClosed(); - - if (_connectionTuneBody != null) - { - notifyState(AMQPState.CONNECTION_NOT_TUNED); - _currentState = AMQPState.CONNECTION_NOT_TUNED; - return _connectionTuneBody; - } - else if (_connectionSecureBody != null) - { // oops the server sent another challenge - notifyState(AMQPState.CONNECTION_NOT_SECURE); - _currentState = AMQPState.CONNECTION_NOT_SECURE; - return _connectionSecureBody; - } - else - { - throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); - } - } - catch (InterruptedException e) - { - throw new AMQPException("Error in connection.secureOk", e); - } - finally - { - _lock.unlock(); - } - } - - public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException - { - _lock.lock(); - try - { - checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED); - _connectionSecureBody = null; - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId); - _phase.messageSent(msg); - notifyState(AMQPState.CONNECTION_NOT_OPENED); - _currentState = AMQPState.CONNECTION_NOT_OPENED; - } - finally - { - _lock.unlock(); - } - } - - public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException - { - _lock.lock(); - try - { - // If the broker sends a connection close due to an error with the - // Connection tune ok, then this call will verify that - checkIfConnectionClosed(); - - _connectionOpenOkBody = null; - checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN); - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - - //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _connectionNotOpened.await(); - - checkIfConnectionClosed(); - AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time"); - notifyState(AMQPState.CONNECTION_OPEN); - _currentState = AMQPState.CONNECTION_OPEN; - return _connectionOpenOkBody; - } - catch (InterruptedException e) - { - throw new AMQPException("Error in connection.open", e); - } - finally - { - _lock.unlock(); - } - } - - public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException - { - _lock.lock(); - try - { - _connectionCloseOkBody = null; - checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED); - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); - AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time"); - notifyState(AMQPState.CONNECTION_CLOSED); - _currentState = AMQPState.CONNECTION_CLOSED; - return _connectionCloseOkBody; - } - catch (InterruptedException e) - { - throw new AMQPException("Error in connection.close", e); - } - finally - { - _lock.unlock(); - } - } - - /** - * ------------------------------------------- - * AMQMethodListener methods - * -------------------------------------------- - */ - public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException - { - _lock.lock(); - try - { - _correlationId = evt.getCorrelationId(); - - if (evt.getMethod() instanceof ConnectionStartBody) - { - _connectionStartBody = (ConnectionStartBody) evt.getMethod(); - _connectionNotStarted.signalAll(); - return true; - } - else if (evt.getMethod() instanceof ConnectionSecureBody) - { - _connectionSecureBody = (ConnectionSecureBody) evt.getMethod(); - _connectionNotSecure.signal(); - _connectionNotTuned.signal(); // in case the server has sent another chanllenge - return true; - } - else if (evt.getMethod() instanceof ConnectionTuneBody) - { - _connectionTuneBody = (ConnectionTuneBody) evt.getMethod(); - _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk - _connectionNotTuned.signal(); - return true; - } - else if (evt.getMethod() instanceof ConnectionOpenOkBody) - { - _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod(); - _connectionNotOpened.signal(); - return true; - } - else if (evt.getMethod() instanceof ConnectionCloseOkBody) - { - _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod(); - _connectionNotClosed.signal(); - return true; - } - else if (evt.getMethod() instanceof ConnectionCloseBody) - { - _connectionCloseBody = (ConnectionCloseBody) evt.getMethod(); - // release the correct lock as u may have some conditions waiting. - // while an error occured and the broker has sent a close. - releaseLocks(); - handleClose(); - return true; - } - else - { - return false; - } - } - finally - { - _lock.unlock(); - } - } - - - public Phase getPhasePipe() - { - return _phase; - } - - private void handleClose() throws AMQPException - { - try - { - notifyState(AMQPState.CONNECTION_CLOSING); - _currentState = AMQPState.CONNECTION_CLOSING; - // do the required cleanup and send a ConnectionCloseOkBody - } - catch (Exception e) - { - throw new AMQPException("Error handling connection.close from broker", e); - } - } - - private void checkIfConnectionClosed() throws AMQPException - { - if (_connectionCloseBody != null) - { - String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() + " with reply code (" - + _connectionCloseBody.getReplyCode() + ") " + "caused by class " + _connectionCloseBody.getClassId() + " and method " - + _connectionCloseBody.getMethod(); + public abstract AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException; - throw new AMQPException(error); - } - } + public abstract void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException; - private void releaseLocks() - { - if (_currentState == AMQPState.CONNECTION_NOT_OPENED) - { - _connectionNotOpened.signal(); - } - else if (_currentState == AMQPState.CONNECTION_UNDEFINED) - { - _connectionNotStarted.signal(); - } - else if (_currentState == AMQPState.CONNECTION_NOT_STARTED) - { - _connectionNotSecure.signal(); - } - else if (_currentState == AMQPState.CONNECTION_NOT_SECURE) - { - _connectionNotTuned.signal(); - } - } + public abstract ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException; - private void notifyState(AMQPState newState) throws AMQPException - { - _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CONNECTION_STATE)); - } + public abstract ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException; }
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java index 35db9c6a75..f59ad5acf5 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java @@ -1,88 +1,19 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - package org.apache.qpid.nclient.amqp; -import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.ExchangeDeclareBody; -import org.apache.qpid.framing.ExchangeDeclareOkBody; import org.apache.qpid.framing.ExchangeDeleteBody; -import org.apache.qpid.framing.ExchangeDeleteOkBody; -import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; -import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.Phase; -/** - * - * This class represents the Exchange class defined in AMQP. - * Each method takes an @see AMQPCallBack object if it wants to know - * the response from the broker to particular method. - * Clients can handle the reponse asynchronously or block for a response - * using AMQPCallBack.isComplete() periodically using a loop. - */ -public class AMQPExchange extends AMQPCallBackSupport implements AMQPMethodListener +public interface AMQPExchange { - private Phase _phase; - - protected AMQPExchange(int channelId,Phase phase) - { - super(channelId); - _phase = phase; - } - + /** * ----------------------------------------------- * API Methods * ----------------------------------------------- */ - public void declare(ExchangeDeclareBody exchangeDeclareBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleNoWait(exchangeDeclareBody.nowait,exchangeDeclareBody,cb); - _phase.messageSent(msg); - } - - public void delete(ExchangeDeleteBody exchangeDeleteBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleNoWait(exchangeDeleteBody.nowait,exchangeDeleteBody,cb); - _phase.messageSent(msg); - } - - - /**------------------------------------------- - * AMQPMethodListener methods - *-------------------------------------------- - */ - public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException - { - long localCorrelationId = evt.getLocalCorrelationId(); - AMQMethodBody methodBody = evt.getMethod(); - if ( methodBody instanceof ExchangeDeclareOkBody || methodBody instanceof ExchangeDeleteOkBody) - { - invokeCallBack(localCorrelationId,methodBody); - return true; - } - else - { - return false; - } - } -} + public abstract void declare(ExchangeDeclareBody exchangeDeclareBody, AMQPCallBack cb) throws AMQPException; + + public abstract void delete(ExchangeDeleteBody exchangeDeleteBody, AMQPCallBack cb) throws AMQPException; + +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java index 1b86108411..6f21ca2507 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java @@ -1,32 +1,10 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ package org.apache.qpid.nclient.amqp; -import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.MessageAppendBody; import org.apache.qpid.framing.MessageCancelBody; import org.apache.qpid.framing.MessageCheckpointBody; import org.apache.qpid.framing.MessageCloseBody; import org.apache.qpid.framing.MessageConsumeBody; -import org.apache.qpid.framing.MessageEmptyBody; import org.apache.qpid.framing.MessageGetBody; import org.apache.qpid.framing.MessageOffsetBody; import org.apache.qpid.framing.MessageOkBody; @@ -36,198 +14,61 @@ import org.apache.qpid.framing.MessageRecoverBody; import org.apache.qpid.framing.MessageRejectBody; import org.apache.qpid.framing.MessageResumeBody; import org.apache.qpid.framing.MessageTransferBody; -import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; -import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.Phase; - -/** - * This class represents the AMQP Message class. - * You need an instance of this class per channel. - * A @see AMQPMessageCallBack class is taken as an argument in the constructor. - * A client can use this class to issue Message class methods on the broker. - * When the broker issues Message class methods on the client, the client is notified - * via the AMQPMessageCallBack interface. - * - * A JMS Message producer implementation can wrap an instance if this and map - * JMS method calls to the appropriate AMQP methods. - * - * AMQPMessageCallBack can be implemented by the JMS MessageConsumer implementation. - * - */ -public class AMQPMessage extends AMQPCallBackSupport implements AMQPMethodListener + +public interface AMQPMessage { - private Phase _phase; - private AMQPMessageCallBack _messageCb; - - protected AMQPMessage(int channelId,Phase phase,AMQPMessageCallBack messageCb) - { - super(channelId); - _phase = phase; - _messageCb = messageCb; - } - + /** * ----------------------------------------------- * API Methods * ----------------------------------------------- */ - - public void transfer(MessageTransferBody messageTransferBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageTransferBody,cb); - _phase.messageSent(msg); - } - - public void consume(MessageConsumeBody messageConsumeBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageConsumeBody,cb); - _phase.messageSent(msg); - } - - public void cancel(MessageCancelBody messageCancelBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageCancelBody,cb); - _phase.messageSent(msg); - } - - public void get(MessageGetBody messageGetBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageGetBody,cb); - _phase.messageSent(msg); - } - - public void recover(MessageRecoverBody messageRecoverBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageRecoverBody,cb); - _phase.messageSent(msg); - } - - public void open(MessageOpenBody messageOpenBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageOpenBody,cb); - _phase.messageSent(msg); - } - - public void close(MessageCloseBody messageCloseBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageCloseBody,cb); - _phase.messageSent(msg); - } - - public void append(MessageAppendBody messageAppendBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageAppendBody,cb); - _phase.messageSent(msg); - } - - public void checkpoint(MessageCheckpointBody messageCheckpointBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageCheckpointBody,cb); - _phase.messageSent(msg); - } - - public void resume(MessageResumeBody messageResumeBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageResumeBody,cb); - _phase.messageSent(msg); - } - - public void qos(MessageQosBody messageQosBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageQosBody,cb); - _phase.messageSent(msg); - } - + + public abstract void transfer(MessageTransferBody messageTransferBody, AMQPCallBack cb) throws AMQPException; + + public abstract void consume(MessageConsumeBody messageConsumeBody, AMQPCallBack cb) throws AMQPException; + + public abstract void cancel(MessageCancelBody messageCancelBody, AMQPCallBack cb) throws AMQPException; + + public abstract void get(MessageGetBody messageGetBody, AMQPCallBack cb) throws AMQPException; + + public abstract void recover(MessageRecoverBody messageRecoverBody, AMQPCallBack cb) throws AMQPException; + + public abstract void open(MessageOpenBody messageOpenBody, AMQPCallBack cb) throws AMQPException; + + public abstract void close(MessageCloseBody messageCloseBody, AMQPCallBack cb) throws AMQPException; + + public abstract void append(MessageAppendBody messageAppendBody, AMQPCallBack cb) throws AMQPException; + + public abstract void checkpoint(MessageCheckpointBody messageCheckpointBody, AMQPCallBack cb) throws AMQPException; + + public abstract void resume(MessageResumeBody messageResumeBody, AMQPCallBack cb) throws AMQPException; + + public abstract void qos(MessageQosBody messageQosBody, AMQPCallBack cb) throws AMQPException; + /** * The correlationId from the request. * For example if a message.transfer is sent with correlationId "ABCD" * then u need to pass that in. This correlation id is used by the execution layer * to handle the correlation of method requests and responses */ - public void ok(MessageOkBody messageOkBody,long correlationId) throws AMQPException - { - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOkBody,correlationId); - _phase.messageSent(msg); - } - + public abstract void ok(MessageOkBody messageOkBody, long correlationId) throws AMQPException; + /** * The correlationId from the request. * For example if a message.transfer is sent with correlationId "ABCD" * then u need to pass that in. This correlation id is used by the execution layer * to handle the correlation of method requests and responses */ - public void reject(MessageRejectBody messageRejectBody,long correlationId) throws AMQPException - { - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageRejectBody,correlationId); - _phase.messageSent(msg); - } - + public abstract void reject(MessageRejectBody messageRejectBody, long correlationId) throws AMQPException; + /** * The correlationId from the request. * For example if a message.resume is sent with correlationId "ABCD" * then u need to pass that in. This correlation id is used by the execution layer * to handle the correlation of method requests and responses */ - public void offset(MessageOffsetBody messageOffsetBody,long correlationId) throws AMQPException - { - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOffsetBody,correlationId); - _phase.messageSent(msg); - } - - /**------------------------------------------- - * AMQPMethodListener methods - *-------------------------------------------- - */ - public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException - { - long localCorrelationId = evt.getLocalCorrelationId(); - AMQMethodBody methodBody = evt.getMethod(); - if ( methodBody instanceof MessageOkBody || - methodBody instanceof MessageRejectBody || - methodBody instanceof MessageEmptyBody) - { - invokeCallBack(localCorrelationId,methodBody); - return true; - } - else if (methodBody instanceof MessageTransferBody) - { - _messageCb.transfer((MessageTransferBody)methodBody, evt.getCorrelationId()); - return true; - } - else if (methodBody instanceof MessageAppendBody) - { - _messageCb.append((MessageAppendBody)methodBody, evt.getCorrelationId()); - return true; - } - else if (methodBody instanceof MessageOpenBody) - { - _messageCb.open((MessageOpenBody)methodBody, evt.getCorrelationId()); - return true; - } - else if (methodBody instanceof MessageCloseBody) - { - _messageCb.close((MessageCloseBody)methodBody, evt.getCorrelationId()); - return true; - } - else if (methodBody instanceof MessageCheckpointBody) - { - _messageCb.checkpoint((MessageCheckpointBody)methodBody, evt.getCorrelationId()); - return true; - } - else if (methodBody instanceof MessageRecoverBody) - { - _messageCb.recover((MessageRecoverBody)methodBody, evt.getCorrelationId()); - return true; - } - else if (methodBody instanceof MessageResumeBody) - { - _messageCb.resume((MessageResumeBody)methodBody, evt.getCorrelationId()); - return true; - } - else - { - return false; - } - } -} + public abstract void offset(MessageOffsetBody messageOffsetBody, long correlationId) throws AMQPException; + +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java index cfe87cb3eb..4559b1f96c 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java @@ -1,117 +1,29 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ package org.apache.qpid.nclient.amqp; -import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.QueueBindBody; -import org.apache.qpid.framing.QueueBindOkBody; import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.framing.QueueDeclareOkBody; import org.apache.qpid.framing.QueueDeleteBody; -import org.apache.qpid.framing.QueueDeleteOkBody; import org.apache.qpid.framing.QueuePurgeBody; -import org.apache.qpid.framing.QueuePurgeOkBody; import org.apache.qpid.framing.QueueUnbindBody; -import org.apache.qpid.framing.QueueUnbindOkBody; -import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; -import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.Phase; -/** - * - * This class represents the Queue class defined in AMQP. - * Each method takes an @see AMQPCallBack object if it wants to know - * the response from the broker to a particular method. - * Clients can handle the reponse asynchronously or block for a response - * using AMQPCallBack.isComplete() periodically using a loop. - */ -public class AMQPQueue extends AMQPCallBackSupport implements AMQPMethodListener +public interface AMQPQueue { - private Phase _phase; - protected AMQPQueue(int channelId,Phase phase) - { - super(channelId); - _phase = phase; - } - /** * ----------------------------------------------- * API Methods * ----------------------------------------------- */ - public void declare(QueueDeclareBody queueDeclareBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleNoWait(queueDeclareBody.nowait,queueDeclareBody,cb); - _phase.messageSent(msg); - } - - public void bind(QueueBindBody queueBindBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleNoWait(queueBindBody.nowait,queueBindBody,cb); - _phase.messageSent(msg); - } - + public abstract void declare(QueueDeclareBody queueDeclareBody, AMQPCallBack cb) throws AMQPException; + + public abstract void bind(QueueBindBody queueBindBody, AMQPCallBack cb) throws AMQPException; + // Queue.unbind doesn't have nowait - public void unbind(QueueUnbindBody queueUnbindBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(queueUnbindBody,cb); - _phase.messageSent(msg); - } - - public void purge(QueuePurgeBody queuePurgeBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleNoWait(queuePurgeBody.nowait,queuePurgeBody,cb); - _phase.messageSent(msg); - } + public abstract void unbind(QueueUnbindBody queueUnbindBody, AMQPCallBack cb) throws AMQPException; + + public abstract void purge(QueuePurgeBody queuePurgeBody, AMQPCallBack cb) throws AMQPException; - public void delete(QueueDeleteBody queueDeleteBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleNoWait(queueDeleteBody.nowait,queueDeleteBody,cb); - _phase.messageSent(msg); - } + public abstract void delete(QueueDeleteBody queueDeleteBody, AMQPCallBack cb) throws AMQPException; - - /**------------------------------------------- - * AMQPMethodListener methods - *-------------------------------------------- - */ - public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException - { - long localCorrelationId = evt.getLocalCorrelationId(); - AMQMethodBody methodBody = evt.getMethod(); - if ( methodBody instanceof QueueDeclareOkBody || - methodBody instanceof QueueBindOkBody || - methodBody instanceof QueueUnbindOkBody || - methodBody instanceof QueuePurgeOkBody || - methodBody instanceof QueueDeleteOkBody - ) - { - invokeCallBack(localCorrelationId,methodBody); - return true; - } - else - { - return false; - } - } -} +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java new file mode 100644 index 0000000000..b73aa8e6fa --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java @@ -0,0 +1,370 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.nclient.amqp.qpid; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.framing.ChannelFlowOkBody; +import org.apache.qpid.framing.ChannelOkBody; +import org.apache.qpid.framing.ChannelOpenBody; +import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.ChannelResumeBody; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.nclient.amqp.AMQPChannel; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; +import org.apache.qpid.nclient.amqp.state.AMQPState; +import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; +import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; +import org.apache.qpid.nclient.amqp.state.AMQPStateManager; +import org.apache.qpid.nclient.amqp.state.AMQPStateType; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.util.AMQPValidator; + +/** + * This represents the Channel class defined in the AMQP protocol. This class is a finite state machine and is thread + * safe by design. Only valid state changes are allowed or else an IllegalStateTransitionException will be thrown. Only + * one thread can enter the methods that change state, at a given time. The AMQP protocol recommends one thread per + * channel by design. + * + * A JMS Session can wrap an instance of this class. + */ + +public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListener, AMQPChannel +{ + private static final Logger _logger = Logger.getLogger(QpidAMQPChannel.class); + + // the channelId assigned for this channel + private int _channelId; + + private Phase _phase; + + private AMQPState _currentState; + + private AMQPStateManager _stateManager; + + private final AMQPState[] _validCloseStates = new AMQPState[] + { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND }; + + private final AMQPState[] _validResumeStates = new AMQPState[] + { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED }; + + // The wait period until a server sends a respond + private long _serverTimeOut = 1000; + + private final Lock _lock = new ReentrantLock(); + + private final Condition _channelNotOpend = _lock.newCondition(); + + private final Condition _channelNotClosed = _lock.newCondition(); + + private final Condition _channelFlowNotResponded = _lock.newCondition(); + + private final Condition _channelNotResumed = _lock.newCondition(); + + private ChannelOpenOkBody _channelOpenOkBody; + + private ChannelCloseOkBody _channelCloseOkBody; + + private ChannelFlowOkBody _channelFlowOkBody; + + private ChannelOkBody _channelOkBody; + + private ChannelCloseBody _channelCloseBody; + + protected QpidAMQPChannel(int channelId, Phase phase, AMQPStateManager stateManager) + { + _channelId = channelId; + _phase = phase; + _stateManager = stateManager; + _currentState = AMQPState.CHANNEL_NOT_OPENED; + _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); + } + + /** + * ------------------------------------------- + * API Methods + * -------------------------------------------- + */ + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPChannel#open(org.apache.qpid.framing.ChannelOpenBody) + */ + public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException + { + _lock.lock(); + try + { + _channelOpenOkBody = null; + checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _channelNotOpend.await(); + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time"); + notifyState(AMQPState.CHANNEL_OPENED); + _currentState = AMQPState.CHANNEL_OPENED; + return _channelOpenOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in channel.open", e); + } + finally + { + _lock.unlock(); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPChannel#close(org.apache.qpid.framing.ChannelCloseBody) + */ + public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException + { + _lock.lock(); + try + { + _channelCloseOkBody = null; + checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _channelNotClosed.await(); + AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time"); + notifyState(AMQPState.CHANNEL_CLOSED); + _currentState = AMQPState.CHANNEL_CLOSED; + return _channelCloseOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in channel.close", e); + } + finally + { + _lock.unlock(); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPChannel#flow(org.apache.qpid.framing.ChannelFlowBody) + */ + public ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException + { + _lock.lock(); + try + { + _channelFlowOkBody = null; + if (channelFlowBody.active) + { + checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND, _currentState, AMQPState.CHANNEL_OPENED); + } + else + { + checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND); + } + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _channelFlowNotResponded.await(); + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time"); + handleChannelFlowState(_channelFlowOkBody.active); + return _channelFlowOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in channel.flow", e); + } + finally + { + _lock.unlock(); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPChannel#resume(org.apache.qpid.framing.ChannelResumeBody) + */ + public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException + { + _lock.lock(); + try + { + _channelOkBody = null; + checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _channelNotResumed.await(); + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_channelOkBody, + "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time"); + notifyState(AMQPState.CHANNEL_OPENED); + _currentState = AMQPState.CHANNEL_OPENED; + return _channelOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in channel.resume", e); + } + finally + { + _lock.unlock(); + } + } + + /** + * ------------------------------------------- + * AMQPMethodListener methods + * -------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + _lock.lock(); + try + { + if (evt.getMethod() instanceof ChannelOpenOkBody) + { + _channelOpenOkBody = (ChannelOpenOkBody) evt.getMethod(); + _channelNotOpend.signal(); + return true; + } + else if (evt.getMethod() instanceof ChannelCloseOkBody) + { + _channelCloseOkBody = (ChannelCloseOkBody) evt.getMethod(); + _channelNotClosed.signal(); + return true; + } + else if (evt.getMethod() instanceof ChannelCloseBody) + { + _channelCloseBody = (ChannelCloseBody) evt.getMethod(); + // release the correct lock as u may have some conditions waiting. + // while an error occured and the broker has sent a close. + releaseLocks(); + handleChannelClose(_channelCloseBody); + return true; + } + else if (evt.getMethod() instanceof ChannelFlowOkBody) + { + _channelFlowOkBody = (ChannelFlowOkBody) evt.getMethod(); + _channelFlowNotResponded.signal(); + return true; + } + else if (evt.getMethod() instanceof ChannelFlowBody) + { + handleChannelFlow((ChannelFlowBody) evt.getMethod()); + return true; + } + else if (evt.getMethod() instanceof ChannelOkBody) + { + _channelOkBody = (ChannelOkBody) evt.getMethod(); + // In this case the only method expecting channel-ok is channel-resume + // haven't implemented ping and pong. + _channelNotResumed.signal(); + return true; + } + else + { + return false; + } + } + finally + { + _lock.unlock(); + } + } + + private void handleChannelClose(ChannelCloseBody channelCloseBody) throws AMQPException + { + notifyState(AMQPState.CHANNEL_CLOSED); + _currentState = AMQPState.CHANNEL_CLOSED; + // handle channel related cleanup + } + + private void releaseLocks() + { + if (_currentState == AMQPState.CHANNEL_NOT_OPENED) + { + _channelNotOpend.signal(); + _channelNotResumed.signal(); // It could be a channel.resume call + } + else if (_currentState == AMQPState.CHANNEL_OPENED || _currentState == AMQPState.CHANNEL_SUSPEND) + { + _channelFlowNotResponded.signal(); + } + else if (_currentState == AMQPState.CHANNEL_CLOSED) + { + _channelNotResumed.signal(); + } + } + + private void checkIfConnectionClosed() throws AMQPException + { + if (_channelCloseBody != null) + { + String error = "Broker has closed channel due to : " + _channelCloseBody.getReplyText() + " with reply code (" + + _channelCloseBody.getReplyCode() + ") " + "caused by class " + _channelCloseBody.getClassId() + " and method " + + _channelCloseBody.getMethod(); + + throw new AMQPException(error); + } + } + + private void handleChannelFlow(ChannelFlowBody channelFlowBody)throws AMQPException + { + _lock.lock(); + try + { + handleChannelFlowState(channelFlowBody.active); + } + finally + { + _lock.unlock(); + } + } + + private void handleChannelFlowState(boolean flow)throws AMQPException + { + notifyState((flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND); + _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND; + } + + private void notifyState(AMQPState newState) throws AMQPException + { + _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE)); + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java new file mode 100644 index 0000000000..809aa57dab --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java @@ -0,0 +1,286 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.qpid; + +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.framing.ChannelFlowOkBody; +import org.apache.qpid.framing.ChannelOkBody; +import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.framing.ConnectionOpenOkBody; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.framing.ConnectionStartBody; +import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.framing.ExchangeDeleteOkBody; +import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.framing.MessageCheckpointBody; +import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.framing.MessageGetBody; +import org.apache.qpid.framing.MessageOffsetBody; +import org.apache.qpid.framing.MessageOkBody; +import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.framing.MessageQosBody; +import org.apache.qpid.framing.MessageRecoverBody; +import org.apache.qpid.framing.MessageRejectBody; +import org.apache.qpid.framing.MessageResumeBody; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.framing.QueueBindOkBody; +import org.apache.qpid.framing.QueueDeclareOkBody; +import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.framing.QueuePurgeOkBody; +import org.apache.qpid.framing.QueueUnbindOkBody; +import org.apache.qpid.nclient.amqp.AMQPChannel; +import org.apache.qpid.nclient.amqp.AMQPClassFactory; +import org.apache.qpid.nclient.amqp.AMQPConnection; +import org.apache.qpid.nclient.amqp.AMQPExchange; +import org.apache.qpid.nclient.amqp.AMQPMessage; +import org.apache.qpid.nclient.amqp.AMQPMessageCallBack; +import org.apache.qpid.nclient.amqp.AMQPQueue; +import org.apache.qpid.nclient.amqp.event.AMQPEventManager; +import org.apache.qpid.nclient.amqp.state.AMQPStateManager; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.DefaultPhaseContext; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.PhaseContext; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.transport.AMQPConnectionURL; +import org.apache.qpid.nclient.transport.ConnectionURL; +import org.apache.qpid.nclient.transport.TransportConnection; +import org.apache.qpid.nclient.transport.TransportConnectionFactory; +import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType; +import org.apache.qpid.url.URLSyntaxException; + +/** + * The Class Factory creates AMQP Class + * equivalents defined in the spec. + * + * There should one instance per connection. + * The factory class creates all the support + * classes and provides an instance of the + * AMQP class in ready-to-use state. + * + */ +public class QpidAMQPClassFactory implements AMQPClassFactory +{ + //Need an event manager per connection + private AMQPEventManager _eventManager = new QpidEventManager(); + + // Need a state manager per connection + private AMQPStateManager _stateManager = new QpidStateManager(); + + //Need a phase pipe per connection + private Phase _phase; + + //One instance per connection + private QpidAMQPConnection _amqpConnection; + + public QpidAMQPClassFactory() + { + + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createConnection(java.lang.String, org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType) + */ + public AMQPConnection createConnectionClass(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException + { + AMQPConnectionURL url = new AMQPConnectionURL(urlStr); + return createConnectionClass(url, type); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createConnectionClass(org.apache.qpid.nclient.transport.ConnectionURL, org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType) + */ + public AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException + { + if (_amqpConnection == null) + { + PhaseContext ctx = new DefaultPhaseContext(); + ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager); + + TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type, ctx); + _amqpConnection = new QpidAMQPConnection(conn, _stateManager); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class, _amqpConnection); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class, _amqpConnection); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class, _amqpConnection); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class, _amqpConnection); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class, _amqpConnection); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class, _amqpConnection); + } + return _amqpConnection; + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createChannelClass(int) + */ + public AMQPChannel createChannelClass(int channel) throws AMQPException + { + checkIfConnectionStarted(); + QpidAMQPChannel amqpChannel = new QpidAMQPChannel(channel, _phase,_stateManager); + _eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel); + _eventManager.addMethodEventListener(channel, ChannelCloseBody.class, amqpChannel); + _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel); + _eventManager.addMethodEventListener(channel, ChannelFlowBody.class, amqpChannel); + _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel); + _eventManager.addMethodEventListener(channel, ChannelOkBody.class, amqpChannel); + return amqpChannel; + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destroyChannelClass(int, org.apache.qpid.nclient.amqp.AMQPChannel) + */ + public void destroyChannelClass(int channel, QpidAMQPChannel amqpChannel) throws AMQPException + { + _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel); + _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class, amqpChannel); + _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel); + _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class, amqpChannel); + _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel); + _eventManager.removeMethodEventListener(channel, ChannelOkBody.class, amqpChannel); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createExchangeClass(int) + */ + public AMQPExchange createExchangeClass(int channel) throws AMQPException + { + checkIfConnectionStarted(); + QpidAMQPExchange amqpExchange = new QpidAMQPExchange(channel, _phase); + _eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange); + _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange); + return amqpExchange; + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destoryExchangeClass(int, org.apache.qpid.nclient.amqp.AMQPExchange) + */ + public void destoryExchangeClass(int channel, QpidAMQPExchange amqpExchange) throws AMQPException + { + _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange); + _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createQueueClass(int) + */ + public AMQPQueue createQueueClass(int channel) throws AMQPException + { + checkIfConnectionStarted(); + QpidAMQPQueue amqpQueue = new QpidAMQPQueue(channel, _phase); + _eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue); + _eventManager.addMethodEventListener(channel, QueueBindOkBody.class, amqpQueue); + _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue); + _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue); + _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue); + return amqpQueue; + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destroyQueueClass(int, org.apache.qpid.nclient.amqp.AMQPQueue) + */ + public void destroyQueueClass(int channel, QpidAMQPQueue amqpQueue) throws AMQPException + { + _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue); + _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class, amqpQueue); + _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue); + _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue); + _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createMessageClass(int, org.apache.qpid.nclient.amqp.AMQPMessageCallBack) + */ + public AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException + { + checkIfConnectionStarted(); + QpidAMQPMessage amqpMessage = new QpidAMQPMessage(channel, _phase, messageCb); + _eventManager.addMethodEventListener(channel, MessageAppendBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageCancelBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageCloseBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageGetBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageOffsetBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageOkBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageOpenBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageRecoverBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageRejectBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageResumeBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageQosBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageTransferBody.class, amqpMessage); + + return amqpMessage; + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destoryMessageClass(int, org.apache.qpid.nclient.amqp.AMQPMessage) + */ + public void destoryMessageClass(int channel, QpidAMQPMessage amqpMessage) throws AMQPException + { + _eventManager.removeMethodEventListener(channel, MessageAppendBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageCancelBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageCloseBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageGetBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageOkBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageOpenBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageRejectBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageResumeBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageQosBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageTransferBody.class, amqpMessage); + } + + //This class should register as a state listener for AMQPConnection + private void checkIfConnectionStarted() throws AMQPException + { + if (_phase == null) + { + _phase = _amqpConnection.getPhasePipe(); + + if (_phase == null) + { + throw new AMQPException("Cannot create a channel until connection is ready"); + } + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#getEventManager() + */ + public AMQPEventManager getEventManager() + { + return _eventManager; + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#getStateManager() + */ + public AMQPStateManager getStateManager() + { + return _stateManager; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java new file mode 100644 index 0000000000..9b4d776cc5 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java @@ -0,0 +1,448 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.qpid; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.framing.ConnectionOpenBody; +import org.apache.qpid.framing.ConnectionOpenOkBody; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.framing.ConnectionSecureOkBody; +import org.apache.qpid.framing.ConnectionStartBody; +import org.apache.qpid.framing.ConnectionStartOkBody; +import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.nclient.amqp.AMQPConnection; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; +import org.apache.qpid.nclient.amqp.state.AMQPState; +import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; +import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; +import org.apache.qpid.nclient.amqp.state.AMQPStateManager; +import org.apache.qpid.nclient.amqp.state.AMQPStateType; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.transport.TransportConnection; +import org.apache.qpid.nclient.util.AMQPValidator; + +/** + * This maps directly to the Connection class defined in the AMQP protocol This class is a finite state machine and is + * thread safe by design A particular method (state changing) can only be invoked once and only in sequence or else an + * IllegalStateTransitionException will be thrown Also only one thread can enter those methods at a given time. + */ +public class QpidAMQPConnection extends AMQPStateMachine implements AMQPMethodListener, AMQPConnection +{ + private static final Logger _logger = Logger.getLogger(QpidAMQPConnection.class); + + private Phase _phase; + + private TransportConnection _connection; + + private long _correlationId; + + private AMQPState _currentState; + + private AMQPStateManager _stateManager; + + private final AMQPState[] _validCloseStates = new AMQPState[] + { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, + AMQPState.CONNECTION_OPEN, }; + + // The wait period until a server sends a respond + private long _serverTimeOut = 1000; + + private final Lock _lock = new ReentrantLock(); + + private final Condition _connectionNotStarted = _lock.newCondition(); + + private final Condition _connectionNotSecure = _lock.newCondition(); + + private final Condition _connectionNotTuned = _lock.newCondition(); + + private final Condition _connectionNotOpened = _lock.newCondition(); + + private final Condition _connectionNotClosed = _lock.newCondition(); + + private ConnectionStartBody _connectionStartBody; + + private ConnectionSecureBody _connectionSecureBody; + + private ConnectionTuneBody _connectionTuneBody; + + private ConnectionOpenOkBody _connectionOpenOkBody; + + private ConnectionCloseOkBody _connectionCloseOkBody; + + private ConnectionCloseBody _connectionCloseBody; + + protected QpidAMQPConnection(TransportConnection connection, AMQPStateManager stateManager) + { + _connection = connection; + _stateManager = stateManager; + _currentState = AMQPState.CONNECTION_UNDEFINED; + _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); + } + + /** + * ------------------------------------------- + * API Methods + * -------------------------------------------- + */ + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPConnection#openTCPConnection() + */ + public ConnectionStartBody openTCPConnection() throws AMQPException + { + _lock.lock(); + // open the TCP connection + try + { + _connectionStartBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED); + _phase = _connection.connect(); + + // waiting for ConnectionStartBody or error in connection + //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _connectionNotStarted.await(); + + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time"); + notifyState(AMQPState.CONNECTION_NOT_STARTED); + _currentState = AMQPState.CONNECTION_NOT_STARTED; + return _connectionStartBody; + } + catch (InterruptedException e) + { + throw new AMQPException("Error opening connection to broker", e); + } + finally + { + _lock.unlock(); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPConnection#startOk(org.apache.qpid.framing.ConnectionStartOkBody) + */ + public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException + { + _lock.lock(); + try + { + _connectionSecureBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId); + _phase.messageSent(msg); + // _connectionNotSecure.await(_serverTimeOut,TimeUnit.MILLISECONDS); + _connectionNotSecure.await(); + + checkIfConnectionClosed(); + if (_connectionTuneBody != null) + { + notifyState(AMQPState.CONNECTION_NOT_TUNED); + _currentState = AMQPState.CONNECTION_NOT_TUNED; + return _connectionTuneBody; + } + else if (_connectionSecureBody != null) + { // oops the server sent another challenge + notifyState(AMQPState.CONNECTION_NOT_SECURE); + _currentState = AMQPState.CONNECTION_NOT_SECURE; + return _connectionSecureBody; + } + else + { + throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); + } + } + catch (InterruptedException e) + { + throw new AMQPException("Error in connection.startOk", e); + } + finally + { + _lock.unlock(); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPConnection#secureOk(org.apache.qpid.framing.ConnectionSecureOkBody) + */ + public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException + { + _lock.lock(); + try + { + _connectionTuneBody = null; + _connectionSecureBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED); + + _connectionSecureBody = null; // The server could send a fresh challenge + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId); + _phase.messageSent(msg); + + //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _connectionNotTuned.await(); + checkIfConnectionClosed(); + + if (_connectionTuneBody != null) + { + notifyState(AMQPState.CONNECTION_NOT_TUNED); + _currentState = AMQPState.CONNECTION_NOT_TUNED; + return _connectionTuneBody; + } + else if (_connectionSecureBody != null) + { // oops the server sent another challenge + notifyState(AMQPState.CONNECTION_NOT_SECURE); + _currentState = AMQPState.CONNECTION_NOT_SECURE; + return _connectionSecureBody; + } + else + { + throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); + } + } + catch (InterruptedException e) + { + throw new AMQPException("Error in connection.secureOk", e); + } + finally + { + _lock.unlock(); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPConnection#tuneOk(org.apache.qpid.framing.ConnectionTuneOkBody) + */ + public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException + { + _lock.lock(); + try + { + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED); + _connectionSecureBody = null; + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId); + _phase.messageSent(msg); + notifyState(AMQPState.CONNECTION_NOT_OPENED); + _currentState = AMQPState.CONNECTION_NOT_OPENED; + } + finally + { + _lock.unlock(); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPConnection#open(org.apache.qpid.framing.ConnectionOpenBody) + */ + public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException + { + _lock.lock(); + try + { + // If the broker sends a connection close due to an error with the + // Connection tune ok, then this call will verify that + checkIfConnectionClosed(); + + _connectionOpenOkBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _connectionNotOpened.await(); + + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time"); + notifyState(AMQPState.CONNECTION_OPEN); + _currentState = AMQPState.CONNECTION_OPEN; + return _connectionOpenOkBody; + } + catch (InterruptedException e) + { + throw new AMQPException("Error in connection.open", e); + } + finally + { + _lock.unlock(); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPConnection#close(org.apache.qpid.framing.ConnectionCloseBody) + */ + public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException + { + _lock.lock(); + try + { + _connectionCloseOkBody = null; + checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); + AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time"); + notifyState(AMQPState.CONNECTION_CLOSED); + _currentState = AMQPState.CONNECTION_CLOSED; + return _connectionCloseOkBody; + } + catch (InterruptedException e) + { + throw new AMQPException("Error in connection.close", e); + } + finally + { + _lock.unlock(); + } + } + + /** + * ------------------------------------------- + * AMQMethodListener methods + * -------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + _lock.lock(); + try + { + _correlationId = evt.getCorrelationId(); + + if (evt.getMethod() instanceof ConnectionStartBody) + { + _connectionStartBody = (ConnectionStartBody) evt.getMethod(); + _connectionNotStarted.signalAll(); + return true; + } + else if (evt.getMethod() instanceof ConnectionSecureBody) + { + _connectionSecureBody = (ConnectionSecureBody) evt.getMethod(); + _connectionNotSecure.signal(); + _connectionNotTuned.signal(); // in case the server has sent another chanllenge + return true; + } + else if (evt.getMethod() instanceof ConnectionTuneBody) + { + _connectionTuneBody = (ConnectionTuneBody) evt.getMethod(); + _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk + _connectionNotTuned.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionOpenOkBody) + { + _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod(); + _connectionNotOpened.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionCloseOkBody) + { + _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod(); + _connectionNotClosed.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionCloseBody) + { + _connectionCloseBody = (ConnectionCloseBody) evt.getMethod(); + // release the correct lock as u may have some conditions waiting. + // while an error occured and the broker has sent a close. + releaseLocks(); + handleClose(); + return true; + } + else + { + return false; + } + } + finally + { + _lock.unlock(); + } + } + + + public Phase getPhasePipe() + { + return _phase; + } + + private void handleClose() throws AMQPException + { + try + { + notifyState(AMQPState.CONNECTION_CLOSING); + _currentState = AMQPState.CONNECTION_CLOSING; + // do the required cleanup and send a ConnectionCloseOkBody + } + catch (Exception e) + { + throw new AMQPException("Error handling connection.close from broker", e); + } + } + + private void checkIfConnectionClosed() throws AMQPException + { + if (_connectionCloseBody != null) + { + String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() + " with reply code (" + + _connectionCloseBody.getReplyCode() + ") " + "caused by class " + _connectionCloseBody.getClassId() + " and method " + + _connectionCloseBody.getMethod(); + + throw new AMQPException(error); + } + } + + private void releaseLocks() + { + if (_currentState == AMQPState.CONNECTION_NOT_OPENED) + { + _connectionNotOpened.signal(); + } + else if (_currentState == AMQPState.CONNECTION_UNDEFINED) + { + _connectionNotStarted.signal(); + } + else if (_currentState == AMQPState.CONNECTION_NOT_STARTED) + { + _connectionNotSecure.signal(); + } + else if (_currentState == AMQPState.CONNECTION_NOT_SECURE) + { + _connectionNotTuned.signal(); + } + } + + private void notifyState(AMQPState newState) throws AMQPException + { + _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CONNECTION_STATE)); + } + +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java new file mode 100644 index 0000000000..02b22e0755 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java @@ -0,0 +1,92 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.nclient.amqp.qpid; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.framing.ExchangeDeleteBody; +import org.apache.qpid.framing.ExchangeDeleteOkBody; +import org.apache.qpid.nclient.amqp.AMQPCallBack; +import org.apache.qpid.nclient.amqp.AMQPCallBackSupport; +import org.apache.qpid.nclient.amqp.AMQPExchange; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; + +/** + * + * This class represents the Exchange class defined in AMQP. + * Each method takes an @see AMQPCallBack object if it wants to know + * the response from the broker to particular method. + * Clients can handle the reponse asynchronously or block for a response + * using AMQPCallBack.isComplete() periodically using a loop. + */ +public class QpidAMQPExchange extends AMQPCallBackSupport implements AMQPMethodListener, AMQPExchange +{ + private Phase _phase; + + protected QpidAMQPExchange(int channelId,Phase phase) + { + super(channelId); + _phase = phase; + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPExchange#declare(org.apache.qpid.framing.ExchangeDeclareBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void declare(ExchangeDeclareBody exchangeDeclareBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(exchangeDeclareBody.nowait,exchangeDeclareBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPExchange#delete(org.apache.qpid.framing.ExchangeDeleteBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void delete(ExchangeDeleteBody exchangeDeleteBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(exchangeDeleteBody.nowait,exchangeDeleteBody,cb); + _phase.messageSent(msg); + } + + + /**------------------------------------------- + * AMQPMethodListener methods + *-------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + long localCorrelationId = evt.getLocalCorrelationId(); + AMQMethodBody methodBody = evt.getMethod(); + if ( methodBody instanceof ExchangeDeclareOkBody || methodBody instanceof ExchangeDeleteOkBody) + { + invokeCallBack(localCorrelationId,methodBody); + return true; + } + else + { + return false; + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java new file mode 100644 index 0000000000..e40c3cefa2 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java @@ -0,0 +1,256 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.qpid; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.framing.MessageCheckpointBody; +import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.framing.MessageConsumeBody; +import org.apache.qpid.framing.MessageEmptyBody; +import org.apache.qpid.framing.MessageGetBody; +import org.apache.qpid.framing.MessageOffsetBody; +import org.apache.qpid.framing.MessageOkBody; +import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.framing.MessageQosBody; +import org.apache.qpid.framing.MessageRecoverBody; +import org.apache.qpid.framing.MessageRejectBody; +import org.apache.qpid.framing.MessageResumeBody; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.nclient.amqp.AMQPCallBack; +import org.apache.qpid.nclient.amqp.AMQPCallBackSupport; +import org.apache.qpid.nclient.amqp.AMQPMessage; +import org.apache.qpid.nclient.amqp.AMQPMessageCallBack; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; + +/** + * This class represents the AMQP Message class. + * You need an instance of this class per channel. + * A @see AMQPMessageCallBack class is taken as an argument in the constructor. + * A client can use this class to issue Message class methods on the broker. + * When the broker issues Message class methods on the client, the client is notified + * via the AMQPMessageCallBack interface. + * + * A JMS Message producer implementation can wrap an instance if this and map + * JMS method calls to the appropriate AMQP methods. + * + * AMQPMessageCallBack can be implemented by the JMS MessageConsumer implementation. + * + */ +public class QpidAMQPMessage extends AMQPCallBackSupport implements AMQPMethodListener, AMQPMessage +{ + private Phase _phase; + private AMQPMessageCallBack _messageCb; + + protected QpidAMQPMessage(int channelId,Phase phase,AMQPMessageCallBack messageCb) + { + super(channelId); + _phase = phase; + _messageCb = messageCb; + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#transfer(org.apache.qpid.framing.MessageTransferBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + + public void transfer(MessageTransferBody messageTransferBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageTransferBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#consume(org.apache.qpid.framing.MessageConsumeBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void consume(MessageConsumeBody messageConsumeBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageConsumeBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#cancel(org.apache.qpid.framing.MessageCancelBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void cancel(MessageCancelBody messageCancelBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageCancelBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#get(org.apache.qpid.framing.MessageGetBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void get(MessageGetBody messageGetBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageGetBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#recover(org.apache.qpid.framing.MessageRecoverBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void recover(MessageRecoverBody messageRecoverBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageRecoverBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#open(org.apache.qpid.framing.MessageOpenBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void open(MessageOpenBody messageOpenBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageOpenBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#close(org.apache.qpid.framing.MessageCloseBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void close(MessageCloseBody messageCloseBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageCloseBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#append(org.apache.qpid.framing.MessageAppendBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void append(MessageAppendBody messageAppendBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageAppendBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#checkpoint(org.apache.qpid.framing.MessageCheckpointBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void checkpoint(MessageCheckpointBody messageCheckpointBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageCheckpointBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#resume(org.apache.qpid.framing.MessageResumeBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void resume(MessageResumeBody messageResumeBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageResumeBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#qos(org.apache.qpid.framing.MessageQosBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void qos(MessageQosBody messageQosBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageQosBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#ok(org.apache.qpid.framing.MessageOkBody, long) + */ + public void ok(MessageOkBody messageOkBody,long correlationId) throws AMQPException + { + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOkBody,correlationId); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#reject(org.apache.qpid.framing.MessageRejectBody, long) + */ + public void reject(MessageRejectBody messageRejectBody,long correlationId) throws AMQPException + { + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageRejectBody,correlationId); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#offset(org.apache.qpid.framing.MessageOffsetBody, long) + */ + public void offset(MessageOffsetBody messageOffsetBody,long correlationId) throws AMQPException + { + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOffsetBody,correlationId); + _phase.messageSent(msg); + } + + /**------------------------------------------- + * AMQPMethodListener methods + *-------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + long localCorrelationId = evt.getLocalCorrelationId(); + AMQMethodBody methodBody = evt.getMethod(); + if ( methodBody instanceof MessageOkBody || + methodBody instanceof MessageRejectBody || + methodBody instanceof MessageEmptyBody) + { + invokeCallBack(localCorrelationId,methodBody); + return true; + } + else if (methodBody instanceof MessageTransferBody) + { + _messageCb.transfer((MessageTransferBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageAppendBody) + { + _messageCb.append((MessageAppendBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageOpenBody) + { + _messageCb.open((MessageOpenBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageCloseBody) + { + _messageCb.close((MessageCloseBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageCheckpointBody) + { + _messageCb.checkpoint((MessageCheckpointBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageRecoverBody) + { + _messageCb.recover((MessageRecoverBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageResumeBody) + { + _messageCb.resume((MessageResumeBody)methodBody, evt.getCorrelationId()); + return true; + } + else + { + return false; + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java new file mode 100644 index 0000000000..323ff0cf06 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.qpid; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.QueueBindBody; +import org.apache.qpid.framing.QueueBindOkBody; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeclareOkBody; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.framing.QueuePurgeBody; +import org.apache.qpid.framing.QueuePurgeOkBody; +import org.apache.qpid.framing.QueueUnbindBody; +import org.apache.qpid.framing.QueueUnbindOkBody; +import org.apache.qpid.nclient.amqp.AMQPCallBack; +import org.apache.qpid.nclient.amqp.AMQPCallBackSupport; +import org.apache.qpid.nclient.amqp.AMQPQueue; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; + +/** + * + * This class represents the Queue class defined in AMQP. + * Each method takes an @see AMQPCallBack object if it wants to know + * the response from the broker to a particular method. + * Clients can handle the reponse asynchronously or block for a response + * using AMQPCallBack.isComplete() periodically using a loop. + */ +public class QpidAMQPQueue extends AMQPCallBackSupport implements AMQPMethodListener, AMQPQueue +{ + private Phase _phase; + + protected QpidAMQPQueue(int channelId,Phase phase) + { + super(channelId); + _phase = phase; + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPQueue#declare(org.apache.qpid.framing.QueueDeclareBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void declare(QueueDeclareBody queueDeclareBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(queueDeclareBody.nowait,queueDeclareBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPQueue#bind(org.apache.qpid.framing.QueueBindBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void bind(QueueBindBody queueBindBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(queueBindBody.nowait,queueBindBody,cb); + _phase.messageSent(msg); + } + + // Queue.unbind doesn't have nowait + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPQueue#unbind(org.apache.qpid.framing.QueueUnbindBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void unbind(QueueUnbindBody queueUnbindBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(queueUnbindBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPQueue#purge(org.apache.qpid.framing.QueuePurgeBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void purge(QueuePurgeBody queuePurgeBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(queuePurgeBody.nowait,queuePurgeBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPQueue#delete(org.apache.qpid.framing.QueueDeleteBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void delete(QueueDeleteBody queueDeleteBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(queueDeleteBody.nowait,queueDeleteBody,cb); + _phase.messageSent(msg); + } + + + /**------------------------------------------- + * AMQPMethodListener methods + *-------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + long localCorrelationId = evt.getLocalCorrelationId(); + AMQMethodBody methodBody = evt.getMethod(); + if ( methodBody instanceof QueueDeclareOkBody || + methodBody instanceof QueueBindOkBody || + methodBody instanceof QueueUnbindOkBody || + methodBody instanceof QueuePurgeOkBody || + methodBody instanceof QueueDeleteOkBody + ) + { + invokeCallBack(localCorrelationId,methodBody); + return true; + } + else + { + return false; + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java index 9655da6912..902c80fe77 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.nclient.amqp; +package org.apache.qpid.nclient.amqp.qpid; import java.util.ArrayList; import java.util.List; diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java index 20c4921582..3fe1ee4cfd 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.nclient.amqp; +package org.apache.qpid.nclient.amqp.qpid; import java.util.ArrayList; import java.util.List; diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java index f994a99ec3..4d4fadcc22 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java @@ -65,6 +65,7 @@ import org.apache.qpid.nclient.amqp.AMQPConnection; import org.apache.qpid.nclient.amqp.AMQPExchange; import org.apache.qpid.nclient.amqp.AMQPMessage; import org.apache.qpid.nclient.amqp.AMQPQueue; +import org.apache.qpid.nclient.amqp.qpid.QpidAMQPClassFactory; import org.apache.qpid.nclient.amqp.state.AMQPStateType; import org.apache.qpid.nclient.transport.AMQPConnectionURL; import org.apache.qpid.nclient.transport.ConnectionURL; @@ -90,7 +91,7 @@ public class TestClient private static int _channel = 2; // Need a Class factory per connection - private AMQPClassFactory _classFactory = new AMQPClassFactory(); + private AMQPClassFactory _classFactory = new QpidAMQPClassFactory(); private int _ticket; |