diff options
Diffstat (limited to 'java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java')
-rw-r--r-- | java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java | 220 |
1 files changed, 19 insertions, 201 deletions
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java index f90a5231a2..4976daa4fa 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java @@ -20,216 +20,39 @@ */ package org.apache.qpid.nclient.amqp; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ChannelFlowBody; -import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.framing.ChannelOkBody; -import org.apache.qpid.framing.ChannelOpenOkBody; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionCloseOkBody; -import org.apache.qpid.framing.ConnectionOpenOkBody; -import org.apache.qpid.framing.ConnectionSecureBody; -import org.apache.qpid.framing.ConnectionStartBody; -import org.apache.qpid.framing.ConnectionTuneBody; -import org.apache.qpid.framing.ExchangeDeclareOkBody; -import org.apache.qpid.framing.ExchangeDeleteOkBody; -import org.apache.qpid.framing.MessageAppendBody; -import org.apache.qpid.framing.MessageCancelBody; -import org.apache.qpid.framing.MessageCheckpointBody; -import org.apache.qpid.framing.MessageCloseBody; -import org.apache.qpid.framing.MessageGetBody; -import org.apache.qpid.framing.MessageOffsetBody; -import org.apache.qpid.framing.MessageOkBody; -import org.apache.qpid.framing.MessageOpenBody; -import org.apache.qpid.framing.MessageQosBody; -import org.apache.qpid.framing.MessageRecoverBody; -import org.apache.qpid.framing.MessageRejectBody; -import org.apache.qpid.framing.MessageResumeBody; -import org.apache.qpid.framing.MessageTransferBody; -import org.apache.qpid.framing.QueueBindOkBody; -import org.apache.qpid.framing.QueueDeclareOkBody; -import org.apache.qpid.framing.QueueDeleteOkBody; -import org.apache.qpid.framing.QueuePurgeOkBody; -import org.apache.qpid.framing.QueueUnbindOkBody; import org.apache.qpid.nclient.amqp.event.AMQPEventManager; +import org.apache.qpid.nclient.amqp.qpid.QpidAMQPChannel; +import org.apache.qpid.nclient.amqp.qpid.QpidAMQPExchange; +import org.apache.qpid.nclient.amqp.qpid.QpidAMQPMessage; +import org.apache.qpid.nclient.amqp.qpid.QpidAMQPQueue; import org.apache.qpid.nclient.amqp.state.AMQPStateManager; import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.DefaultPhaseContext; -import org.apache.qpid.nclient.core.Phase; -import org.apache.qpid.nclient.core.PhaseContext; -import org.apache.qpid.nclient.core.QpidConstants; -import org.apache.qpid.nclient.transport.AMQPConnectionURL; import org.apache.qpid.nclient.transport.ConnectionURL; -import org.apache.qpid.nclient.transport.TransportConnection; -import org.apache.qpid.nclient.transport.TransportConnectionFactory; import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType; import org.apache.qpid.url.URLSyntaxException; -/** - * The Class Factory creates AMQP Class - * equivalents defined in the spec. - * - * There should one instance per connection. - * The factory class creates all the support - * classes and provides an instance of the - * AMQP class in ready-to-use state. - * - */ -public class AMQPClassFactory +public interface AMQPClassFactory { - //Need an event manager per connection - private AMQPEventManager _eventManager = new QpidEventManager(); - - // Need a state manager per connection - private AMQPStateManager _stateManager = new QpidStateManager(); - - //Need a phase pipe per connection - private Phase _phase; - - //One instance per connection - private AMQPConnection _amqpConnection; - - public AMQPClassFactory() - { - } + public abstract AMQPConnection createConnectionClass(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException; - public AMQPConnection createConnection(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException - { - AMQPConnectionURL url = new AMQPConnectionURL(urlStr); - return createConnectionClass(url, type); - } + public abstract AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException; - public AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException - { - if (_amqpConnection == null) - { - PhaseContext ctx = new DefaultPhaseContext(); - ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager); + public abstract AMQPChannel createChannelClass(int channel) throws AMQPException; - TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type, ctx); - _amqpConnection = new AMQPConnection(conn, _stateManager); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class, _amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class, _amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class, _amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class, _amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class, _amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class, _amqpConnection); - } - return _amqpConnection; - } + public abstract void destroyChannelClass(int channel, QpidAMQPChannel amqpChannel) throws AMQPException; - public AMQPChannel createChannelClass(int channel) throws AMQPException - { - checkIfConnectionStarted(); - AMQPChannel amqpChannel = new AMQPChannel(channel, _phase,_stateManager); - _eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel); - _eventManager.addMethodEventListener(channel, ChannelCloseBody.class, amqpChannel); - _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel); - _eventManager.addMethodEventListener(channel, ChannelFlowBody.class, amqpChannel); - _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel); - _eventManager.addMethodEventListener(channel, ChannelOkBody.class, amqpChannel); - return amqpChannel; - } + public abstract AMQPExchange createExchangeClass(int channel) throws AMQPException; - public void destroyChannelClass(int channel, AMQPChannel amqpChannel) throws AMQPException - { - _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class, amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class, amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelOkBody.class, amqpChannel); - } + public abstract void destoryExchangeClass(int channel, QpidAMQPExchange amqpExchange) throws AMQPException; - public AMQPExchange createExchangeClass(int channel) throws AMQPException - { - checkIfConnectionStarted(); - AMQPExchange amqpExchange = new AMQPExchange(channel, _phase); - _eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange); - _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange); - return amqpExchange; - } + public abstract AMQPQueue createQueueClass(int channel) throws AMQPException; - public void destoryExchangeClass(int channel, AMQPExchange amqpExchange) throws AMQPException - { - _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange); - _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange); - } + public abstract void destroyQueueClass(int channel, QpidAMQPQueue amqpQueue) throws AMQPException; - public AMQPQueue createQueueClass(int channel) throws AMQPException - { - checkIfConnectionStarted(); - AMQPQueue amqpQueue = new AMQPQueue(channel, _phase); - _eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue); - _eventManager.addMethodEventListener(channel, QueueBindOkBody.class, amqpQueue); - _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue); - _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue); - _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue); - return amqpQueue; - } + public abstract AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException; - public void destroyQueueClass(int channel, AMQPQueue amqpQueue) throws AMQPException - { - _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue); - _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class, amqpQueue); - _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue); - _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue); - _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue); - } - - public AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException - { - checkIfConnectionStarted(); - AMQPMessage amqpMessage = new AMQPMessage(channel, _phase, messageCb); - _eventManager.addMethodEventListener(channel, MessageAppendBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageCancelBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageCloseBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageGetBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageOffsetBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageOkBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageOpenBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageRecoverBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageRejectBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageResumeBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageQosBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageTransferBody.class, amqpMessage); - - return amqpMessage; - } - - public void destoryMessageClass(int channel, AMQPMessage amqpMessage) throws AMQPException - { - _eventManager.removeMethodEventListener(channel, MessageAppendBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageCancelBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageCloseBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageGetBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageOkBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageOpenBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageRejectBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageResumeBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageQosBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageTransferBody.class, amqpMessage); - } - - //This class should register as a state listener for AMQPConnection - private void checkIfConnectionStarted() throws AMQPException - { - if (_phase == null) - { - _phase = _amqpConnection.getPhasePipe(); - - if (_phase == null) - { - throw new AMQPException("Cannot create a channel until connection is ready"); - } - } - } + public abstract void destoryMessageClass(int channel, QpidAMQPMessage amqpMessage) throws AMQPException; /** * Extention point @@ -237,10 +60,7 @@ public class AMQPClassFactory * and add listeners to get notified of events * */ - public AMQPEventManager getEventManager() - { - return _eventManager; - } + public abstract AMQPEventManager getEventManager(); /** * Extention point @@ -248,8 +68,6 @@ public class AMQPClassFactory * and add listeners to get notified of state changes * */ - public AMQPStateManager getStateManager() - { - return _stateManager; - } -} + public abstract AMQPStateManager getStateManager(); + +}
\ No newline at end of file |