summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-03-30 16:54:25 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-03-30 16:54:25 +0000
commite97c9492b410c791f13d29e8cfec2103df164e3d (patch)
tree23c06cc5c55442d7561a5560825c21655daa0aaf
parentf50a093a9423f12b69d82996ec432b9198f90f27 (diff)
downloadqpid-python-e97c9492b410c791f13d29e8cfec2103df164e3d.tar.gz
extracted interfaces for protocol level classes
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/client_restructure@524171 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java345
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java220
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java414
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java83
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java225
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java108
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java370
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java286
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java448
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java92
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java256
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java130
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java (renamed from java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java)2
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java (renamed from java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java)2
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java3
15 files changed, 1668 insertions, 1316 deletions
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
index e480f38b93..3c92af9619 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
@@ -1,33 +1,5 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
package org.apache.qpid.nclient.amqp;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.ChannelFlowBody;
@@ -36,334 +8,29 @@ import org.apache.qpid.framing.ChannelOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
import org.apache.qpid.framing.ChannelResumeBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
-import org.apache.qpid.nclient.amqp.state.AMQPState;
-import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
-import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
-import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
-import org.apache.qpid.nclient.amqp.state.AMQPStateType;
-import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.util.AMQPValidator;
-
-/**
- * This represents the Channel class defined in the AMQP protocol. This class is a finite state machine and is thread
- * safe by design. Only valid state changes are allowed or else an IllegalStateTransitionException will be thrown. Only
- * one thread can enter the methods that change state, at a given time. The AMQP protocol recommends one thread per
- * channel by design.
- *
- * A JMS Session can wrap an instance of this class.
- */
-public class AMQPChannel extends AMQPStateMachine implements AMQPMethodListener
+public interface AMQPChannel
{
- private static final Logger _logger = Logger.getLogger(AMQPChannel.class);
-
- // the channelId assigned for this channel
- private int _channelId;
-
- private Phase _phase;
-
- private AMQPState _currentState;
-
- private AMQPStateManager _stateManager;
-
- private final AMQPState[] _validCloseStates = new AMQPState[]
- { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND };
-
- private final AMQPState[] _validResumeStates = new AMQPState[]
- { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED };
-
- // The wait period until a server sends a respond
- private long _serverTimeOut = 1000;
-
- private final Lock _lock = new ReentrantLock();
-
- private final Condition _channelNotOpend = _lock.newCondition();
-
- private final Condition _channelNotClosed = _lock.newCondition();
-
- private final Condition _channelFlowNotResponded = _lock.newCondition();
-
- private final Condition _channelNotResumed = _lock.newCondition();
-
- private ChannelOpenOkBody _channelOpenOkBody;
-
- private ChannelCloseOkBody _channelCloseOkBody;
-
- private ChannelFlowOkBody _channelFlowOkBody;
-
- private ChannelOkBody _channelOkBody;
-
- private ChannelCloseBody _channelCloseBody;
-
- protected AMQPChannel(int channelId, Phase phase, AMQPStateManager stateManager)
- {
- _channelId = channelId;
- _phase = phase;
- _stateManager = stateManager;
- _currentState = AMQPState.CHANNEL_NOT_OPENED;
- _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
- }
-
- /**
- * -------------------------------------------
- * API Methods
- * --------------------------------------------
- */
/**
* Opens the channel
*/
- public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _channelOpenOkBody = null;
- checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _channelNotOpend.await();
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time");
- notifyState(AMQPState.CHANNEL_OPENED);
- _currentState = AMQPState.CHANNEL_OPENED;
- return _channelOpenOkBody;
- }
- catch (Exception e)
- {
- throw new AMQPException("Error in channel.open", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
+ public abstract ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException;
/**
* Close the channel
*/
- public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _channelCloseOkBody = null;
- checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _channelNotClosed.await();
- AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time");
- notifyState(AMQPState.CHANNEL_CLOSED);
- _currentState = AMQPState.CHANNEL_CLOSED;
- return _channelCloseOkBody;
- }
- catch (Exception e)
- {
- throw new AMQPException("Error in channel.close", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
+ public abstract ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException;
/**
* Channel Flow
*/
- public ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _channelFlowOkBody = null;
- if (channelFlowBody.active)
- {
- checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND, _currentState, AMQPState.CHANNEL_OPENED);
- }
- else
- {
- checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND);
- }
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _channelFlowNotResponded.await();
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time");
- handleChannelFlowState(_channelFlowOkBody.active);
- return _channelFlowOkBody;
- }
- catch (Exception e)
- {
- throw new AMQPException("Error in channel.flow", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
+ public abstract ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException;
/**
* Close the channel
*/
- public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _channelOkBody = null;
- checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _channelNotResumed.await();
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_channelOkBody,
- "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time");
- notifyState(AMQPState.CHANNEL_OPENED);
- _currentState = AMQPState.CHANNEL_OPENED;
- return _channelOkBody;
- }
- catch (Exception e)
- {
- throw new AMQPException("Error in channel.resume", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * -------------------------------------------
- * AMQPMethodListener methods
- * --------------------------------------------
- */
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- _lock.lock();
- try
- {
- if (evt.getMethod() instanceof ChannelOpenOkBody)
- {
- _channelOpenOkBody = (ChannelOpenOkBody) evt.getMethod();
- _channelNotOpend.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ChannelCloseOkBody)
- {
- _channelCloseOkBody = (ChannelCloseOkBody) evt.getMethod();
- _channelNotClosed.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ChannelCloseBody)
- {
- _channelCloseBody = (ChannelCloseBody) evt.getMethod();
- // release the correct lock as u may have some conditions waiting.
- // while an error occured and the broker has sent a close.
- releaseLocks();
- handleChannelClose(_channelCloseBody);
- return true;
- }
- else if (evt.getMethod() instanceof ChannelFlowOkBody)
- {
- _channelFlowOkBody = (ChannelFlowOkBody) evt.getMethod();
- _channelFlowNotResponded.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ChannelFlowBody)
- {
- handleChannelFlow((ChannelFlowBody) evt.getMethod());
- return true;
- }
- else if (evt.getMethod() instanceof ChannelOkBody)
- {
- _channelOkBody = (ChannelOkBody) evt.getMethod();
- // In this case the only method expecting channel-ok is channel-resume
- // haven't implemented ping and pong.
- _channelNotResumed.signal();
- return true;
- }
- else
- {
- return false;
- }
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- private void handleChannelClose(ChannelCloseBody channelCloseBody) throws AMQPException
- {
- notifyState(AMQPState.CHANNEL_CLOSED);
- _currentState = AMQPState.CHANNEL_CLOSED;
- // handle channel related cleanup
- }
-
- private void releaseLocks()
- {
- if (_currentState == AMQPState.CHANNEL_NOT_OPENED)
- {
- _channelNotOpend.signal();
- _channelNotResumed.signal(); // It could be a channel.resume call
- }
- else if (_currentState == AMQPState.CHANNEL_OPENED || _currentState == AMQPState.CHANNEL_SUSPEND)
- {
- _channelFlowNotResponded.signal();
- }
- else if (_currentState == AMQPState.CHANNEL_CLOSED)
- {
- _channelNotResumed.signal();
- }
- }
-
- private void checkIfConnectionClosed() throws AMQPException
- {
- if (_channelCloseBody != null)
- {
- String error = "Broker has closed channel due to : " + _channelCloseBody.getReplyText() + " with reply code ("
- + _channelCloseBody.getReplyCode() + ") " + "caused by class " + _channelCloseBody.getClassId() + " and method "
- + _channelCloseBody.getMethod();
-
- throw new AMQPException(error);
- }
- }
-
- private void handleChannelFlow(ChannelFlowBody channelFlowBody)throws AMQPException
- {
- _lock.lock();
- try
- {
- handleChannelFlowState(channelFlowBody.active);
- }
- finally
- {
- _lock.unlock();
- }
- }
+ public abstract ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException;
- private void handleChannelFlowState(boolean flow)throws AMQPException
- {
- notifyState((flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND);
- _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND;
- }
-
- private void notifyState(AMQPState newState) throws AMQPException
- {
- _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE));
- }
-}
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
index f90a5231a2..4976daa4fa 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
@@ -20,216 +20,39 @@
*/
package org.apache.qpid.nclient.amqp;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ChannelOkBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.framing.ConnectionOpenOkBody;
-import org.apache.qpid.framing.ConnectionSecureBody;
-import org.apache.qpid.framing.ConnectionStartBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
-import org.apache.qpid.framing.ExchangeDeleteOkBody;
-import org.apache.qpid.framing.MessageAppendBody;
-import org.apache.qpid.framing.MessageCancelBody;
-import org.apache.qpid.framing.MessageCheckpointBody;
-import org.apache.qpid.framing.MessageCloseBody;
-import org.apache.qpid.framing.MessageGetBody;
-import org.apache.qpid.framing.MessageOffsetBody;
-import org.apache.qpid.framing.MessageOkBody;
-import org.apache.qpid.framing.MessageOpenBody;
-import org.apache.qpid.framing.MessageQosBody;
-import org.apache.qpid.framing.MessageRecoverBody;
-import org.apache.qpid.framing.MessageRejectBody;
-import org.apache.qpid.framing.MessageResumeBody;
-import org.apache.qpid.framing.MessageTransferBody;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.QueuePurgeOkBody;
-import org.apache.qpid.framing.QueueUnbindOkBody;
import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
+import org.apache.qpid.nclient.amqp.qpid.QpidAMQPChannel;
+import org.apache.qpid.nclient.amqp.qpid.QpidAMQPExchange;
+import org.apache.qpid.nclient.amqp.qpid.QpidAMQPMessage;
+import org.apache.qpid.nclient.amqp.qpid.QpidAMQPQueue;
import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.DefaultPhaseContext;
-import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.core.PhaseContext;
-import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.transport.AMQPConnectionURL;
import org.apache.qpid.nclient.transport.ConnectionURL;
-import org.apache.qpid.nclient.transport.TransportConnection;
-import org.apache.qpid.nclient.transport.TransportConnectionFactory;
import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
import org.apache.qpid.url.URLSyntaxException;
-/**
- * The Class Factory creates AMQP Class
- * equivalents defined in the spec.
- *
- * There should one instance per connection.
- * The factory class creates all the support
- * classes and provides an instance of the
- * AMQP class in ready-to-use state.
- *
- */
-public class AMQPClassFactory
+public interface AMQPClassFactory
{
- //Need an event manager per connection
- private AMQPEventManager _eventManager = new QpidEventManager();
-
- // Need a state manager per connection
- private AMQPStateManager _stateManager = new QpidStateManager();
-
- //Need a phase pipe per connection
- private Phase _phase;
-
- //One instance per connection
- private AMQPConnection _amqpConnection;
-
- public AMQPClassFactory()
- {
- }
+ public abstract AMQPConnection createConnectionClass(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException;
- public AMQPConnection createConnection(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException
- {
- AMQPConnectionURL url = new AMQPConnectionURL(urlStr);
- return createConnectionClass(url, type);
- }
+ public abstract AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException;
- public AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException
- {
- if (_amqpConnection == null)
- {
- PhaseContext ctx = new DefaultPhaseContext();
- ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager);
+ public abstract AMQPChannel createChannelClass(int channel) throws AMQPException;
- TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type, ctx);
- _amqpConnection = new AMQPConnection(conn, _stateManager);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class, _amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class, _amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class, _amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class, _amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class, _amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class, _amqpConnection);
- }
- return _amqpConnection;
- }
+ public abstract void destroyChannelClass(int channel, QpidAMQPChannel amqpChannel) throws AMQPException;
- public AMQPChannel createChannelClass(int channel) throws AMQPException
- {
- checkIfConnectionStarted();
- AMQPChannel amqpChannel = new AMQPChannel(channel, _phase,_stateManager);
- _eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelCloseBody.class, amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelFlowBody.class, amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelOkBody.class, amqpChannel);
- return amqpChannel;
- }
+ public abstract AMQPExchange createExchangeClass(int channel) throws AMQPException;
- public void destroyChannelClass(int channel, AMQPChannel amqpChannel) throws AMQPException
- {
- _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class, amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class, amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelOkBody.class, amqpChannel);
- }
+ public abstract void destoryExchangeClass(int channel, QpidAMQPExchange amqpExchange) throws AMQPException;
- public AMQPExchange createExchangeClass(int channel) throws AMQPException
- {
- checkIfConnectionStarted();
- AMQPExchange amqpExchange = new AMQPExchange(channel, _phase);
- _eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange);
- _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange);
- return amqpExchange;
- }
+ public abstract AMQPQueue createQueueClass(int channel) throws AMQPException;
- public void destoryExchangeClass(int channel, AMQPExchange amqpExchange) throws AMQPException
- {
- _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange);
- _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange);
- }
+ public abstract void destroyQueueClass(int channel, QpidAMQPQueue amqpQueue) throws AMQPException;
- public AMQPQueue createQueueClass(int channel) throws AMQPException
- {
- checkIfConnectionStarted();
- AMQPQueue amqpQueue = new AMQPQueue(channel, _phase);
- _eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue);
- _eventManager.addMethodEventListener(channel, QueueBindOkBody.class, amqpQueue);
- _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue);
- _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue);
- _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue);
- return amqpQueue;
- }
+ public abstract AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException;
- public void destroyQueueClass(int channel, AMQPQueue amqpQueue) throws AMQPException
- {
- _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class, amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue);
- }
-
- public AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException
- {
- checkIfConnectionStarted();
- AMQPMessage amqpMessage = new AMQPMessage(channel, _phase, messageCb);
- _eventManager.addMethodEventListener(channel, MessageAppendBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageCancelBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageCloseBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageGetBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageOffsetBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageOkBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageOpenBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageRecoverBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageRejectBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageResumeBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageQosBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageTransferBody.class, amqpMessage);
-
- return amqpMessage;
- }
-
- public void destoryMessageClass(int channel, AMQPMessage amqpMessage) throws AMQPException
- {
- _eventManager.removeMethodEventListener(channel, MessageAppendBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageCancelBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageCloseBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageGetBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageOkBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageOpenBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageRejectBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageResumeBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageQosBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageTransferBody.class, amqpMessage);
- }
-
- //This class should register as a state listener for AMQPConnection
- private void checkIfConnectionStarted() throws AMQPException
- {
- if (_phase == null)
- {
- _phase = _amqpConnection.getPhasePipe();
-
- if (_phase == null)
- {
- throw new AMQPException("Cannot create a channel until connection is ready");
- }
- }
- }
+ public abstract void destoryMessageClass(int channel, QpidAMQPMessage amqpMessage) throws AMQPException;
/**
* Extention point
@@ -237,10 +60,7 @@ public class AMQPClassFactory
* and add listeners to get notified of events
*
*/
- public AMQPEventManager getEventManager()
- {
- return _eventManager;
- }
+ public abstract AMQPEventManager getEventManager();
/**
* Extention point
@@ -248,8 +68,6 @@ public class AMQPClassFactory
* and add listeners to get notified of state changes
*
*/
- public AMQPStateManager getStateManager()
- {
- return _stateManager;
- }
-}
+ public abstract AMQPStateManager getStateManager();
+
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
index 1d6541ec3e..99eb690ede 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
@@ -1,440 +1,40 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
package org.apache.qpid.nclient.amqp;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.log4j.Logger;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.framing.ConnectionOpenBody;
import org.apache.qpid.framing.ConnectionOpenOkBody;
-import org.apache.qpid.framing.ConnectionSecureBody;
import org.apache.qpid.framing.ConnectionSecureOkBody;
import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
import org.apache.qpid.framing.ConnectionTuneOkBody;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
-import org.apache.qpid.nclient.amqp.state.AMQPState;
-import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
-import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
-import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
-import org.apache.qpid.nclient.amqp.state.AMQPStateType;
-import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.transport.TransportConnection;
-import org.apache.qpid.nclient.util.AMQPValidator;
-/**
- * This maps directly to the Connection class defined in the AMQP protocol This class is a finite state machine and is
- * thread safe by design A particular method (state changing) can only be invoked once and only in sequence or else an
- * IllegalStateTransitionException will be thrown Also only one thread can enter those methods at a given time.
- */
-public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListener
+public interface AMQPConnection
{
- private static final Logger _logger = Logger.getLogger(AMQPConnection.class);
-
- private Phase _phase;
-
- private TransportConnection _connection;
-
- private long _correlationId;
-
- private AMQPState _currentState;
-
- private AMQPStateManager _stateManager;
-
- private final AMQPState[] _validCloseStates = new AMQPState[]
- { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED,
- AMQPState.CONNECTION_OPEN, };
-
- // The wait period until a server sends a respond
- private long _serverTimeOut = 1000;
-
- private final Lock _lock = new ReentrantLock();
-
- private final Condition _connectionNotStarted = _lock.newCondition();
-
- private final Condition _connectionNotSecure = _lock.newCondition();
-
- private final Condition _connectionNotTuned = _lock.newCondition();
-
- private final Condition _connectionNotOpened = _lock.newCondition();
-
- private final Condition _connectionNotClosed = _lock.newCondition();
-
- private ConnectionStartBody _connectionStartBody;
-
- private ConnectionSecureBody _connectionSecureBody;
-
- private ConnectionTuneBody _connectionTuneBody;
-
- private ConnectionOpenOkBody _connectionOpenOkBody;
-
- private ConnectionCloseOkBody _connectionCloseOkBody;
-
- private ConnectionCloseBody _connectionCloseBody;
-
- protected AMQPConnection(TransportConnection connection, AMQPStateManager stateManager)
- {
- _connection = connection;
- _stateManager = stateManager;
- _currentState = AMQPState.CONNECTION_UNDEFINED;
- _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
- }
-
- /**
- * -------------------------------------------
- * API Methods
- * --------------------------------------------
- */
/**
* Opens the TCP connection and let the formalities begin.
*/
- public ConnectionStartBody openTCPConnection() throws AMQPException
- {
- _lock.lock();
- // open the TCP connection
- try
- {
- _connectionStartBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED);
- _phase = _connection.connect();
-
- // waiting for ConnectionStartBody or error in connection
- //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotStarted.await();
-
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time");
- notifyState(AMQPState.CONNECTION_NOT_STARTED);
- _currentState = AMQPState.CONNECTION_NOT_STARTED;
- return _connectionStartBody;
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error opening connection to broker", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
+ public abstract ConnectionStartBody openTCPConnection() throws AMQPException;
/**
* The current java broker implementation can send a connection tune body
* as a response to the startOk. Not sure if that is the correct behaviour.
*/
- public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _connectionSecureBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId);
- _phase.messageSent(msg);
- // _connectionNotSecure.await(_serverTimeOut,TimeUnit.MILLISECONDS);
- _connectionNotSecure.await();
-
- checkIfConnectionClosed();
- if (_connectionTuneBody != null)
- {
- notifyState(AMQPState.CONNECTION_NOT_TUNED);
- _currentState = AMQPState.CONNECTION_NOT_TUNED;
- return _connectionTuneBody;
- }
- else if (_connectionSecureBody != null)
- { // oops the server sent another challenge
- notifyState(AMQPState.CONNECTION_NOT_SECURE);
- _currentState = AMQPState.CONNECTION_NOT_SECURE;
- return _connectionSecureBody;
- }
- else
- {
- throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
- }
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.startOk", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
+ public abstract AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException;
/**
* The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could
* issue a new challenge
*/
- public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _connectionTuneBody = null;
- _connectionSecureBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED);
-
- _connectionSecureBody = null; // The server could send a fresh challenge
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId);
- _phase.messageSent(msg);
-
- //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotTuned.await();
- checkIfConnectionClosed();
-
- if (_connectionTuneBody != null)
- {
- notifyState(AMQPState.CONNECTION_NOT_TUNED);
- _currentState = AMQPState.CONNECTION_NOT_TUNED;
- return _connectionTuneBody;
- }
- else if (_connectionSecureBody != null)
- { // oops the server sent another challenge
- notifyState(AMQPState.CONNECTION_NOT_SECURE);
- _currentState = AMQPState.CONNECTION_NOT_SECURE;
- return _connectionSecureBody;
- }
- else
- {
- throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
- }
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.secureOk", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED);
- _connectionSecureBody = null;
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId);
- _phase.messageSent(msg);
- notifyState(AMQPState.CONNECTION_NOT_OPENED);
- _currentState = AMQPState.CONNECTION_NOT_OPENED;
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- // If the broker sends a connection close due to an error with the
- // Connection tune ok, then this call will verify that
- checkIfConnectionClosed();
-
- _connectionOpenOkBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotOpened.await();
-
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time");
- notifyState(AMQPState.CONNECTION_OPEN);
- _currentState = AMQPState.CONNECTION_OPEN;
- return _connectionOpenOkBody;
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.open", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _connectionCloseOkBody = null;
- checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
- _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time");
- notifyState(AMQPState.CONNECTION_CLOSED);
- _currentState = AMQPState.CONNECTION_CLOSED;
- return _connectionCloseOkBody;
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.close", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * -------------------------------------------
- * AMQMethodListener methods
- * --------------------------------------------
- */
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- _lock.lock();
- try
- {
- _correlationId = evt.getCorrelationId();
-
- if (evt.getMethod() instanceof ConnectionStartBody)
- {
- _connectionStartBody = (ConnectionStartBody) evt.getMethod();
- _connectionNotStarted.signalAll();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionSecureBody)
- {
- _connectionSecureBody = (ConnectionSecureBody) evt.getMethod();
- _connectionNotSecure.signal();
- _connectionNotTuned.signal(); // in case the server has sent another chanllenge
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionTuneBody)
- {
- _connectionTuneBody = (ConnectionTuneBody) evt.getMethod();
- _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk
- _connectionNotTuned.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionOpenOkBody)
- {
- _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod();
- _connectionNotOpened.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionCloseOkBody)
- {
- _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod();
- _connectionNotClosed.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionCloseBody)
- {
- _connectionCloseBody = (ConnectionCloseBody) evt.getMethod();
- // release the correct lock as u may have some conditions waiting.
- // while an error occured and the broker has sent a close.
- releaseLocks();
- handleClose();
- return true;
- }
- else
- {
- return false;
- }
- }
- finally
- {
- _lock.unlock();
- }
- }
-
-
- public Phase getPhasePipe()
- {
- return _phase;
- }
-
- private void handleClose() throws AMQPException
- {
- try
- {
- notifyState(AMQPState.CONNECTION_CLOSING);
- _currentState = AMQPState.CONNECTION_CLOSING;
- // do the required cleanup and send a ConnectionCloseOkBody
- }
- catch (Exception e)
- {
- throw new AMQPException("Error handling connection.close from broker", e);
- }
- }
-
- private void checkIfConnectionClosed() throws AMQPException
- {
- if (_connectionCloseBody != null)
- {
- String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() + " with reply code ("
- + _connectionCloseBody.getReplyCode() + ") " + "caused by class " + _connectionCloseBody.getClassId() + " and method "
- + _connectionCloseBody.getMethod();
+ public abstract AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException;
- throw new AMQPException(error);
- }
- }
+ public abstract void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException;
- private void releaseLocks()
- {
- if (_currentState == AMQPState.CONNECTION_NOT_OPENED)
- {
- _connectionNotOpened.signal();
- }
- else if (_currentState == AMQPState.CONNECTION_UNDEFINED)
- {
- _connectionNotStarted.signal();
- }
- else if (_currentState == AMQPState.CONNECTION_NOT_STARTED)
- {
- _connectionNotSecure.signal();
- }
- else if (_currentState == AMQPState.CONNECTION_NOT_SECURE)
- {
- _connectionNotTuned.signal();
- }
- }
+ public abstract ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException;
- private void notifyState(AMQPState newState) throws AMQPException
- {
- _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CONNECTION_STATE));
- }
+ public abstract ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException;
} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java
index 35db9c6a75..f59ad5acf5 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java
@@ -1,88 +1,19 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
package org.apache.qpid.nclient.amqp;
-import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
import org.apache.qpid.framing.ExchangeDeleteBody;
-import org.apache.qpid.framing.ExchangeDeleteOkBody;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.Phase;
-/**
- *
- * This class represents the Exchange class defined in AMQP.
- * Each method takes an @see AMQPCallBack object if it wants to know
- * the response from the broker to particular method.
- * Clients can handle the reponse asynchronously or block for a response
- * using AMQPCallBack.isComplete() periodically using a loop.
- */
-public class AMQPExchange extends AMQPCallBackSupport implements AMQPMethodListener
+public interface AMQPExchange
{
- private Phase _phase;
-
- protected AMQPExchange(int channelId,Phase phase)
- {
- super(channelId);
- _phase = phase;
- }
-
+
/**
* -----------------------------------------------
* API Methods
* -----------------------------------------------
*/
- public void declare(ExchangeDeclareBody exchangeDeclareBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleNoWait(exchangeDeclareBody.nowait,exchangeDeclareBody,cb);
- _phase.messageSent(msg);
- }
-
- public void delete(ExchangeDeleteBody exchangeDeleteBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleNoWait(exchangeDeleteBody.nowait,exchangeDeleteBody,cb);
- _phase.messageSent(msg);
- }
-
-
- /**-------------------------------------------
- * AMQPMethodListener methods
- *--------------------------------------------
- */
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- long localCorrelationId = evt.getLocalCorrelationId();
- AMQMethodBody methodBody = evt.getMethod();
- if ( methodBody instanceof ExchangeDeclareOkBody || methodBody instanceof ExchangeDeleteOkBody)
- {
- invokeCallBack(localCorrelationId,methodBody);
- return true;
- }
- else
- {
- return false;
- }
- }
-}
+ public abstract void declare(ExchangeDeclareBody exchangeDeclareBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void delete(ExchangeDeleteBody exchangeDeleteBody, AMQPCallBack cb) throws AMQPException;
+
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java
index 1b86108411..6f21ca2507 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java
@@ -1,32 +1,10 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
package org.apache.qpid.nclient.amqp;
-import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.MessageAppendBody;
import org.apache.qpid.framing.MessageCancelBody;
import org.apache.qpid.framing.MessageCheckpointBody;
import org.apache.qpid.framing.MessageCloseBody;
import org.apache.qpid.framing.MessageConsumeBody;
-import org.apache.qpid.framing.MessageEmptyBody;
import org.apache.qpid.framing.MessageGetBody;
import org.apache.qpid.framing.MessageOffsetBody;
import org.apache.qpid.framing.MessageOkBody;
@@ -36,198 +14,61 @@ import org.apache.qpid.framing.MessageRecoverBody;
import org.apache.qpid.framing.MessageRejectBody;
import org.apache.qpid.framing.MessageResumeBody;
import org.apache.qpid.framing.MessageTransferBody;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.Phase;
-
-/**
- * This class represents the AMQP Message class.
- * You need an instance of this class per channel.
- * A @see AMQPMessageCallBack class is taken as an argument in the constructor.
- * A client can use this class to issue Message class methods on the broker.
- * When the broker issues Message class methods on the client, the client is notified
- * via the AMQPMessageCallBack interface.
- *
- * A JMS Message producer implementation can wrap an instance if this and map
- * JMS method calls to the appropriate AMQP methods.
- *
- * AMQPMessageCallBack can be implemented by the JMS MessageConsumer implementation.
- *
- */
-public class AMQPMessage extends AMQPCallBackSupport implements AMQPMethodListener
+
+public interface AMQPMessage
{
- private Phase _phase;
- private AMQPMessageCallBack _messageCb;
-
- protected AMQPMessage(int channelId,Phase phase,AMQPMessageCallBack messageCb)
- {
- super(channelId);
- _phase = phase;
- _messageCb = messageCb;
- }
-
+
/**
* -----------------------------------------------
* API Methods
* -----------------------------------------------
*/
-
- public void transfer(MessageTransferBody messageTransferBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageTransferBody,cb);
- _phase.messageSent(msg);
- }
-
- public void consume(MessageConsumeBody messageConsumeBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageConsumeBody,cb);
- _phase.messageSent(msg);
- }
-
- public void cancel(MessageCancelBody messageCancelBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageCancelBody,cb);
- _phase.messageSent(msg);
- }
-
- public void get(MessageGetBody messageGetBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageGetBody,cb);
- _phase.messageSent(msg);
- }
-
- public void recover(MessageRecoverBody messageRecoverBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageRecoverBody,cb);
- _phase.messageSent(msg);
- }
-
- public void open(MessageOpenBody messageOpenBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageOpenBody,cb);
- _phase.messageSent(msg);
- }
-
- public void close(MessageCloseBody messageCloseBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageCloseBody,cb);
- _phase.messageSent(msg);
- }
-
- public void append(MessageAppendBody messageAppendBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageAppendBody,cb);
- _phase.messageSent(msg);
- }
-
- public void checkpoint(MessageCheckpointBody messageCheckpointBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageCheckpointBody,cb);
- _phase.messageSent(msg);
- }
-
- public void resume(MessageResumeBody messageResumeBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageResumeBody,cb);
- _phase.messageSent(msg);
- }
-
- public void qos(MessageQosBody messageQosBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageQosBody,cb);
- _phase.messageSent(msg);
- }
-
+
+ public abstract void transfer(MessageTransferBody messageTransferBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void consume(MessageConsumeBody messageConsumeBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void cancel(MessageCancelBody messageCancelBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void get(MessageGetBody messageGetBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void recover(MessageRecoverBody messageRecoverBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void open(MessageOpenBody messageOpenBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void close(MessageCloseBody messageCloseBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void append(MessageAppendBody messageAppendBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void checkpoint(MessageCheckpointBody messageCheckpointBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void resume(MessageResumeBody messageResumeBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void qos(MessageQosBody messageQosBody, AMQPCallBack cb) throws AMQPException;
+
/**
* The correlationId from the request.
* For example if a message.transfer is sent with correlationId "ABCD"
* then u need to pass that in. This correlation id is used by the execution layer
* to handle the correlation of method requests and responses
*/
- public void ok(MessageOkBody messageOkBody,long correlationId) throws AMQPException
- {
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOkBody,correlationId);
- _phase.messageSent(msg);
- }
-
+ public abstract void ok(MessageOkBody messageOkBody, long correlationId) throws AMQPException;
+
/**
* The correlationId from the request.
* For example if a message.transfer is sent with correlationId "ABCD"
* then u need to pass that in. This correlation id is used by the execution layer
* to handle the correlation of method requests and responses
*/
- public void reject(MessageRejectBody messageRejectBody,long correlationId) throws AMQPException
- {
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageRejectBody,correlationId);
- _phase.messageSent(msg);
- }
-
+ public abstract void reject(MessageRejectBody messageRejectBody, long correlationId) throws AMQPException;
+
/**
* The correlationId from the request.
* For example if a message.resume is sent with correlationId "ABCD"
* then u need to pass that in. This correlation id is used by the execution layer
* to handle the correlation of method requests and responses
*/
- public void offset(MessageOffsetBody messageOffsetBody,long correlationId) throws AMQPException
- {
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOffsetBody,correlationId);
- _phase.messageSent(msg);
- }
-
- /**-------------------------------------------
- * AMQPMethodListener methods
- *--------------------------------------------
- */
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- long localCorrelationId = evt.getLocalCorrelationId();
- AMQMethodBody methodBody = evt.getMethod();
- if ( methodBody instanceof MessageOkBody ||
- methodBody instanceof MessageRejectBody ||
- methodBody instanceof MessageEmptyBody)
- {
- invokeCallBack(localCorrelationId,methodBody);
- return true;
- }
- else if (methodBody instanceof MessageTransferBody)
- {
- _messageCb.transfer((MessageTransferBody)methodBody, evt.getCorrelationId());
- return true;
- }
- else if (methodBody instanceof MessageAppendBody)
- {
- _messageCb.append((MessageAppendBody)methodBody, evt.getCorrelationId());
- return true;
- }
- else if (methodBody instanceof MessageOpenBody)
- {
- _messageCb.open((MessageOpenBody)methodBody, evt.getCorrelationId());
- return true;
- }
- else if (methodBody instanceof MessageCloseBody)
- {
- _messageCb.close((MessageCloseBody)methodBody, evt.getCorrelationId());
- return true;
- }
- else if (methodBody instanceof MessageCheckpointBody)
- {
- _messageCb.checkpoint((MessageCheckpointBody)methodBody, evt.getCorrelationId());
- return true;
- }
- else if (methodBody instanceof MessageRecoverBody)
- {
- _messageCb.recover((MessageRecoverBody)methodBody, evt.getCorrelationId());
- return true;
- }
- else if (methodBody instanceof MessageResumeBody)
- {
- _messageCb.resume((MessageResumeBody)methodBody, evt.getCorrelationId());
- return true;
- }
- else
- {
- return false;
- }
- }
-}
+ public abstract void offset(MessageOffsetBody messageOffsetBody, long correlationId) throws AMQPException;
+
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java
index cfe87cb3eb..4559b1f96c 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java
@@ -1,117 +1,29 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
package org.apache.qpid.nclient.amqp;
-import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueBindOkBody;
import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
import org.apache.qpid.framing.QueuePurgeBody;
-import org.apache.qpid.framing.QueuePurgeOkBody;
import org.apache.qpid.framing.QueueUnbindBody;
-import org.apache.qpid.framing.QueueUnbindOkBody;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.Phase;
-/**
- *
- * This class represents the Queue class defined in AMQP.
- * Each method takes an @see AMQPCallBack object if it wants to know
- * the response from the broker to a particular method.
- * Clients can handle the reponse asynchronously or block for a response
- * using AMQPCallBack.isComplete() periodically using a loop.
- */
-public class AMQPQueue extends AMQPCallBackSupport implements AMQPMethodListener
+public interface AMQPQueue
{
- private Phase _phase;
- protected AMQPQueue(int channelId,Phase phase)
- {
- super(channelId);
- _phase = phase;
- }
-
/**
* -----------------------------------------------
* API Methods
* -----------------------------------------------
*/
- public void declare(QueueDeclareBody queueDeclareBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleNoWait(queueDeclareBody.nowait,queueDeclareBody,cb);
- _phase.messageSent(msg);
- }
-
- public void bind(QueueBindBody queueBindBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleNoWait(queueBindBody.nowait,queueBindBody,cb);
- _phase.messageSent(msg);
- }
-
+ public abstract void declare(QueueDeclareBody queueDeclareBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void bind(QueueBindBody queueBindBody, AMQPCallBack cb) throws AMQPException;
+
// Queue.unbind doesn't have nowait
- public void unbind(QueueUnbindBody queueUnbindBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(queueUnbindBody,cb);
- _phase.messageSent(msg);
- }
-
- public void purge(QueuePurgeBody queuePurgeBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleNoWait(queuePurgeBody.nowait,queuePurgeBody,cb);
- _phase.messageSent(msg);
- }
+ public abstract void unbind(QueueUnbindBody queueUnbindBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void purge(QueuePurgeBody queuePurgeBody, AMQPCallBack cb) throws AMQPException;
- public void delete(QueueDeleteBody queueDeleteBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleNoWait(queueDeleteBody.nowait,queueDeleteBody,cb);
- _phase.messageSent(msg);
- }
+ public abstract void delete(QueueDeleteBody queueDeleteBody, AMQPCallBack cb) throws AMQPException;
-
- /**-------------------------------------------
- * AMQPMethodListener methods
- *--------------------------------------------
- */
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- long localCorrelationId = evt.getLocalCorrelationId();
- AMQMethodBody methodBody = evt.getMethod();
- if ( methodBody instanceof QueueDeclareOkBody ||
- methodBody instanceof QueueBindOkBody ||
- methodBody instanceof QueueUnbindOkBody ||
- methodBody instanceof QueuePurgeOkBody ||
- methodBody instanceof QueueDeleteOkBody
- )
- {
- invokeCallBack(localCorrelationId,methodBody);
- return true;
- }
- else
- {
- return false;
- }
- }
-}
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java
new file mode 100644
index 0000000000..b73aa8e6fa
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java
@@ -0,0 +1,370 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.nclient.amqp.qpid;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ChannelOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ChannelResumeBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.nclient.amqp.AMQPChannel;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.amqp.state.AMQPState;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
+import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.util.AMQPValidator;
+
+/**
+ * This represents the Channel class defined in the AMQP protocol. This class is a finite state machine and is thread
+ * safe by design. Only valid state changes are allowed or else an IllegalStateTransitionException will be thrown. Only
+ * one thread can enter the methods that change state, at a given time. The AMQP protocol recommends one thread per
+ * channel by design.
+ *
+ * A JMS Session can wrap an instance of this class.
+ */
+
+public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListener, AMQPChannel
+{
+ private static final Logger _logger = Logger.getLogger(QpidAMQPChannel.class);
+
+ // the channelId assigned for this channel
+ private int _channelId;
+
+ private Phase _phase;
+
+ private AMQPState _currentState;
+
+ private AMQPStateManager _stateManager;
+
+ private final AMQPState[] _validCloseStates = new AMQPState[]
+ { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND };
+
+ private final AMQPState[] _validResumeStates = new AMQPState[]
+ { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED };
+
+ // The wait period until a server sends a respond
+ private long _serverTimeOut = 1000;
+
+ private final Lock _lock = new ReentrantLock();
+
+ private final Condition _channelNotOpend = _lock.newCondition();
+
+ private final Condition _channelNotClosed = _lock.newCondition();
+
+ private final Condition _channelFlowNotResponded = _lock.newCondition();
+
+ private final Condition _channelNotResumed = _lock.newCondition();
+
+ private ChannelOpenOkBody _channelOpenOkBody;
+
+ private ChannelCloseOkBody _channelCloseOkBody;
+
+ private ChannelFlowOkBody _channelFlowOkBody;
+
+ private ChannelOkBody _channelOkBody;
+
+ private ChannelCloseBody _channelCloseBody;
+
+ protected QpidAMQPChannel(int channelId, Phase phase, AMQPStateManager stateManager)
+ {
+ _channelId = channelId;
+ _phase = phase;
+ _stateManager = stateManager;
+ _currentState = AMQPState.CHANNEL_NOT_OPENED;
+ _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+ }
+
+ /**
+ * -------------------------------------------
+ * API Methods
+ * --------------------------------------------
+ */
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPChannel#open(org.apache.qpid.framing.ChannelOpenBody)
+ */
+ public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _channelOpenOkBody = null;
+ checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelNotOpend.await();
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time");
+ notifyState(AMQPState.CHANNEL_OPENED);
+ _currentState = AMQPState.CHANNEL_OPENED;
+ return _channelOpenOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.open", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPChannel#close(org.apache.qpid.framing.ChannelCloseBody)
+ */
+ public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _channelCloseOkBody = null;
+ checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelNotClosed.await();
+ AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time");
+ notifyState(AMQPState.CHANNEL_CLOSED);
+ _currentState = AMQPState.CHANNEL_CLOSED;
+ return _channelCloseOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.close", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPChannel#flow(org.apache.qpid.framing.ChannelFlowBody)
+ */
+ public ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _channelFlowOkBody = null;
+ if (channelFlowBody.active)
+ {
+ checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND, _currentState, AMQPState.CHANNEL_OPENED);
+ }
+ else
+ {
+ checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND);
+ }
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelFlowNotResponded.await();
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time");
+ handleChannelFlowState(_channelFlowOkBody.active);
+ return _channelFlowOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.flow", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPChannel#resume(org.apache.qpid.framing.ChannelResumeBody)
+ */
+ public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _channelOkBody = null;
+ checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelNotResumed.await();
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_channelOkBody,
+ "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time");
+ notifyState(AMQPState.CHANNEL_OPENED);
+ _currentState = AMQPState.CHANNEL_OPENED;
+ return _channelOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.resume", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * -------------------------------------------
+ * AMQPMethodListener methods
+ * --------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ if (evt.getMethod() instanceof ChannelOpenOkBody)
+ {
+ _channelOpenOkBody = (ChannelOpenOkBody) evt.getMethod();
+ _channelNotOpend.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelCloseOkBody)
+ {
+ _channelCloseOkBody = (ChannelCloseOkBody) evt.getMethod();
+ _channelNotClosed.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelCloseBody)
+ {
+ _channelCloseBody = (ChannelCloseBody) evt.getMethod();
+ // release the correct lock as u may have some conditions waiting.
+ // while an error occured and the broker has sent a close.
+ releaseLocks();
+ handleChannelClose(_channelCloseBody);
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelFlowOkBody)
+ {
+ _channelFlowOkBody = (ChannelFlowOkBody) evt.getMethod();
+ _channelFlowNotResponded.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelFlowBody)
+ {
+ handleChannelFlow((ChannelFlowBody) evt.getMethod());
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelOkBody)
+ {
+ _channelOkBody = (ChannelOkBody) evt.getMethod();
+ // In this case the only method expecting channel-ok is channel-resume
+ // haven't implemented ping and pong.
+ _channelNotResumed.signal();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ private void handleChannelClose(ChannelCloseBody channelCloseBody) throws AMQPException
+ {
+ notifyState(AMQPState.CHANNEL_CLOSED);
+ _currentState = AMQPState.CHANNEL_CLOSED;
+ // handle channel related cleanup
+ }
+
+ private void releaseLocks()
+ {
+ if (_currentState == AMQPState.CHANNEL_NOT_OPENED)
+ {
+ _channelNotOpend.signal();
+ _channelNotResumed.signal(); // It could be a channel.resume call
+ }
+ else if (_currentState == AMQPState.CHANNEL_OPENED || _currentState == AMQPState.CHANNEL_SUSPEND)
+ {
+ _channelFlowNotResponded.signal();
+ }
+ else if (_currentState == AMQPState.CHANNEL_CLOSED)
+ {
+ _channelNotResumed.signal();
+ }
+ }
+
+ private void checkIfConnectionClosed() throws AMQPException
+ {
+ if (_channelCloseBody != null)
+ {
+ String error = "Broker has closed channel due to : " + _channelCloseBody.getReplyText() + " with reply code ("
+ + _channelCloseBody.getReplyCode() + ") " + "caused by class " + _channelCloseBody.getClassId() + " and method "
+ + _channelCloseBody.getMethod();
+
+ throw new AMQPException(error);
+ }
+ }
+
+ private void handleChannelFlow(ChannelFlowBody channelFlowBody)throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ handleChannelFlowState(channelFlowBody.active);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ private void handleChannelFlowState(boolean flow)throws AMQPException
+ {
+ notifyState((flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND);
+ _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND;
+ }
+
+ private void notifyState(AMQPState newState) throws AMQPException
+ {
+ _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE));
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java
new file mode 100644
index 0000000000..809aa57dab
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java
@@ -0,0 +1,286 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.qpid;
+
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ChannelOkBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.framing.MessageOffsetBody;
+import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageQosBody;
+import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageRejectBody;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.framing.QueueUnbindOkBody;
+import org.apache.qpid.nclient.amqp.AMQPChannel;
+import org.apache.qpid.nclient.amqp.AMQPClassFactory;
+import org.apache.qpid.nclient.amqp.AMQPConnection;
+import org.apache.qpid.nclient.amqp.AMQPExchange;
+import org.apache.qpid.nclient.amqp.AMQPMessage;
+import org.apache.qpid.nclient.amqp.AMQPMessageCallBack;
+import org.apache.qpid.nclient.amqp.AMQPQueue;
+import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.DefaultPhaseContext;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.PhaseContext;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.transport.AMQPConnectionURL;
+import org.apache.qpid.nclient.transport.ConnectionURL;
+import org.apache.qpid.nclient.transport.TransportConnection;
+import org.apache.qpid.nclient.transport.TransportConnectionFactory;
+import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * The Class Factory creates AMQP Class
+ * equivalents defined in the spec.
+ *
+ * There should one instance per connection.
+ * The factory class creates all the support
+ * classes and provides an instance of the
+ * AMQP class in ready-to-use state.
+ *
+ */
+public class QpidAMQPClassFactory implements AMQPClassFactory
+{
+ //Need an event manager per connection
+ private AMQPEventManager _eventManager = new QpidEventManager();
+
+ // Need a state manager per connection
+ private AMQPStateManager _stateManager = new QpidStateManager();
+
+ //Need a phase pipe per connection
+ private Phase _phase;
+
+ //One instance per connection
+ private QpidAMQPConnection _amqpConnection;
+
+ public QpidAMQPClassFactory()
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createConnection(java.lang.String, org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType)
+ */
+ public AMQPConnection createConnectionClass(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException
+ {
+ AMQPConnectionURL url = new AMQPConnectionURL(urlStr);
+ return createConnectionClass(url, type);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createConnectionClass(org.apache.qpid.nclient.transport.ConnectionURL, org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType)
+ */
+ public AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException
+ {
+ if (_amqpConnection == null)
+ {
+ PhaseContext ctx = new DefaultPhaseContext();
+ ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager);
+
+ TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type, ctx);
+ _amqpConnection = new QpidAMQPConnection(conn, _stateManager);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class, _amqpConnection);
+ }
+ return _amqpConnection;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createChannelClass(int)
+ */
+ public AMQPChannel createChannelClass(int channel) throws AMQPException
+ {
+ checkIfConnectionStarted();
+ QpidAMQPChannel amqpChannel = new QpidAMQPChannel(channel, _phase,_stateManager);
+ _eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelCloseBody.class, amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelFlowBody.class, amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelOkBody.class, amqpChannel);
+ return amqpChannel;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destroyChannelClass(int, org.apache.qpid.nclient.amqp.AMQPChannel)
+ */
+ public void destroyChannelClass(int channel, QpidAMQPChannel amqpChannel) throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelOkBody.class, amqpChannel);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createExchangeClass(int)
+ */
+ public AMQPExchange createExchangeClass(int channel) throws AMQPException
+ {
+ checkIfConnectionStarted();
+ QpidAMQPExchange amqpExchange = new QpidAMQPExchange(channel, _phase);
+ _eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange);
+ _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange);
+ return amqpExchange;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destoryExchangeClass(int, org.apache.qpid.nclient.amqp.AMQPExchange)
+ */
+ public void destoryExchangeClass(int channel, QpidAMQPExchange amqpExchange) throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange);
+ _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createQueueClass(int)
+ */
+ public AMQPQueue createQueueClass(int channel) throws AMQPException
+ {
+ checkIfConnectionStarted();
+ QpidAMQPQueue amqpQueue = new QpidAMQPQueue(channel, _phase);
+ _eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueueBindOkBody.class, amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue);
+ return amqpQueue;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destroyQueueClass(int, org.apache.qpid.nclient.amqp.AMQPQueue)
+ */
+ public void destroyQueueClass(int channel, QpidAMQPQueue amqpQueue) throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class, amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createMessageClass(int, org.apache.qpid.nclient.amqp.AMQPMessageCallBack)
+ */
+ public AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException
+ {
+ checkIfConnectionStarted();
+ QpidAMQPMessage amqpMessage = new QpidAMQPMessage(channel, _phase, messageCb);
+ _eventManager.addMethodEventListener(channel, MessageAppendBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageCancelBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageCloseBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageGetBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageOffsetBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageOkBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageOpenBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageRecoverBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageRejectBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageResumeBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageQosBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageTransferBody.class, amqpMessage);
+
+ return amqpMessage;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destoryMessageClass(int, org.apache.qpid.nclient.amqp.AMQPMessage)
+ */
+ public void destoryMessageClass(int channel, QpidAMQPMessage amqpMessage) throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, MessageAppendBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCancelBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCloseBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageGetBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOkBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOpenBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageRejectBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageResumeBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageQosBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageTransferBody.class, amqpMessage);
+ }
+
+ //This class should register as a state listener for AMQPConnection
+ private void checkIfConnectionStarted() throws AMQPException
+ {
+ if (_phase == null)
+ {
+ _phase = _amqpConnection.getPhasePipe();
+
+ if (_phase == null)
+ {
+ throw new AMQPException("Cannot create a channel until connection is ready");
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#getEventManager()
+ */
+ public AMQPEventManager getEventManager()
+ {
+ return _eventManager;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#getStateManager()
+ */
+ public AMQPStateManager getStateManager()
+ {
+ return _stateManager;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java
new file mode 100644
index 0000000000..9b4d776cc5
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java
@@ -0,0 +1,448 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.qpid;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionStartOkBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.nclient.amqp.AMQPConnection;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.amqp.state.AMQPState;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
+import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.transport.TransportConnection;
+import org.apache.qpid.nclient.util.AMQPValidator;
+
+/**
+ * This maps directly to the Connection class defined in the AMQP protocol This class is a finite state machine and is
+ * thread safe by design A particular method (state changing) can only be invoked once and only in sequence or else an
+ * IllegalStateTransitionException will be thrown Also only one thread can enter those methods at a given time.
+ */
+public class QpidAMQPConnection extends AMQPStateMachine implements AMQPMethodListener, AMQPConnection
+{
+ private static final Logger _logger = Logger.getLogger(QpidAMQPConnection.class);
+
+ private Phase _phase;
+
+ private TransportConnection _connection;
+
+ private long _correlationId;
+
+ private AMQPState _currentState;
+
+ private AMQPStateManager _stateManager;
+
+ private final AMQPState[] _validCloseStates = new AMQPState[]
+ { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED,
+ AMQPState.CONNECTION_OPEN, };
+
+ // The wait period until a server sends a respond
+ private long _serverTimeOut = 1000;
+
+ private final Lock _lock = new ReentrantLock();
+
+ private final Condition _connectionNotStarted = _lock.newCondition();
+
+ private final Condition _connectionNotSecure = _lock.newCondition();
+
+ private final Condition _connectionNotTuned = _lock.newCondition();
+
+ private final Condition _connectionNotOpened = _lock.newCondition();
+
+ private final Condition _connectionNotClosed = _lock.newCondition();
+
+ private ConnectionStartBody _connectionStartBody;
+
+ private ConnectionSecureBody _connectionSecureBody;
+
+ private ConnectionTuneBody _connectionTuneBody;
+
+ private ConnectionOpenOkBody _connectionOpenOkBody;
+
+ private ConnectionCloseOkBody _connectionCloseOkBody;
+
+ private ConnectionCloseBody _connectionCloseBody;
+
+ protected QpidAMQPConnection(TransportConnection connection, AMQPStateManager stateManager)
+ {
+ _connection = connection;
+ _stateManager = stateManager;
+ _currentState = AMQPState.CONNECTION_UNDEFINED;
+ _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+ }
+
+ /**
+ * -------------------------------------------
+ * API Methods
+ * --------------------------------------------
+ */
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPConnection#openTCPConnection()
+ */
+ public ConnectionStartBody openTCPConnection() throws AMQPException
+ {
+ _lock.lock();
+ // open the TCP connection
+ try
+ {
+ _connectionStartBody = null;
+ checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED);
+ _phase = _connection.connect();
+
+ // waiting for ConnectionStartBody or error in connection
+ //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _connectionNotStarted.await();
+
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time");
+ notifyState(AMQPState.CONNECTION_NOT_STARTED);
+ _currentState = AMQPState.CONNECTION_NOT_STARTED;
+ return _connectionStartBody;
+ }
+ catch (InterruptedException e)
+ {
+ throw new AMQPException("Error opening connection to broker", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPConnection#startOk(org.apache.qpid.framing.ConnectionStartOkBody)
+ */
+ public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _connectionSecureBody = null;
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId);
+ _phase.messageSent(msg);
+ // _connectionNotSecure.await(_serverTimeOut,TimeUnit.MILLISECONDS);
+ _connectionNotSecure.await();
+
+ checkIfConnectionClosed();
+ if (_connectionTuneBody != null)
+ {
+ notifyState(AMQPState.CONNECTION_NOT_TUNED);
+ _currentState = AMQPState.CONNECTION_NOT_TUNED;
+ return _connectionTuneBody;
+ }
+ else if (_connectionSecureBody != null)
+ { // oops the server sent another challenge
+ notifyState(AMQPState.CONNECTION_NOT_SECURE);
+ _currentState = AMQPState.CONNECTION_NOT_SECURE;
+ return _connectionSecureBody;
+ }
+ else
+ {
+ throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new AMQPException("Error in connection.startOk", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPConnection#secureOk(org.apache.qpid.framing.ConnectionSecureOkBody)
+ */
+ public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _connectionTuneBody = null;
+ _connectionSecureBody = null;
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED);
+
+ _connectionSecureBody = null; // The server could send a fresh challenge
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId);
+ _phase.messageSent(msg);
+
+ //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _connectionNotTuned.await();
+ checkIfConnectionClosed();
+
+ if (_connectionTuneBody != null)
+ {
+ notifyState(AMQPState.CONNECTION_NOT_TUNED);
+ _currentState = AMQPState.CONNECTION_NOT_TUNED;
+ return _connectionTuneBody;
+ }
+ else if (_connectionSecureBody != null)
+ { // oops the server sent another challenge
+ notifyState(AMQPState.CONNECTION_NOT_SECURE);
+ _currentState = AMQPState.CONNECTION_NOT_SECURE;
+ return _connectionSecureBody;
+ }
+ else
+ {
+ throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new AMQPException("Error in connection.secureOk", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPConnection#tuneOk(org.apache.qpid.framing.ConnectionTuneOkBody)
+ */
+ public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED);
+ _connectionSecureBody = null;
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId);
+ _phase.messageSent(msg);
+ notifyState(AMQPState.CONNECTION_NOT_OPENED);
+ _currentState = AMQPState.CONNECTION_NOT_OPENED;
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPConnection#open(org.apache.qpid.framing.ConnectionOpenBody)
+ */
+ public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ // If the broker sends a connection close due to an error with the
+ // Connection tune ok, then this call will verify that
+ checkIfConnectionClosed();
+
+ _connectionOpenOkBody = null;
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _connectionNotOpened.await();
+
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time");
+ notifyState(AMQPState.CONNECTION_OPEN);
+ _currentState = AMQPState.CONNECTION_OPEN;
+ return _connectionOpenOkBody;
+ }
+ catch (InterruptedException e)
+ {
+ throw new AMQPException("Error in connection.open", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPConnection#close(org.apache.qpid.framing.ConnectionCloseBody)
+ */
+ public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _connectionCloseOkBody = null;
+ checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+ _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time");
+ notifyState(AMQPState.CONNECTION_CLOSED);
+ _currentState = AMQPState.CONNECTION_CLOSED;
+ return _connectionCloseOkBody;
+ }
+ catch (InterruptedException e)
+ {
+ throw new AMQPException("Error in connection.close", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * -------------------------------------------
+ * AMQMethodListener methods
+ * --------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _correlationId = evt.getCorrelationId();
+
+ if (evt.getMethod() instanceof ConnectionStartBody)
+ {
+ _connectionStartBody = (ConnectionStartBody) evt.getMethod();
+ _connectionNotStarted.signalAll();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionSecureBody)
+ {
+ _connectionSecureBody = (ConnectionSecureBody) evt.getMethod();
+ _connectionNotSecure.signal();
+ _connectionNotTuned.signal(); // in case the server has sent another chanllenge
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionTuneBody)
+ {
+ _connectionTuneBody = (ConnectionTuneBody) evt.getMethod();
+ _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk
+ _connectionNotTuned.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionOpenOkBody)
+ {
+ _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod();
+ _connectionNotOpened.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionCloseOkBody)
+ {
+ _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod();
+ _connectionNotClosed.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionCloseBody)
+ {
+ _connectionCloseBody = (ConnectionCloseBody) evt.getMethod();
+ // release the correct lock as u may have some conditions waiting.
+ // while an error occured and the broker has sent a close.
+ releaseLocks();
+ handleClose();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+
+ public Phase getPhasePipe()
+ {
+ return _phase;
+ }
+
+ private void handleClose() throws AMQPException
+ {
+ try
+ {
+ notifyState(AMQPState.CONNECTION_CLOSING);
+ _currentState = AMQPState.CONNECTION_CLOSING;
+ // do the required cleanup and send a ConnectionCloseOkBody
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error handling connection.close from broker", e);
+ }
+ }
+
+ private void checkIfConnectionClosed() throws AMQPException
+ {
+ if (_connectionCloseBody != null)
+ {
+ String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() + " with reply code ("
+ + _connectionCloseBody.getReplyCode() + ") " + "caused by class " + _connectionCloseBody.getClassId() + " and method "
+ + _connectionCloseBody.getMethod();
+
+ throw new AMQPException(error);
+ }
+ }
+
+ private void releaseLocks()
+ {
+ if (_currentState == AMQPState.CONNECTION_NOT_OPENED)
+ {
+ _connectionNotOpened.signal();
+ }
+ else if (_currentState == AMQPState.CONNECTION_UNDEFINED)
+ {
+ _connectionNotStarted.signal();
+ }
+ else if (_currentState == AMQPState.CONNECTION_NOT_STARTED)
+ {
+ _connectionNotSecure.signal();
+ }
+ else if (_currentState == AMQPState.CONNECTION_NOT_SECURE)
+ {
+ _connectionNotTuned.signal();
+ }
+ }
+
+ private void notifyState(AMQPState newState) throws AMQPException
+ {
+ _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CONNECTION_STATE));
+ }
+
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java
new file mode 100644
index 0000000000..02b22e0755
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.nclient.amqp.qpid;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.ExchangeDeleteBody;
+import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.nclient.amqp.AMQPCallBack;
+import org.apache.qpid.nclient.amqp.AMQPCallBackSupport;
+import org.apache.qpid.nclient.amqp.AMQPExchange;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+
+/**
+ *
+ * This class represents the Exchange class defined in AMQP.
+ * Each method takes an @see AMQPCallBack object if it wants to know
+ * the response from the broker to particular method.
+ * Clients can handle the reponse asynchronously or block for a response
+ * using AMQPCallBack.isComplete() periodically using a loop.
+ */
+public class QpidAMQPExchange extends AMQPCallBackSupport implements AMQPMethodListener, AMQPExchange
+{
+ private Phase _phase;
+
+ protected QpidAMQPExchange(int channelId,Phase phase)
+ {
+ super(channelId);
+ _phase = phase;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPExchange#declare(org.apache.qpid.framing.ExchangeDeclareBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void declare(ExchangeDeclareBody exchangeDeclareBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleNoWait(exchangeDeclareBody.nowait,exchangeDeclareBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPExchange#delete(org.apache.qpid.framing.ExchangeDeleteBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void delete(ExchangeDeleteBody exchangeDeleteBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleNoWait(exchangeDeleteBody.nowait,exchangeDeleteBody,cb);
+ _phase.messageSent(msg);
+ }
+
+
+ /**-------------------------------------------
+ * AMQPMethodListener methods
+ *--------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ long localCorrelationId = evt.getLocalCorrelationId();
+ AMQMethodBody methodBody = evt.getMethod();
+ if ( methodBody instanceof ExchangeDeclareOkBody || methodBody instanceof ExchangeDeleteOkBody)
+ {
+ invokeCallBack(localCorrelationId,methodBody);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java
new file mode 100644
index 0000000000..e40c3cefa2
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java
@@ -0,0 +1,256 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.qpid;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageConsumeBody;
+import org.apache.qpid.framing.MessageEmptyBody;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.framing.MessageOffsetBody;
+import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageQosBody;
+import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageRejectBody;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.nclient.amqp.AMQPCallBack;
+import org.apache.qpid.nclient.amqp.AMQPCallBackSupport;
+import org.apache.qpid.nclient.amqp.AMQPMessage;
+import org.apache.qpid.nclient.amqp.AMQPMessageCallBack;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+
+/**
+ * This class represents the AMQP Message class.
+ * You need an instance of this class per channel.
+ * A @see AMQPMessageCallBack class is taken as an argument in the constructor.
+ * A client can use this class to issue Message class methods on the broker.
+ * When the broker issues Message class methods on the client, the client is notified
+ * via the AMQPMessageCallBack interface.
+ *
+ * A JMS Message producer implementation can wrap an instance if this and map
+ * JMS method calls to the appropriate AMQP methods.
+ *
+ * AMQPMessageCallBack can be implemented by the JMS MessageConsumer implementation.
+ *
+ */
+public class QpidAMQPMessage extends AMQPCallBackSupport implements AMQPMethodListener, AMQPMessage
+{
+ private Phase _phase;
+ private AMQPMessageCallBack _messageCb;
+
+ protected QpidAMQPMessage(int channelId,Phase phase,AMQPMessageCallBack messageCb)
+ {
+ super(channelId);
+ _phase = phase;
+ _messageCb = messageCb;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#transfer(org.apache.qpid.framing.MessageTransferBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+
+ public void transfer(MessageTransferBody messageTransferBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageTransferBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#consume(org.apache.qpid.framing.MessageConsumeBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void consume(MessageConsumeBody messageConsumeBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageConsumeBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#cancel(org.apache.qpid.framing.MessageCancelBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void cancel(MessageCancelBody messageCancelBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageCancelBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#get(org.apache.qpid.framing.MessageGetBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void get(MessageGetBody messageGetBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageGetBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#recover(org.apache.qpid.framing.MessageRecoverBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void recover(MessageRecoverBody messageRecoverBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageRecoverBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#open(org.apache.qpid.framing.MessageOpenBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void open(MessageOpenBody messageOpenBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageOpenBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#close(org.apache.qpid.framing.MessageCloseBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void close(MessageCloseBody messageCloseBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageCloseBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#append(org.apache.qpid.framing.MessageAppendBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void append(MessageAppendBody messageAppendBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageAppendBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#checkpoint(org.apache.qpid.framing.MessageCheckpointBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void checkpoint(MessageCheckpointBody messageCheckpointBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageCheckpointBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#resume(org.apache.qpid.framing.MessageResumeBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void resume(MessageResumeBody messageResumeBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageResumeBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#qos(org.apache.qpid.framing.MessageQosBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void qos(MessageQosBody messageQosBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageQosBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#ok(org.apache.qpid.framing.MessageOkBody, long)
+ */
+ public void ok(MessageOkBody messageOkBody,long correlationId) throws AMQPException
+ {
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOkBody,correlationId);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#reject(org.apache.qpid.framing.MessageRejectBody, long)
+ */
+ public void reject(MessageRejectBody messageRejectBody,long correlationId) throws AMQPException
+ {
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageRejectBody,correlationId);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#offset(org.apache.qpid.framing.MessageOffsetBody, long)
+ */
+ public void offset(MessageOffsetBody messageOffsetBody,long correlationId) throws AMQPException
+ {
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOffsetBody,correlationId);
+ _phase.messageSent(msg);
+ }
+
+ /**-------------------------------------------
+ * AMQPMethodListener methods
+ *--------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ long localCorrelationId = evt.getLocalCorrelationId();
+ AMQMethodBody methodBody = evt.getMethod();
+ if ( methodBody instanceof MessageOkBody ||
+ methodBody instanceof MessageRejectBody ||
+ methodBody instanceof MessageEmptyBody)
+ {
+ invokeCallBack(localCorrelationId,methodBody);
+ return true;
+ }
+ else if (methodBody instanceof MessageTransferBody)
+ {
+ _messageCb.transfer((MessageTransferBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else if (methodBody instanceof MessageAppendBody)
+ {
+ _messageCb.append((MessageAppendBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else if (methodBody instanceof MessageOpenBody)
+ {
+ _messageCb.open((MessageOpenBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else if (methodBody instanceof MessageCloseBody)
+ {
+ _messageCb.close((MessageCloseBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else if (methodBody instanceof MessageCheckpointBody)
+ {
+ _messageCb.checkpoint((MessageCheckpointBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else if (methodBody instanceof MessageRecoverBody)
+ {
+ _messageCb.recover((MessageRecoverBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else if (methodBody instanceof MessageResumeBody)
+ {
+ _messageCb.resume((MessageResumeBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java
new file mode 100644
index 0000000000..323ff0cf06
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.qpid;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.QueuePurgeBody;
+import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.framing.QueueUnbindBody;
+import org.apache.qpid.framing.QueueUnbindOkBody;
+import org.apache.qpid.nclient.amqp.AMQPCallBack;
+import org.apache.qpid.nclient.amqp.AMQPCallBackSupport;
+import org.apache.qpid.nclient.amqp.AMQPQueue;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+
+/**
+ *
+ * This class represents the Queue class defined in AMQP.
+ * Each method takes an @see AMQPCallBack object if it wants to know
+ * the response from the broker to a particular method.
+ * Clients can handle the reponse asynchronously or block for a response
+ * using AMQPCallBack.isComplete() periodically using a loop.
+ */
+public class QpidAMQPQueue extends AMQPCallBackSupport implements AMQPMethodListener, AMQPQueue
+{
+ private Phase _phase;
+
+ protected QpidAMQPQueue(int channelId,Phase phase)
+ {
+ super(channelId);
+ _phase = phase;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPQueue#declare(org.apache.qpid.framing.QueueDeclareBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void declare(QueueDeclareBody queueDeclareBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleNoWait(queueDeclareBody.nowait,queueDeclareBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPQueue#bind(org.apache.qpid.framing.QueueBindBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void bind(QueueBindBody queueBindBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleNoWait(queueBindBody.nowait,queueBindBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ // Queue.unbind doesn't have nowait
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPQueue#unbind(org.apache.qpid.framing.QueueUnbindBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void unbind(QueueUnbindBody queueUnbindBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(queueUnbindBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPQueue#purge(org.apache.qpid.framing.QueuePurgeBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void purge(QueuePurgeBody queuePurgeBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleNoWait(queuePurgeBody.nowait,queuePurgeBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPQueue#delete(org.apache.qpid.framing.QueueDeleteBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void delete(QueueDeleteBody queueDeleteBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleNoWait(queueDeleteBody.nowait,queueDeleteBody,cb);
+ _phase.messageSent(msg);
+ }
+
+
+ /**-------------------------------------------
+ * AMQPMethodListener methods
+ *--------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ long localCorrelationId = evt.getLocalCorrelationId();
+ AMQMethodBody methodBody = evt.getMethod();
+ if ( methodBody instanceof QueueDeclareOkBody ||
+ methodBody instanceof QueueBindOkBody ||
+ methodBody instanceof QueueUnbindOkBody ||
+ methodBody instanceof QueuePurgeOkBody ||
+ methodBody instanceof QueueDeleteOkBody
+ )
+ {
+ invokeCallBack(localCorrelationId,methodBody);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java
index 9655da6912..902c80fe77 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.nclient.amqp;
+package org.apache.qpid.nclient.amqp.qpid;
import java.util.ArrayList;
import java.util.List;
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java
index 20c4921582..3fe1ee4cfd 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.nclient.amqp;
+package org.apache.qpid.nclient.amqp.qpid;
import java.util.ArrayList;
import java.util.List;
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
index f994a99ec3..4d4fadcc22 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
@@ -65,6 +65,7 @@ import org.apache.qpid.nclient.amqp.AMQPConnection;
import org.apache.qpid.nclient.amqp.AMQPExchange;
import org.apache.qpid.nclient.amqp.AMQPMessage;
import org.apache.qpid.nclient.amqp.AMQPQueue;
+import org.apache.qpid.nclient.amqp.qpid.QpidAMQPClassFactory;
import org.apache.qpid.nclient.amqp.state.AMQPStateType;
import org.apache.qpid.nclient.transport.AMQPConnectionURL;
import org.apache.qpid.nclient.transport.ConnectionURL;
@@ -90,7 +91,7 @@ public class TestClient
private static int _channel = 2;
// Need a Class factory per connection
- private AMQPClassFactory _classFactory = new AMQPClassFactory();
+ private AMQPClassFactory _classFactory = new QpidAMQPClassFactory();
private int _ticket;