summaryrefslogtreecommitdiff
path: root/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java')
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java340
1 files changed, 170 insertions, 170 deletions
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
index a3f9b2c24d..f90a5231a2 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
@@ -78,178 +78,178 @@ import org.apache.qpid.url.URLSyntaxException;
*/
public class AMQPClassFactory
{
- //Need an event manager per connection
- private AMQPEventManager _eventManager = new QpidEventManager();
-
- // Need a state manager per connection
- private AMQPStateManager _stateManager = new QpidStateManager();
-
- //Need a phase pipe per connection
- private Phase _phase;
-
- //One instance per connection
- private AMQPConnection _amqpConnection;
-
- public AMQPClassFactory()
- {
-
- }
-
- public AMQPConnection createConnection(String urlStr,ConnectionType type)throws AMQPException, URLSyntaxException
- {
- AMQPConnectionURL url = new AMQPConnectionURL(urlStr);
- return createConnectionClass(url,type);
- }
-
- public AMQPConnection createConnectionClass(ConnectionURL url,ConnectionType type)throws AMQPException
- {
- if (_amqpConnection == null)
+ //Need an event manager per connection
+ private AMQPEventManager _eventManager = new QpidEventManager();
+
+ // Need a state manager per connection
+ private AMQPStateManager _stateManager = new QpidStateManager();
+
+ //Need a phase pipe per connection
+ private Phase _phase;
+
+ //One instance per connection
+ private AMQPConnection _amqpConnection;
+
+ public AMQPClassFactory()
+ {
+
+ }
+
+ public AMQPConnection createConnection(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException
+ {
+ AMQPConnectionURL url = new AMQPConnectionURL(urlStr);
+ return createConnectionClass(url, type);
+ }
+
+ public AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException
{
- PhaseContext ctx = new DefaultPhaseContext();
- ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager);
-
- TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type,ctx);
- _amqpConnection = new AMQPConnection(conn);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class,_amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class,_amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class,_amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class,_amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class,_amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class,_amqpConnection);
+ if (_amqpConnection == null)
+ {
+ PhaseContext ctx = new DefaultPhaseContext();
+ ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager);
+
+ TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type, ctx);
+ _amqpConnection = new AMQPConnection(conn, _stateManager);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class, _amqpConnection);
+ }
+ return _amqpConnection;
}
- return _amqpConnection;
- }
-
- public AMQPChannel createChannelClass(int channel)throws AMQPException
- {
- checkIfConnectionStarted();
- AMQPChannel amqpChannel = new AMQPChannel(channel,_phase);
- _eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class,amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelCloseBody.class,amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class,amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelFlowBody.class,amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class,amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelOkBody.class,amqpChannel);
- return amqpChannel;
- }
-
- public void destroyChannelClass(int channel,AMQPChannel amqpChannel)throws AMQPException
- {
- _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class,amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class,amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class,amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class,amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class,amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelOkBody.class,amqpChannel);
- }
-
- public AMQPExchange createExchangeClass(int channel)throws AMQPException
- {
- checkIfConnectionStarted();
- AMQPExchange amqpExchange = new AMQPExchange(channel,_phase);
- _eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class,amqpExchange);
- _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class,amqpExchange);
- return amqpExchange;
- }
-
- public void destoryExchangeClass(int channel, AMQPExchange amqpExchange)throws AMQPException
- {
- _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class,amqpExchange);
- _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class,amqpExchange);
- }
-
- public AMQPQueue createQueueClass(int channel)throws AMQPException
- {
- checkIfConnectionStarted();
- AMQPQueue amqpQueue = new AMQPQueue(channel,_phase);
- _eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class,amqpQueue);
- _eventManager.addMethodEventListener(channel, QueueBindOkBody.class,amqpQueue);
- _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class,amqpQueue);
- _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class,amqpQueue);
- _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class,amqpQueue);
- return amqpQueue;
- }
-
- public void destroyQueueClass(int channel,AMQPQueue amqpQueue)throws AMQPException
- {
- _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class,amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class,amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class,amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class,amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class,amqpQueue);
- }
-
- public AMQPMessage createMessageClass(int channel,AMQPMessageCallBack messageCb)throws AMQPException
- {
- checkIfConnectionStarted();
- AMQPMessage amqpMessage = new AMQPMessage(channel,_phase,messageCb);
- _eventManager.addMethodEventListener(channel, MessageAppendBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageCancelBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageCloseBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageGetBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageOffsetBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageOkBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageOpenBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageRecoverBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageRejectBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageResumeBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageQosBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageTransferBody.class,amqpMessage);
-
- return amqpMessage;
- }
-
- public void destoryMessageClass(int channel,AMQPMessage amqpMessage)throws AMQPException
- {
- _eventManager.removeMethodEventListener(channel, MessageAppendBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageCancelBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageCloseBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageGetBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageOkBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageOpenBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageRejectBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageResumeBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageQosBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageTransferBody.class,amqpMessage);
- }
-
- //This class should register as a state listener for AMQPConnection
- private void checkIfConnectionStarted() throws AMQPException
- {
- if (_phase == null)
+
+ public AMQPChannel createChannelClass(int channel) throws AMQPException
+ {
+ checkIfConnectionStarted();
+ AMQPChannel amqpChannel = new AMQPChannel(channel, _phase,_stateManager);
+ _eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelCloseBody.class, amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelFlowBody.class, amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelOkBody.class, amqpChannel);
+ return amqpChannel;
+ }
+
+ public void destroyChannelClass(int channel, AMQPChannel amqpChannel) throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelOkBody.class, amqpChannel);
+ }
+
+ public AMQPExchange createExchangeClass(int channel) throws AMQPException
+ {
+ checkIfConnectionStarted();
+ AMQPExchange amqpExchange = new AMQPExchange(channel, _phase);
+ _eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange);
+ _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange);
+ return amqpExchange;
+ }
+
+ public void destoryExchangeClass(int channel, AMQPExchange amqpExchange) throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange);
+ _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange);
+ }
+
+ public AMQPQueue createQueueClass(int channel) throws AMQPException
+ {
+ checkIfConnectionStarted();
+ AMQPQueue amqpQueue = new AMQPQueue(channel, _phase);
+ _eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueueBindOkBody.class, amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue);
+ return amqpQueue;
+ }
+
+ public void destroyQueueClass(int channel, AMQPQueue amqpQueue) throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class, amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue);
+ }
+
+ public AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException
+ {
+ checkIfConnectionStarted();
+ AMQPMessage amqpMessage = new AMQPMessage(channel, _phase, messageCb);
+ _eventManager.addMethodEventListener(channel, MessageAppendBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageCancelBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageCloseBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageGetBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageOffsetBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageOkBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageOpenBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageRecoverBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageRejectBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageResumeBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageQosBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageTransferBody.class, amqpMessage);
+
+ return amqpMessage;
+ }
+
+ public void destoryMessageClass(int channel, AMQPMessage amqpMessage) throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, MessageAppendBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCancelBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCloseBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageGetBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOkBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOpenBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageRejectBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageResumeBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageQosBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageTransferBody.class, amqpMessage);
+ }
+
+ //This class should register as a state listener for AMQPConnection
+ private void checkIfConnectionStarted() throws AMQPException
+ {
+ if (_phase == null)
+ {
+ _phase = _amqpConnection.getPhasePipe();
+
+ if (_phase == null)
+ {
+ throw new AMQPException("Cannot create a channel until connection is ready");
+ }
+ }
+ }
+
+ /**
+ * Extention point
+ * Other interested parties can obtain a reference to the event manager
+ * and add listeners to get notified of events
+ *
+ */
+ public AMQPEventManager getEventManager()
+ {
+ return _eventManager;
+ }
+
+ /**
+ * Extention point
+ * Other interested parties can obtain a reference to the state manager
+ * and add listeners to get notified of state changes
+ *
+ */
+ public AMQPStateManager getStateManager()
{
- _phase = _amqpConnection.getPhasePipe();
-
- if (_phase == null)
- {
- throw new AMQPException("Cannot create a channel until connection is ready");
- }
+ return _stateManager;
}
- }
-
- /**
- * Extention point
- * Other interested parties can obtain a reference to the event manager
- * and add listeners to get notified of events
- *
- */
- public AMQPEventManager getEventManager()
- {
- return _eventManager;
- }
-
- /**
- * Extention point
- * Other interested parties can obtain a reference to the state manager
- * and add listeners to get notified of state changes
- *
- */
- public AMQPStateManager getStateManager()
- {
- return null;
- }
}