summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-03-29 22:24:20 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-03-29 22:24:20 +0000
commit3c9e433f7068925533a69ca65e32af0379594757 (patch)
tree01283e54db06bc9705e8ec93dd668238bd966b90
parent0f9044243547ded8521af0c8d0ff81d791d8048d (diff)
downloadqpid-python-3c9e433f7068925533a69ca65e32af0379594757.tar.gz
First cut of the AMQP java API
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@523854 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/newclient/.project29
-rw-r--r--java/newclient/src/main/java/client.log4j5
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java56
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java484
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java255
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java262
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java6
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java11
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java2
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java6
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/EventManager.java129
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java123
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java42
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java11
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java78
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java (renamed from java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java)2
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java57
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java395
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java48
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml112
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java11
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java19
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java247
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java48
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java2
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java2
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java679
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java1
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java1
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java64
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java74
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java13
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java105
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java62
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java17
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java22
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java27
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java12
38 files changed, 2615 insertions, 904 deletions
diff --git a/java/newclient/.project b/java/newclient/.project
index 0522e6b5dc..f1c93ad7a9 100644
--- a/java/newclient/.project
+++ b/java/newclient/.project
@@ -1,17 +1,14 @@
-<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
- <name>qpid-newclient</name>
- <comment></comment>
- <projects>
- </projects>
- <buildSpec>
- <buildCommand>
- <name>org.eclipse.jdt.core.javabuilder</name>
- <arguments>
- </arguments>
- </buildCommand>
- </buildSpec>
- <natures>
- <nature>org.eclipse.jdt.core.javanature</nature>
- </natures>
-</projectDescription>
+ <name>qpid-newclient</name>
+ <comment/>
+ <projects/>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments/>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription> \ No newline at end of file
diff --git a/java/newclient/src/main/java/client.log4j b/java/newclient/src/main/java/client.log4j
index 525433e9a9..c2c7326479 100644
--- a/java/newclient/src/main/java/client.log4j
+++ b/java/newclient/src/main/java/client.log4j
@@ -16,10 +16,11 @@
# specific language governing permissions and limitations
# under the License.
#
-log4j.rootLogger=${root.logging.level}
+log4j.rootLogger=DEBUG
-log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+#log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+log4j.logger.org.apache.qpid=DEBUG, console
log4j.additivity.org.apache.qpid=false
log4j.appender.console=org.apache.log4j.ConsoleAppender
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java
index 890b0dd6eb..f984e812c2 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java
@@ -1,11 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.nclient.amqp;
import java.security.SecureRandom;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
public abstract class AMQPCallBackSupport
{
@@ -31,16 +52,16 @@ public abstract class AMQPCallBackSupport
{
if(noWait)
{
- // u only need to register if u are expecting a response
- long localCorrelationId = getNextCorrelationId();
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId);
- _cbMap.put(localCorrelationId, cb);
- return msg;
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID);
+ return msg;
}
else
{
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID);
- return msg;
+ // u only need to register if u are expecting a response
+ long localCorrelationId = getNextCorrelationId();
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId);
+ _cbMap.put(localCorrelationId, cb);
+ return msg;
}
}
@@ -52,12 +73,25 @@ public abstract class AMQPCallBackSupport
return msg;
}
- protected void invokeCallBack(long localCorrelationId, AMQMethodBody methodBody)
+ protected void invokeCallBack(long localCorrelationId, AMQMethodBody methodBody)throws AMQPException
{
- if(_cbMap.contains(localCorrelationId))
+ if(_cbMap.containsKey(localCorrelationId))
{
AMQPCallBack cb = (AMQPCallBack)_cbMap.get(localCorrelationId);
- cb.brokerResponded(methodBody);
+ if(cb == null)
+ {
+ throw new AMQPException("Unable to find the callback object responsible for handling " + methodBody);
+ }
+ else
+ {
+ cb.setIsComplete(true);
+ cb.brokerResponded(methodBody);
+ }
+ _cbMap.remove(localCorrelationId);
+ }
+ else
+ {
+ //ignore, as this event is for another class instance
}
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
index d86f948e28..7ba9120415 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
@@ -36,249 +36,317 @@ import org.apache.qpid.framing.ChannelOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
import org.apache.qpid.framing.ChannelResumeBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
import org.apache.qpid.nclient.amqp.state.AMQPState;
import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.Phase;
import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
-import org.apache.qpid.nclient.model.AMQPMethodListener;
import org.apache.qpid.nclient.util.AMQPValidator;
/**
- * This represents the Channel class defined in the AMQP protocol.
- * This class is a finite state machine and is thread safe by design.
- * Only valid state changes are allowed or else an IllegalStateTransitionException will be thrown.
- * Only one thread can enter the methods that change state, at a given time.
- * The AMQP protocol recommends one thread per channel by design.
- *
- * A JMS Session can wrap an instance of this class.
+ * This represents the Channel class defined in the AMQP protocol. This class is a finite state machine and is thread
+ * safe by design. Only valid state changes are allowed or else an IllegalStateTransitionException will be thrown. Only
+ * one thread can enter the methods that change state, at a given time. The AMQP protocol recommends one thread per
+ * channel by design.
+ *
+ * A JMS Session can wrap an instance of this class.
*/
public class AMQPChannel extends AMQPStateMachine implements AMQPMethodListener
{
-private static final Logger _logger = Logger.getLogger(AMQPChannel.class);
-
- //the channelId assigned for this channel
- private int _channelId;
- private Phase _phase;
- private AMQPState _currentState;
- private final AMQPState[] _validCloseStates = new AMQPState[]{AMQPState.CHANNEL_OPENED,AMQPState.CHANNEL_SUSPEND};
- private final AMQPState[] _validResumeStates = new AMQPState[]{AMQPState.CHANNEL_CLOSED,AMQPState.CHANNEL_NOT_OPENED};
-
- // The wait period until a server sends a respond
- private long _serverTimeOut = 1000;
- private final Lock _lock = new ReentrantLock();
- private final Condition _channelNotOpend = _lock.newCondition();
- private final Condition _channelNotClosed = _lock.newCondition();
- private final Condition _channelFlowNotResponded = _lock.newCondition();
- private final Condition _channelNotResumed = _lock.newCondition();
-
- private ChannelOpenOkBody _channelOpenOkBody;
- private ChannelCloseOkBody _channelCloseOkBody;
+ private static final Logger _logger = Logger.getLogger(AMQPChannel.class);
+
+ // the channelId assigned for this channel
+ private int _channelId;
+
+ private Phase _phase;
+
+ private AMQPState _currentState;
+
+ private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND };
+
+ private final AMQPState[] _validResumeStates = new AMQPState[] { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED };
+
+ // The wait period until a server sends a respond
+ private long _serverTimeOut = 1000;
+
+ private final Lock _lock = new ReentrantLock();
+
+ private final Condition _channelNotOpend = _lock.newCondition();
+
+ private final Condition _channelNotClosed = _lock.newCondition();
+
+ private final Condition _channelFlowNotResponded = _lock.newCondition();
+
+ private final Condition _channelNotResumed = _lock.newCondition();
+
+ private ChannelOpenOkBody _channelOpenOkBody;
+
+ private ChannelCloseOkBody _channelCloseOkBody;
+
private ChannelFlowOkBody _channelFlowOkBody;
+
private ChannelOkBody _channelOkBody;
- public AMQPChannel(int channelId)
+ private ChannelCloseBody _channelCloseBody;
+
+ protected AMQPChannel(int channelId, Phase phase)
{
- _channelId = channelId;
- _currentState = AMQPState.CHANNEL_NOT_OPENED;
- _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+ _channelId = channelId;
+ _phase = phase;
+ _currentState = AMQPState.CHANNEL_NOT_OPENED;
+ _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
}
-
- /**-------------------------------------------
+
+ /**
+ * -------------------------------------------
* API Methods
- *--------------------------------------------
+ * --------------------------------------------
*/
-
- /**
- * Opens the channel
- */
- public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException
+
+ /**
+ * Opens the channel
+ */
+ public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _channelOpenOkBody = null;
+ checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelNotOpend.await();
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time");
+ _currentState = AMQPState.CHANNEL_OPENED;
+ return _channelOpenOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.open", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * Close the channel
+ */
+ public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _channelCloseOkBody = null;
+ checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelNotClosed.await();
+ AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time");
+ _currentState = AMQPState.CHANNEL_CLOSED;
+ return _channelCloseOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.close", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * Channel Flow
+ */
+ public ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _channelFlowOkBody = null;
+ if (channelFlowBody.active)
+ {
+ checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND, _currentState, AMQPState.CHANNEL_OPENED);
+ }
+ else
+ {
+ checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND);
+ }
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelFlowNotResponded.await();
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time");
+ handleChannelFlowState(_channelFlowOkBody.active);
+ return _channelFlowOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.flow", e);
+ }
+ finally
{
- _lock.lock();
- try {
- _channelOpenOkBody = null;
- checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED,_currentState,AMQPState.CHANNEL_OPENED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelOpenBody,QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
- _channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time");
- _currentState = AMQPState.CHANNEL_OPENED;
- return _channelOpenOkBody;
- }
- catch(Exception e)
- {
- throw new AMQPException("XXX");
- }
- finally
- {
- _lock.unlock();
- }
+ _lock.unlock();
}
-
- /**
- * Close the channel
- */
- public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException
+ }
+
+ /**
+ * Close the channel
+ */
+ public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException
+ {
+ _lock.lock();
+ try
{
- _lock.lock();
- try {
- _channelCloseOkBody = null;
- checkIfValidStateTransition(_validCloseStates,_currentState,AMQPState.CHANNEL_CLOSED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelCloseBody,QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
- _channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time");
- _currentState = AMQPState.CHANNEL_CLOSED;
- return _channelCloseOkBody;
- }
- catch(Exception e)
- {
- throw new AMQPException("XXX");
- }
- finally
- {
- _lock.unlock();
- }
+ _channelOkBody = null;
+ checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelNotResumed.await();
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_channelOkBody,
+ "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time");
+ _currentState = AMQPState.CHANNEL_OPENED;
+ return _channelOkBody;
}
-
- /**
- * Channel Flow
- */
- public ChannelFlowOkBody close(ChannelFlowBody channelFlowBody) throws AMQPException
+ catch (Exception e)
{
- _lock.lock();
- try {
- _channelFlowOkBody = null;
- if(channelFlowBody.active)
- {
- checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND,_currentState,AMQPState.CHANNEL_OPENED);
- }
- else
- {
- checkIfValidStateTransition(AMQPState.CHANNEL_OPENED,_currentState,AMQPState.CHANNEL_SUSPEND);
- }
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelFlowBody,QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
- _channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time");
- handleChannelFlowState(_channelFlowOkBody.active);
- return _channelFlowOkBody;
- }
- catch(Exception e)
- {
- throw new AMQPException("XXX");
- }
- finally
- {
- _lock.unlock();
- }
+ throw new AMQPException("Error in channel.resume", e);
}
-
- /**
- * Close the channel
- */
- public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException
+ finally
{
- _lock.lock();
- try {
- _channelOkBody = null;
- checkIfValidStateTransition(_validResumeStates,_currentState,AMQPState.CHANNEL_OPENED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelResumeBody,QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
- _channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_channelOkBody, "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time");
- _currentState = AMQPState.CHANNEL_OPENED;
- return _channelOkBody;
- }
- catch(Exception e)
- {
- throw new AMQPException("XXX");
- }
- finally
- {
- _lock.unlock();
- }
+ _lock.unlock();
}
-
- /**-------------------------------------------
+ }
+
+ /**
+ * -------------------------------------------
* AMQPMethodListener methods
- *--------------------------------------------
+ * --------------------------------------------
*/
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- if (evt.getMethod() instanceof ChannelOpenOkBody)
- {
- _channelOpenOkBody = (ChannelOpenOkBody)evt.getMethod();
- _channelNotOpend.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ChannelCloseOkBody)
- {
- _channelCloseOkBody = (ChannelCloseOkBody)evt.getMethod();
- _channelNotClosed.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ChannelCloseBody)
- {
- handleChannelClose((ChannelCloseBody)evt.getMethod());
- return true;
- }
- else if (evt.getMethod() instanceof ChannelFlowOkBody)
- {
- _channelFlowOkBody = (ChannelFlowOkBody)evt.getMethod();
- _channelFlowNotResponded.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ChannelFlowBody)
- {
- handleChannelFlow((ChannelFlowBody)evt.getMethod());
- return true;
- }
- else if (evt.getMethod() instanceof ChannelOkBody)
- {
- _channelOkBody = (ChannelOkBody)evt.getMethod();
- //In this case the only method expecting channel-ok is channel-resume
- // haven't implemented ping and pong.
- _channelNotResumed.signal();
- return true;
- }
- else
- {
- return false;
- }
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ if (evt.getMethod() instanceof ChannelOpenOkBody)
+ {
+ _channelOpenOkBody = (ChannelOpenOkBody) evt.getMethod();
+ _channelNotOpend.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelCloseOkBody)
+ {
+ _channelCloseOkBody = (ChannelCloseOkBody) evt.getMethod();
+ _channelNotClosed.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelCloseBody)
+ {
+ _channelCloseBody = (ChannelCloseBody)evt.getMethod();
+ // release the correct lock as u may have some conditions waiting.
+ // while an error occured and the broker has sent a close.
+ releaseLocks();
+ handleChannelClose(_channelCloseBody);
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelFlowOkBody)
+ {
+ _channelFlowOkBody = (ChannelFlowOkBody) evt.getMethod();
+ _channelFlowNotResponded.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelFlowBody)
+ {
+ handleChannelFlow((ChannelFlowBody) evt.getMethod());
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelOkBody)
+ {
+ _channelOkBody = (ChannelOkBody) evt.getMethod();
+ // In this case the only method expecting channel-ok is channel-resume
+ // haven't implemented ping and pong.
+ _channelNotResumed.signal();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
-
- private void handleChannelClose(ChannelCloseBody channelCloseBody)
+
+ private void handleChannelClose(ChannelCloseBody channelCloseBody)
+ {
+ _currentState = AMQPState.CHANNEL_CLOSED;
+ // handle channel related cleanup
+ }
+
+ private void releaseLocks()
+ {
+ if(_currentState == AMQPState.CHANNEL_NOT_OPENED)
+ {
+ _channelNotOpend.signal();
+ _channelNotResumed.signal(); // It could be a channel.resume call
+ }
+ else if(_currentState == AMQPState.CHANNEL_OPENED || _currentState == AMQPState.CHANNEL_SUSPEND)
{
- try
- {
- _lock.lock();
- _currentState = AMQPState.CHANNEL_CLOSED;
- }
- finally
- {
- _lock.unlock();
- }
+ _channelFlowNotResponded.signal();
}
-
- private void handleChannelFlow(ChannelFlowBody channelFlowBody)
+ else if(_currentState == AMQPState.CHANNEL_CLOSED)
{
- _lock.lock();
- try
- {
- handleChannelFlowState(channelFlowBody.active);
- }
- finally
- {
- _lock.unlock();
- }
+ _channelNotResumed.signal();
}
-
- private void handleChannelFlowState(boolean flow)
+ }
+
+ private void checkIfConnectionClosed()throws AMQPException
+ {
+ if (_channelCloseBody != null)
{
- _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND;
+ String error = "Broker has closed channel due to : " + _channelCloseBody.getReplyText() +
+ " with reply code (" + _channelCloseBody.getReplyCode() + ") " +
+ "caused by class " + _channelCloseBody.getClassId() +
+ " and method " + _channelCloseBody.getMethod();
+
+ throw new AMQPException(error);
}
+ }
+
+ private void handleChannelFlow(ChannelFlowBody channelFlowBody)
+ {
+ _lock.lock();
+ try
+ {
+ handleChannelFlowState(channelFlowBody.active);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ private void handleChannelFlowState(boolean flow)
+ {
+ _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND;
+ }
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
new file mode 100644
index 0000000000..a3f9b2c24d
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
@@ -0,0 +1,255 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ChannelOkBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.framing.MessageOffsetBody;
+import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageQosBody;
+import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageRejectBody;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.framing.QueueUnbindOkBody;
+import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.DefaultPhaseContext;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.PhaseContext;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.transport.AMQPConnectionURL;
+import org.apache.qpid.nclient.transport.ConnectionURL;
+import org.apache.qpid.nclient.transport.TransportConnection;
+import org.apache.qpid.nclient.transport.TransportConnectionFactory;
+import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * The Class Factory creates AMQP Class
+ * equivalents defined in the spec.
+ *
+ * There should one instance per connection.
+ * The factory class creates all the support
+ * classes and provides an instance of the
+ * AMQP class in ready-to-use state.
+ *
+ */
+public class AMQPClassFactory
+{
+ //Need an event manager per connection
+ private AMQPEventManager _eventManager = new QpidEventManager();
+
+ // Need a state manager per connection
+ private AMQPStateManager _stateManager = new QpidStateManager();
+
+ //Need a phase pipe per connection
+ private Phase _phase;
+
+ //One instance per connection
+ private AMQPConnection _amqpConnection;
+
+ public AMQPClassFactory()
+ {
+
+ }
+
+ public AMQPConnection createConnection(String urlStr,ConnectionType type)throws AMQPException, URLSyntaxException
+ {
+ AMQPConnectionURL url = new AMQPConnectionURL(urlStr);
+ return createConnectionClass(url,type);
+ }
+
+ public AMQPConnection createConnectionClass(ConnectionURL url,ConnectionType type)throws AMQPException
+ {
+ if (_amqpConnection == null)
+ {
+ PhaseContext ctx = new DefaultPhaseContext();
+ ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager);
+
+ TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type,ctx);
+ _amqpConnection = new AMQPConnection(conn);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class,_amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class,_amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class,_amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class,_amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class,_amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class,_amqpConnection);
+ }
+ return _amqpConnection;
+ }
+
+ public AMQPChannel createChannelClass(int channel)throws AMQPException
+ {
+ checkIfConnectionStarted();
+ AMQPChannel amqpChannel = new AMQPChannel(channel,_phase);
+ _eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class,amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelCloseBody.class,amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class,amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelFlowBody.class,amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class,amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelOkBody.class,amqpChannel);
+ return amqpChannel;
+ }
+
+ public void destroyChannelClass(int channel,AMQPChannel amqpChannel)throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class,amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class,amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class,amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class,amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class,amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelOkBody.class,amqpChannel);
+ }
+
+ public AMQPExchange createExchangeClass(int channel)throws AMQPException
+ {
+ checkIfConnectionStarted();
+ AMQPExchange amqpExchange = new AMQPExchange(channel,_phase);
+ _eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class,amqpExchange);
+ _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class,amqpExchange);
+ return amqpExchange;
+ }
+
+ public void destoryExchangeClass(int channel, AMQPExchange amqpExchange)throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class,amqpExchange);
+ _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class,amqpExchange);
+ }
+
+ public AMQPQueue createQueueClass(int channel)throws AMQPException
+ {
+ checkIfConnectionStarted();
+ AMQPQueue amqpQueue = new AMQPQueue(channel,_phase);
+ _eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class,amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueueBindOkBody.class,amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class,amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class,amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class,amqpQueue);
+ return amqpQueue;
+ }
+
+ public void destroyQueueClass(int channel,AMQPQueue amqpQueue)throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class,amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class,amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class,amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class,amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class,amqpQueue);
+ }
+
+ public AMQPMessage createMessageClass(int channel,AMQPMessageCallBack messageCb)throws AMQPException
+ {
+ checkIfConnectionStarted();
+ AMQPMessage amqpMessage = new AMQPMessage(channel,_phase,messageCb);
+ _eventManager.addMethodEventListener(channel, MessageAppendBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageCancelBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageCloseBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageGetBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageOffsetBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageOkBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageOpenBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageRecoverBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageRejectBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageResumeBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageQosBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageTransferBody.class,amqpMessage);
+
+ return amqpMessage;
+ }
+
+ public void destoryMessageClass(int channel,AMQPMessage amqpMessage)throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, MessageAppendBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCancelBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCloseBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageGetBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOkBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOpenBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageRejectBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageResumeBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageQosBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageTransferBody.class,amqpMessage);
+ }
+
+ //This class should register as a state listener for AMQPConnection
+ private void checkIfConnectionStarted() throws AMQPException
+ {
+ if (_phase == null)
+ {
+ _phase = _amqpConnection.getPhasePipe();
+
+ if (_phase == null)
+ {
+ throw new AMQPException("Cannot create a channel until connection is ready");
+ }
+ }
+ }
+
+ /**
+ * Extention point
+ * Other interested parties can obtain a reference to the event manager
+ * and add listeners to get notified of events
+ *
+ */
+ public AMQPEventManager getEventManager()
+ {
+ return _eventManager;
+ }
+
+ /**
+ * Extention point
+ * Other interested parties can obtain a reference to the state manager
+ * and add listeners to get notified of state changes
+ *
+ */
+ public AMQPStateManager getStateManager()
+ {
+ return null;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
index 9c9e913cd3..aea25a403b 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
@@ -37,14 +37,14 @@ import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.ConnectionTuneBody;
import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
import org.apache.qpid.nclient.amqp.state.AMQPState;
import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.Phase;
import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
-import org.apache.qpid.nclient.model.AMQPMethodListener;
import org.apache.qpid.nclient.transport.TransportConnection;
import org.apache.qpid.nclient.util.AMQPValidator;
@@ -65,9 +65,8 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen
private AMQPState _currentState;
- private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CONNECTION_NOT_STARTED,
- AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED,
- AMQPState.CONNECTION_OPEN, };
+ private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE,
+ AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, AMQPState.CONNECTION_OPEN, };
// The wait period until a server sends a respond
private long _serverTimeOut = 1000;
@@ -93,8 +92,10 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen
private ConnectionOpenOkBody _connectionOpenOkBody;
private ConnectionCloseOkBody _connectionCloseOkBody;
+
+ private ConnectionCloseBody _connectionCloseBody;
- public AMQPConnection(TransportConnection connection)
+ protected AMQPConnection(TransportConnection connection)
{
_connection = connection;
_currentState = AMQPState.CONNECTION_UNDEFINED;
@@ -102,12 +103,14 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen
}
/**
- * ------------------------------------------- API Methods --------------------------------------------
- */
+ * -------------------------------------------
+ * API Methods
+ * --------------------------------------------
+ */
/**
- * Opens the TCP connection and let the formalities begin.
- */
+ * Opens the TCP connection and let the formalities begin.
+ */
public ConnectionStartBody openTCPConnection() throws AMQPException
{
_lock.lock();
@@ -119,15 +122,17 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen
_phase = _connection.connect();
// waiting for ConnectionStartBody or error in connection
- _connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_connectionStartBody,
- "The broker didn't send the ConnectionStartBody in time");
+ //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _connectionNotStarted.await();
+
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time");
_currentState = AMQPState.CONNECTION_NOT_STARTED;
return _connectionStartBody;
}
- catch (Exception e)
+ catch (InterruptedException e)
{
- throw new AMQPException("XXX");
+ throw new AMQPException("Error opening connection to broker", e);
}
finally
{
@@ -135,25 +140,43 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen
}
}
- public ConnectionSecureBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
+ /**
+ * The current java broker implementation can send a connection tune body
+ * as a response to the startOk. Not sure if that is the correct behaviour.
+ */
+ public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
{
_lock.lock();
try
{
_connectionSecureBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState,
- AMQPState.CONNECTION_NOT_SECURE);
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE);
AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId);
_phase.messageSent(msg);
- _connectionNotSecure.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_connectionSecureBody,
- "The broker didn't send the ConnectionSecureBody in time");
- _currentState = AMQPState.CONNECTION_NOT_SECURE;
- return _connectionSecureBody;
+ //_connectionNotSecure.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _connectionNotSecure.await();
+ //AMQPValidator.throwExceptionOnNull(_connectionSecureBody, "The broker didn't send the ConnectionSecureBody in time");
+ //_currentState = AMQPState.CONNECTION_NOT_SECURE;
+
+ checkIfConnectionClosed();
+ if (_connectionTuneBody != null)
+ {
+ _currentState = AMQPState.CONNECTION_NOT_TUNED;
+ return _connectionTuneBody;
+ }
+ else if (_connectionSecureBody != null)
+ { // oops the server sent another challenge
+ _currentState = AMQPState.CONNECTION_NOT_SECURE;
+ return _connectionSecureBody;
+ }
+ else
+ {
+ throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
+ }
}
- catch (Exception e)
+ catch (InterruptedException e)
{
- throw new AMQPException("XXX");
+ throw new AMQPException("Error in connection.startOk", e);
}
finally
{
@@ -162,9 +185,9 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen
}
/**
- * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could
- * issue a new challenge
- */
+ * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could
+ * issue a new challenge
+ */
public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException
{
_lock.lock();
@@ -173,11 +196,15 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen
_connectionTuneBody = null;
_connectionSecureBody = null;
checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED);
+
_connectionSecureBody = null; // The server could send a fresh challenge
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody,
- _correlationId);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId);
_phase.messageSent(msg);
- _connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+
+ //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _connectionNotTuned.await();
+ checkIfConnectionClosed();
+
if (_connectionTuneBody != null)
{
_currentState = AMQPState.CONNECTION_NOT_TUNED;
@@ -193,9 +220,9 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen
throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
}
}
- catch (Exception e)
+ catch (InterruptedException e)
{
- throw new AMQPException("XXX");
+ throw new AMQPException("Error in connection.secureOk", e);
}
finally
{
@@ -214,10 +241,6 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen
_phase.messageSent(msg);
_currentState = AMQPState.CONNECTION_NOT_OPENED;
}
- catch (Exception e)
- {
- throw new AMQPException("XXX");
- }
finally
{
_lock.unlock();
@@ -229,20 +252,26 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen
_lock.lock();
try
{
+ // If the broker sends a connection close due to an error with the
+ // Connection tune ok, then this call will verify that
+ checkIfConnectionClosed();
+
_connectionOpenOkBody = null;
checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody,
- QpidConstants.EMPTY_CORRELATION_ID);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
_phase.messageSent(msg);
- _connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody,
- "The broker didn't send the ConnectionOpenOkBody in time");
+
+ //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _connectionNotOpened.await();
+
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time");
_currentState = AMQPState.CONNECTION_OPEN;
return _connectionOpenOkBody;
}
- catch (Exception e)
+ catch (InterruptedException e)
{
- throw new AMQPException("XXX");
+ throw new AMQPException("Error in connection.open", e);
}
finally
{
@@ -257,18 +286,16 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen
{
_connectionCloseOkBody = null;
checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody,
- QpidConstants.EMPTY_CORRELATION_ID);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID);
_phase.messageSent(msg);
_connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody,
- "The broker didn't send the ConnectionCloseOkBody in time");
+ AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time");
_currentState = AMQPState.CONNECTION_CLOSED;
return _connectionCloseOkBody;
}
- catch (Exception e)
+ catch (InterruptedException e)
{
- throw new AMQPException("XXX");
+ throw new AMQPException("Error in connection.close", e);
}
finally
{
@@ -277,72 +304,117 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen
}
/**
- * ------------------------------------------- AMQMethodListener methods
- * --------------------------------------------
- */
+ * ------------------------------------------- AMQMethodListener methods
+ * --------------------------------------------
+ */
public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
{
- _correlationId = evt.getCorrelationId();
+ _lock.lock();
+ try
+ {
+ _correlationId = evt.getCorrelationId();
- if (evt.getMethod() instanceof ConnectionStartBody)
+ if (evt.getMethod() instanceof ConnectionStartBody)
+ {
+ _connectionStartBody = (ConnectionStartBody) evt.getMethod();
+ _connectionNotStarted.signalAll();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionSecureBody)
+ {
+ _connectionSecureBody = (ConnectionSecureBody) evt.getMethod();
+ _connectionNotSecure.signal();
+ _connectionNotTuned.signal(); // in case the server has sent another chanllenge
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionTuneBody)
+ {
+ _connectionTuneBody = (ConnectionTuneBody) evt.getMethod();
+ _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk
+ _connectionNotTuned.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionOpenOkBody)
+ {
+ _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod();
+ _connectionNotOpened.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionCloseOkBody)
+ {
+ _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod();
+ _connectionNotClosed.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionCloseBody)
+ {
+ _connectionCloseBody = (ConnectionCloseBody)evt.getMethod();
+ // release the correct lock as u may have some conditions waiting.
+ // while an error occured and the broker has sent a close.
+ releaseLocks();
+ handleClose();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ finally
{
- _connectionStartBody = (ConnectionStartBody) evt.getMethod();
- _connectionNotStarted.signal();
- return true;
+ _lock.unlock();
}
- else if (evt.getMethod() instanceof ConnectionSecureBody)
+ }
+
+ private void handleClose() throws AMQPException
+ {
+ try
+ {
+ _currentState = AMQPState.CONNECTION_CLOSING;
+ // do the required cleanup and send a ConnectionCloseOkBody
+ }
+ catch (Exception e)
{
- _connectionSecureBody = (ConnectionSecureBody) evt.getMethod();
- _connectionNotSecure.signal();
- _connectionNotTuned.signal(); // in case the server has sent another chanllenge
- return true;
+ throw new AMQPException("Error handling connection.close from broker", e);
}
- else if (evt.getMethod() instanceof ConnectionTuneBody)
+ }
+
+ private void checkIfConnectionClosed()throws AMQPException
+ {
+ if (_connectionCloseBody != null)
{
- _connectionTuneBody = (ConnectionTuneBody) evt.getMethod();
- _connectionNotTuned.signal();
- return true;
+ String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() +
+ " with reply code (" + _connectionCloseBody.getReplyCode() + ") " +
+ "caused by class " + _connectionCloseBody.getClassId() +
+ " and method " + _connectionCloseBody.getMethod();
+
+ throw new AMQPException(error);
}
- else if (evt.getMethod() instanceof ConnectionOpenOkBody)
+ }
+
+ private void releaseLocks()
+ {
+ if(_currentState == AMQPState.CONNECTION_NOT_OPENED)
{
- _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod();
_connectionNotOpened.signal();
- return true;
}
- else if (evt.getMethod() instanceof ConnectionCloseOkBody)
+ else if(_currentState == AMQPState.CONNECTION_UNDEFINED)
{
- _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod();
- _connectionNotClosed.signal();
- return true;
+ _connectionNotStarted.signal();
}
- else if (evt.getMethod() instanceof ConnectionCloseBody)
+ else if(_currentState == AMQPState.CONNECTION_NOT_STARTED)
{
- handleClose();
- return true;
+ _connectionNotSecure.signal();
}
- else
+ else if(_currentState == AMQPState.CONNECTION_NOT_SECURE)
{
- return false;
+ _connectionNotTuned.signal();
}
}
- public void handleClose() throws AMQPException
+ public Phase getPhasePipe()
{
- _lock.lock();
- try
- {
- checkIfValidStateTransition(AMQPState.CONNECTION_OPEN, _currentState, AMQPState.CONNECTION_CLOSING);
- _currentState = AMQPState.CONNECTION_CLOSING;
- // do the required cleanup and send a ConnectionCloseOkBody
- }
- catch (Exception e)
- {
- throw new AMQPException("XXX");
- }
- finally
- {
- _lock.unlock();
- }
+ return _phase;
}
} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java
index 5315f7f318..35db9c6a75 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java
@@ -26,10 +26,10 @@ import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.ExchangeDeclareOkBody;
import org.apache.qpid.framing.ExchangeDeleteBody;
import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
-import org.apache.qpid.nclient.model.AMQPMethodListener;
/**
*
@@ -43,7 +43,7 @@ public class AMQPExchange extends AMQPCallBackSupport implements AMQPMethodListe
{
private Phase _phase;
- public AMQPExchange(int channelId,Phase phase)
+ protected AMQPExchange(int channelId,Phase phase)
{
super(channelId);
_phase = phase;
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java
index e3ad9d6306..1b86108411 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java
@@ -25,6 +25,7 @@ import org.apache.qpid.framing.MessageAppendBody;
import org.apache.qpid.framing.MessageCancelBody;
import org.apache.qpid.framing.MessageCheckpointBody;
import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageConsumeBody;
import org.apache.qpid.framing.MessageEmptyBody;
import org.apache.qpid.framing.MessageGetBody;
import org.apache.qpid.framing.MessageOffsetBody;
@@ -35,10 +36,10 @@ import org.apache.qpid.framing.MessageRecoverBody;
import org.apache.qpid.framing.MessageRejectBody;
import org.apache.qpid.framing.MessageResumeBody;
import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
-import org.apache.qpid.nclient.model.AMQPMethodListener;
/**
* This class represents the AMQP Message class.
@@ -59,7 +60,7 @@ public class AMQPMessage extends AMQPCallBackSupport implements AMQPMethodListen
private Phase _phase;
private AMQPMessageCallBack _messageCb;
- public AMQPMessage(int channelId,Phase phase,AMQPMessageCallBack messageCb)
+ protected AMQPMessage(int channelId,Phase phase,AMQPMessageCallBack messageCb)
{
super(channelId);
_phase = phase;
@@ -78,9 +79,9 @@ public class AMQPMessage extends AMQPCallBackSupport implements AMQPMethodListen
_phase.messageSent(msg);
}
- public void consume(MessageCancelBody messageCancelBody,AMQPCallBack cb) throws AMQPException
+ public void consume(MessageConsumeBody messageConsumeBody,AMQPCallBack cb) throws AMQPException
{
- AMQPMethodEvent msg = handleAsynchronousCall(messageCancelBody,cb);
+ AMQPMethodEvent msg = handleAsynchronousCall(messageConsumeBody,cb);
_phase.messageSent(msg);
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java
index 183ed9dba8..c10d6975c6 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java
@@ -21,10 +21,8 @@
package org.apache.qpid.nclient.amqp;
import org.apache.qpid.framing.MessageAppendBody;
-import org.apache.qpid.framing.MessageCancelBody;
import org.apache.qpid.framing.MessageCheckpointBody;
import org.apache.qpid.framing.MessageCloseBody;
-import org.apache.qpid.framing.MessageGetBody;
import org.apache.qpid.framing.MessageOpenBody;
import org.apache.qpid.framing.MessageRecoverBody;
import org.apache.qpid.framing.MessageResumeBody;
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java
index a5fe6de298..cfe87cb3eb 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java
@@ -31,10 +31,10 @@ import org.apache.qpid.framing.QueuePurgeBody;
import org.apache.qpid.framing.QueuePurgeOkBody;
import org.apache.qpid.framing.QueueUnbindBody;
import org.apache.qpid.framing.QueueUnbindOkBody;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
-import org.apache.qpid.nclient.model.AMQPMethodListener;
/**
*
@@ -48,7 +48,7 @@ public class AMQPQueue extends AMQPCallBackSupport implements AMQPMethodListener
{
private Phase _phase;
- public AMQPQueue(int channelId,Phase phase)
+ protected AMQPQueue(int channelId,Phase phase)
{
super(channelId);
_phase = phase;
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/EventManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/EventManager.java
deleted file mode 100644
index 38421cfca3..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/EventManager.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
-import org.apache.qpid.nclient.model.AMQPMethodListener;
-
-/**
- * This class registeres with the ModelPhase as a AMQMethodListener,
- * to receive method events and then it distributes methods to other listerners
- * using a filtering criteria. The criteria is channel id and method body class.
- * The method listeners are added and removed dynamically
- *
- * <p/>
- */
-public class EventManager implements AMQPMethodListener
-{
- private static final Logger _logger = Logger.getLogger(EventManager.class);
-
- private Map <Integer,Map> _channelMap = new ConcurrentHashMap<Integer,Map>();
-
- /**
- * ------------------------------------------------
- * methods introduced by AMQPMethodEventManager
- * ------------------------------------------------
- */
- public void addMethodEventListener(int channelId,Class clazz,AMQPMethodListener l)
- {
- Map<Class,List> _methodListenerMap;
- if (_channelMap.containsKey(channelId))
- {
- _methodListenerMap = _channelMap.get(channelId);
-
- }
- else
- {
- _methodListenerMap = new ConcurrentHashMap<Class,List>();
- _channelMap.put(channelId, _methodListenerMap);
- }
-
- List<AMQPMethodListener> _listeners;
- if (_methodListenerMap.containsKey(clazz))
- {
- _listeners = _methodListenerMap.get(clazz);
-
- }
- else
- {
- _listeners = new ArrayList<AMQPMethodListener>();
- _methodListenerMap.put(clazz, _listeners);
- }
-
- _listeners.add(l);
-
- }
-
- public void removeMethodEventListener(int channelId,Class clazz,AMQPMethodListener l)
- {
- if (_channelMap.containsKey(channelId))
- {
- Map<Class,List> _methodListenerMap = _channelMap.get(channelId);
-
- if (_methodListenerMap.containsKey(clazz))
- {
- List<AMQPMethodListener> _listeners = _methodListenerMap.get(clazz);
- _listeners.remove(l);
- }
-
- }
- }
-
-
- /**
- * ------------------------------------------------
- * methods introduced by AMQMethodListener
- * ------------------------------------------------
- */
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.model.AMQStateManager#methodReceived(org.apache.qpid.protocol.AMQMethodEvent)
- */
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- if (_channelMap.containsKey(evt.getChannelId()))
- {
- Map<Class,List> _methodListenerMap = _channelMap.get(evt.getChannelId());
-
- if (_methodListenerMap.containsKey(evt.getMethod().getClass()))
- {
-
- List<AMQPMethodListener> _listeners = _methodListenerMap.get(evt.getMethod().getClass());
- for (AMQPMethodListener l:_listeners)
- {
- l.methodReceived(evt);
- }
-
- return (_listeners.size()>0);
- }
-
- }
-
- return false;
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java
new file mode 100644
index 0000000000..9655da6912
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.core.AMQPException;
+
+/**
+ * This class registeres with the ModelPhase as a AMQMethodListener,
+ * to receive method events and then it distributes methods to other listerners
+ * using a filtering criteria. The criteria is channel id and method body class.
+ * The method listeners are added and removed dynamically
+ *
+ * <p/>
+ */
+public class QpidEventManager implements AMQPEventManager
+{
+ private static final Logger _logger = Logger.getLogger(QpidEventManager.class);
+
+ private Map<Integer, Map> _channelMap = new ConcurrentHashMap<Integer, Map>();
+
+ /**
+ * ------------------------------------------------
+ * methods introduced by AMQEventManager
+ * ------------------------------------------------
+ */
+ public void addMethodEventListener(int channelId, Class clazz, AMQPMethodListener l)
+ {
+ Map<Class, List> _methodListenerMap;
+ if (_channelMap.containsKey(channelId))
+ {
+ _methodListenerMap = _channelMap.get(channelId);
+
+ }
+ else
+ {
+ _methodListenerMap = new ConcurrentHashMap<Class, List>();
+ _channelMap.put(channelId, _methodListenerMap);
+ }
+
+ List<AMQPMethodListener> _listeners;
+ if (_methodListenerMap.containsKey(clazz))
+ {
+ _listeners = _methodListenerMap.get(clazz);
+ }
+ else
+ {
+ _listeners = new ArrayList<AMQPMethodListener>();
+ _methodListenerMap.put(clazz, _listeners);
+ }
+
+ _listeners.add(l);
+
+ }
+
+ public void removeMethodEventListener(int channelId, Class clazz, AMQPMethodListener l)
+ {
+ if (_channelMap.containsKey(channelId))
+ {
+ Map<Class, List> _methodListenerMap = _channelMap.get(channelId);
+
+ if (_methodListenerMap.containsKey(clazz))
+ {
+ List<AMQPMethodListener> _listeners = _methodListenerMap.get(clazz);
+ _listeners.remove(l);
+ }
+
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.model.AMQStateManager#methodReceived(org.apache.qpid.protocol.AMQMethodEvent)
+ */
+ public <B extends AMQMethodBody> boolean notifyEvent(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ if (_channelMap.containsKey(evt.getChannelId()))
+ {
+ Map<Class, List> _methodListenerMap = _channelMap.get(evt.getChannelId());
+
+ if (_methodListenerMap.containsKey(evt.getMethod().getClass()))
+ {
+
+ List<AMQPMethodListener> _listeners = _methodListenerMap.get(evt.getMethod().getClass());
+ for (AMQPMethodListener l : _listeners)
+ {
+ l.methodReceived(evt);
+ }
+
+ return (_listeners.size() > 0);
+ }
+
+ }
+
+ return false;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java
new file mode 100644
index 0000000000..e49524ee30
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.nclient.amqp.state.AMQPStateListener;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+
+public class QpidStateManager implements AMQPStateManager
+{
+
+ public void addListener(AMQPStateListener l) throws AMQException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void removeListener(AMQPStateListener l) throws AMQException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java
new file mode 100644
index 0000000000..f682659f9e
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java
@@ -0,0 +1,11 @@
+package org.apache.qpid.nclient.amqp.event;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public interface AMQPEventManager
+{
+ public void addMethodEventListener(int channelId, Class clazz, AMQPMethodListener l);
+ public void removeMethodEventListener(int channelId, Class clazz, AMQPMethodListener l);
+ public <B extends AMQMethodBody> boolean notifyEvent(AMQPMethodEvent<B> evt) throws AMQPException;
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java
new file mode 100644
index 0000000000..c6641890e0
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java
@@ -0,0 +1,78 @@
+package org.apache.qpid.nclient.amqp.event;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+/**
+ * This class is exactly the same as the AMQMethod event.
+ * Except I renamed requestId to corelationId, so I could use it both ways.
+ *
+ * I didn't want to modify anything in common so that there is no
+ * impact on the existing code.
+ *
+ */
+public class AMQPMethodEvent<M extends AMQMethodBody>
+{
+
+ private final M _method;
+
+ private final int _channelId;
+
+ /**
+ * This is the rquest id from the broker when it sent me a request
+ * when I respond I remember this id and copy this to the outgoing
+ * response.
+ */
+ private final long _correlationId;
+
+ /**
+ * I could use _correlationId, bcos when I send a request
+ * this field is blank and is only used internally. But I
+ * used a seperate field to make it more clear.
+ */
+ private long _localCorrletionId = 0;
+
+ public AMQPMethodEvent(int channelId, M method, long correlationId, long localCorrletionId)
+ {
+ _channelId = channelId;
+ _method = method;
+ _correlationId = correlationId;
+ _localCorrletionId = localCorrletionId;
+ }
+
+ public AMQPMethodEvent(int channelId, M method, long correlationId)
+ {
+ _channelId = channelId;
+ _method = method;
+ _correlationId = correlationId;
+ }
+
+ public M getMethod()
+ {
+ return _method;
+ }
+
+ public int getChannelId()
+ {
+ return _channelId;
+ }
+
+ public long getCorrelationId()
+ {
+ return _correlationId;
+ }
+
+ public long getLocalCorrelationId()
+ {
+ return _localCorrletionId;
+ }
+
+ public String toString()
+ {
+ StringBuilder buf = new StringBuilder("Method event: \n");
+ buf.append("Channel id: \n").append(_channelId);
+ buf.append("Method: \n").append(_method);
+ buf.append("Request Id: ").append(_correlationId);
+ buf.append("Local Correlation Id: ").append(_localCorrletionId);
+ return buf.toString();
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java
index 52b9f6de91..e77a38121c 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java
@@ -1,4 +1,4 @@
-package org.apache.qpid.nclient.model;
+package org.apache.qpid.nclient.amqp.event;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.nclient.core.AMQPException;
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java
new file mode 100644
index 0000000000..528b47beb4
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java
@@ -0,0 +1,57 @@
+package org.apache.qpid.nclient.amqp.sample;
+
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.nclient.amqp.AMQPMessageCallBack;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public class MessageHelper implements AMQPMessageCallBack
+{
+
+ public void append(MessageAppendBody messageAppendBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void checkpoint(MessageCheckpointBody messageCheckpointBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void close(MessageCloseBody messageCloseBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void open(MessageOpenBody messageOpenBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void recover(MessageRecoverBody messageRecoverBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void resume(MessageResumeBody messageResumeBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void transfer(MessageTransferBody messageTransferBody, long correlationId) throws AMQPException
+ {
+ System.out.println("The Broker has sent a message" + messageTransferBody.toString());
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
index 69d2564112..166d565f81 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
@@ -1,25 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.nclient.amqp.sample;
import java.util.StringTokenizer;
+import java.util.UUID;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
+import org.apache.qpid.AMQException;
import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
import org.apache.qpid.framing.ConnectionSecureBody;
import org.apache.qpid.framing.ConnectionSecureOkBody;
import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.ConnectionTuneBody;
import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageConsumeBody;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.QueuePurgeBody;
+import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.nclient.amqp.AMQPCallBack;
+import org.apache.qpid.nclient.amqp.AMQPChannel;
+import org.apache.qpid.nclient.amqp.AMQPClassFactory;
import org.apache.qpid.nclient.amqp.AMQPConnection;
+import org.apache.qpid.nclient.amqp.AMQPExchange;
+import org.apache.qpid.nclient.amqp.AMQPMessage;
+import org.apache.qpid.nclient.amqp.AMQPQueue;
import org.apache.qpid.nclient.transport.AMQPConnectionURL;
import org.apache.qpid.nclient.transport.ConnectionURL;
-import org.apache.qpid.nclient.transport.TransportConnection;
-import org.apache.qpid.nclient.transport.TransportConnectionFactory;
import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
/**
@@ -27,26 +74,34 @@ import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionTy
* Notes this is just a simple demo.
*
* I have used Helper classes to keep the code cleaner.
+ * Will break this into unit tests later on
*/
+
+@SuppressWarnings("unused")
public class TestClient
{
- private byte major;
- private byte minor;
+ private byte _major;
+ private byte _minor;
private ConnectionURL _url;
+ private static int _channel = 2;
+ // Need a Class factory per connection
+ private AMQPClassFactory _classFactory = new AMQPClassFactory();
+ private int _ticket;
public AMQPConnection openConnection() throws Exception
{
- _url = new AMQPConnectionURL("");
- TransportConnection conn = TransportConnectionFactory.createTransportConnection(_url, ConnectionType.VM);
- return new AMQPConnection(conn);
+ //_url = new AMQPConnectionURL("amqp://guest:guest@test/localhost?brokerlist='vm://:3'");
+
+ _url = new AMQPConnectionURL("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'");
+ return _classFactory.createConnectionClass(_url,ConnectionType.TCP);
}
- public void handleProtocolNegotiation(AMQPConnection con) throws Exception
+ public void handleConnectionNegotiation(AMQPConnection con) throws Exception
{
// ConnectionStartBody
ConnectionStartBody connectionStartBody = con.openTCPConnection();
- major = connectionStartBody.getMajor();
- minor = connectionStartBody.getMajor();
+ _major = connectionStartBody.getMajor();
+ _minor = connectionStartBody.getMinor();
FieldTable clientProperties = FieldTableFactory.newFieldTable();
clientProperties.put(new AMQShortString(ClientProperties.instance.toString()),"Test"); // setting only the client id
@@ -61,33 +116,335 @@ public class TestClient
null, SecurityHelper.createCallbackHandler(mechanism,_url));
ConnectionStartOkBody connectionStartOkBody =
- ConnectionStartOkBody.createMethodBody(major, minor, clientProperties,
+ ConnectionStartOkBody.createMethodBody(_major, _minor, clientProperties,
new AMQShortString(tokenizer.nextToken()),
new AMQShortString(mechanism),
(sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null));
// ConnectionSecureBody
- ConnectionSecureBody connectionSecureBody = con.startOk(connectionStartOkBody);
+ AMQMethodBody body = con.startOk(connectionStartOkBody);
+ ConnectionTuneBody connectionTuneBody;
- ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(
- major,minor,sc.evaluateChallenge(connectionSecureBody.getChallenge()));
+ if (body instanceof ConnectionSecureBody)
+ {
+ ConnectionSecureBody connectionSecureBody = (ConnectionSecureBody)body;
+ ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(
+ _major,_minor,sc.evaluateChallenge(connectionSecureBody.getChallenge()));
+ //Assuming the server is not going to send another challenge
+ connectionTuneBody = (ConnectionTuneBody)con.secureOk(connectionSecureOkBody);
+
+ }
+ else
+ {
+ connectionTuneBody = (ConnectionTuneBody)body;
+ }
- // Assuming the server is not going to send another challenge
- ConnectionTuneBody connectionTuneBody = (ConnectionTuneBody)con.secureOk(connectionSecureOkBody);
// Using broker supplied values
ConnectionTuneOkBody connectionTuneOkBody =
- ConnectionTuneOkBody.createMethodBody(major,minor,
+ ConnectionTuneOkBody.createMethodBody(_major,_minor,
connectionTuneBody.getChannelMax(),
connectionTuneBody.getFrameMax(),
connectionTuneBody.getHeartbeat());
con.tuneOk(connectionTuneOkBody);
+
+ ConnectionOpenBody connectionOpenBody =
+ ConnectionOpenBody.createMethodBody(_major,_minor,null, true,new AMQShortString(_url.getVirtualHost()));
+
+ ConnectionOpenOkBody connectionOpenOkBody = con.open(connectionOpenBody);
+ }
+
+ public void handleChannelNegotiation() throws Exception
+ {
+ AMQPChannel channel = _classFactory.createChannelClass(_channel);
+
+ ChannelOpenBody channelOpenBody = ChannelOpenBody.createMethodBody(_major, _minor, new AMQShortString("myChannel1"));
+ ChannelOpenOkBody channelOpenOkBody = channel.open(channelOpenBody);
+
+ //lets have some fun
+ ChannelFlowBody channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, false);
+
+ ChannelFlowOkBody channelFlowOkBody = channel.flow(channelFlowBody);
+ System.out.println("Channel is " + (channelFlowOkBody.getActive()? "active" : "suspend"));
+
+ channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, true);
+ channelFlowOkBody = channel.flow(channelFlowBody);
+ System.out.println("Channel is " + (channelFlowOkBody.getActive()? "active" : "suspend"));
+ }
+
+ public void createExchange() throws Exception
+ {
+ AMQPExchange exchange = _classFactory.createExchangeClass(_channel);
+
+ ExchangeDeclareBody exchangeDeclareBody =
+ ExchangeDeclareBody.createMethodBody(_major, _minor,
+ null, // arguments
+ false,//auto delete
+ false,// durable
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),
+ true, //internal
+ false,// nowait
+ false,// passive
+ _ticket,
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS));
+
+ AMQPCallBack cb = createCallBackWithMessage("Broker has created the exchange");
+ exchange.declare(exchangeDeclareBody, cb);
+ // Blocking for response
+ while (!cb.isComplete()){}
+ }
+
+
+ public void createAndBindQueue()throws Exception
+ {
+ AMQPQueue queue = _classFactory.createQueueClass(_channel);
+
+ QueueDeclareBody queueDeclareBody =
+ QueueDeclareBody.createMethodBody(_major, _minor,
+ null, //arguments
+ false,//auto delete
+ false,// durable
+ false, //exclusive,
+ false, //nowait,
+ false, //passive,
+ new AMQShortString("MyTestQueue"),
+ 0);
+
+ AMQPCallBack cb = new AMQPCallBack(){
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ QueueDeclareOkBody queueDeclareOkBody = (QueueDeclareOkBody)body;
+ System.out.println("[Broker has created the queue, " +
+ "message count " + queueDeclareOkBody.getMessageCount() +
+ "consumer count " + queueDeclareOkBody.getConsumerCount() + "]\n");
+ }
+
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ }
+
+ };
+
+ queue.declare(queueDeclareBody, cb);
+ //Blocking for response
+ while (!cb.isComplete()){}
+
+ QueueBindBody queueBindBody =
+ QueueBindBody.createMethodBody(_major, _minor,
+ null, //arguments
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),//exchange
+ false, //nowait
+ new AMQShortString("MyTestQueue"), //queue
+ new AMQShortString("RH"), //routingKey
+ 0 //ticket
+ );
+
+ cb = createCallBackWithMessage("Broker has bound the queue");
+ queue.bind(queueBindBody, cb);
+ //Blocking for response
+ while (!cb.isComplete()){}
+ }
+
+ public void purgeQueue()throws Exception
+ {
+ AMQPQueue queue = _classFactory.createQueueClass(_channel);
+
+ QueuePurgeBody queuePurgeBody =
+ QueuePurgeBody.createMethodBody(_major, _minor,
+ false, //nowait
+ new AMQShortString("MyTestQueue"), //queue
+ 0 //ticket
+ );
+
+ AMQPCallBack cb = new AMQPCallBack(){
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ QueuePurgeOkBody queuePurgeOkBody = (QueuePurgeOkBody)body;
+ System.out.println("[Broker has purged the queue, message count " + queuePurgeOkBody.getMessageCount() + "]\n");
+ }
+
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ }
+
+ };
+
+ queue.purge(queuePurgeBody, cb);
+ //Blocking for response
+ while (!cb.isComplete()){}
+
}
+
+ public void deleteQueue()throws Exception
+ {
+ AMQPQueue queue = _classFactory.createQueueClass(_channel);
+
+ QueueDeleteBody queueDeleteBody =
+ QueueDeleteBody.createMethodBody(_major, _minor,
+ false, //ifEmpty
+ false, //ifUnused
+ false, //nowait
+ new AMQShortString("MyTestQueue"), //queue
+ 0 //ticket
+ );
+
+ AMQPCallBack cb = new AMQPCallBack(){
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ QueueDeleteOkBody queueDeleteOkBody = (QueueDeleteOkBody)body;
+ System.out.println("[Broker has deleted the queue, message count " + queueDeleteOkBody.getMessageCount() + "]\n");
+ }
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ }
+
+ };
+
+ queue.delete(queueDeleteBody, cb);
+ //Blocking for response
+ while (!cb.isComplete()){}
+
+ }
+
+ public void publishAndSubscribe() throws Exception
+ {
+ AMQPMessage message = _classFactory.createMessageClass(_channel,new MessageHelper());
+ MessageConsumeBody messageConsumeBody = MessageConsumeBody.createMethodBody(_major, _minor,
+ new AMQShortString("myClient"),// destination
+ false, //exclusive
+ null, //filter
+ false, //noAck,
+ false, //noLocal,
+ new AMQShortString("MyTestQueue"), //queue
+ 0 //ticket
+ );
+
+ AMQPCallBack cb = createCallBackWithMessage("Broker has accepted the consume");
+ message.consume(messageConsumeBody, cb);
+ //Blocking for response
+ while (!cb.isComplete()){}
+
+ // Sending 5 messages serially
+ for (int i=0; i<5; i++)
+ {
+ cb = createCallBackWithMessage("Broker has accepted msg " + i);
+ message.transfer(createMessages("Test" + i),cb);
+ while (!cb.isComplete()){}
+ }
+
+ MessageCancelBody messageCancelBody = MessageCancelBody.createMethodBody(_major, _minor, new AMQShortString("myClient"));
+
+ AMQPCallBack cb2 = createCallBackWithMessage("Broker has accepted the consume cancel");
+ message.cancel(messageCancelBody, cb2);
+
+ }
+
+ private MessageTransferBody createMessages(String content) throws Exception
+ {
+ FieldTable headers = FieldTableFactory.newFieldTable();
+ headers.setAsciiString(new AMQShortString("Test"), System.currentTimeMillis() + "");
+
+ MessageTransferBody messageTransferBody =
+ MessageTransferBody.createMethodBody(_major, _minor,
+ new AMQShortString("testApp"), //appId
+ headers, //applicationHeaders
+ new Content(Content.TypeEnum.INLINE_T,content.getBytes()), //body
+ new AMQShortString(""), //contentEncoding,
+ new AMQShortString("text/plain"), //contentType
+ new AMQShortString("testApp"), //correlationId
+ (short)1, //deliveryMode non persistant
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange
+ 0l, //expiration
+ false, //immediate
+ false, //mandatory
+ new AMQShortString(UUID.randomUUID().toString()), //messageId
+ (short)0, //priority
+ false, //redelivered
+ new AMQShortString("RH"), //replyTo
+ new AMQShortString("RH"), //routingKey,
+ "abc".getBytes(), //securityToken
+ 0, //ticket
+ System.currentTimeMillis(), //timestamp
+ new AMQShortString(""), //transactionId
+ 0l, //ttl,
+ new AMQShortString("Hello") //userId
+ );
+
+ return messageTransferBody;
+
+ }
+
+ public void publishAndGet() throws Exception
+ {
+ AMQPMessage message = _classFactory.createMessageClass(_channel,new MessageHelper());
+ AMQPCallBack cb = createCallBackWithMessage("Broker has accepted msg 5");
+
+ MessageGetBody messageGetBody =
+ MessageGetBody.createMethodBody(_major, _minor,
+ new AMQShortString("myClient"),
+ false, //noAck
+ new AMQShortString("MyTestQueue"), //queue
+ 0 //ticket
+ );
+
+ //AMQPMessage message = _classFactory.createMessage(_channel,new MessageHelper());
+ message.transfer(createMessages("Test"),cb);
+ while(!cb.isComplete()){}
+
+ cb = createCallBackWithMessage("Broker has accepted get");
+ message.get(messageGetBody, cb);
+ }
+
+ // Creates a gneric call back and prints the given message
+ private AMQPCallBack createCallBackWithMessage(final String msg)
+ {
+ AMQPCallBack cb = new AMQPCallBack(){
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ System.out.println(msg);
+ }
+
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ }
+
+ };
+
+ return cb;
+ }
+
public static void main(String[] args)
{
TestClient test = new TestClient();
- AMQPConnection con = test.openConnection();
-
+ try
+ {
+ AMQPConnection con = test.openConnection();
+ test.handleConnectionNegotiation(con);
+ test.handleChannelNegotiation();
+ test.createExchange();
+ test.createAndBindQueue();
+ test.publishAndSubscribe();
+ test.purgeQueue();
+ test.publishAndGet();
+ test.deleteQueue();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
}
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java b/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java
index 1c3bb788a0..7bc77e02c0 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java
@@ -4,6 +4,9 @@ import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
+import java.util.TreeMap;
+
+import javax.security.sasl.SaslClientFactory;
import org.apache.commons.configuration.CombinedConfiguration;
import org.apache.commons.configuration.ConfigurationException;
@@ -11,6 +14,7 @@ import org.apache.commons.configuration.SystemConfiguration;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.log4j.Logger;
import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.security.AMQPCallbackHandler;
/**
* Loads a properties file from classpath.
@@ -64,21 +68,33 @@ public class ClientConfiguration extends CombinedConfiguration {
public static void main(String[] args)
{
- System.out.println(ClientConfiguration.get().getString(QpidConstants.USE_SHARED_READ_WRITE_POOL));
-
- //System.out.println(ClientConfiguration.get().getString("methodListeners.methodListener(1).[@class]"));
- int count = ClientConfiguration.get().getMaxIndex(QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER);
- System.out.println(count);
-
- for(int i=0 ;i<count;i++)
- {
- String methodListener = QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER + "(" + i + ")";
- System.out.println("\n\n"+ClientConfiguration.get().getString(methodListener + QpidConstants.CLASS));
- List<String> list = ClientConfiguration.get().getList(methodListener + "." + QpidConstants.METHOD_CLASS);
- for(String s:list)
- {
- System.out.println(s);
- }
- }
+ String key = QpidConstants.AMQP_SECURITY + "." +
+ QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES + "." +
+ QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY;
+
+ TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister =
+ new TreeMap<String, Class<? extends SaslClientFactory>>();
+
+ int index = ClientConfiguration.get().getMaxIndex(key);
+
+ for (int i=0; i<index+1;i++)
+ {
+ String mechanism = ClientConfiguration.get().getString(key + "(" + i + ")[@type]");
+ String className = ClientConfiguration.get().getString(key + "(" + i + ")" );
+ try
+ {
+ Class<?> clazz = Class.forName(className);
+ if (!(SaslClientFactory.class.isAssignableFrom(clazz)))
+ {
+ _logger.error("Class " + clazz + " does not implement " + SaslClientFactory.class + " - skipping");
+ continue;
+ }
+ factoriesToRegister.put(mechanism, (Class<? extends SaslClientFactory>) clazz);
+ }
+ catch (Exception ex)
+ {
+ _logger.error("Error instantiating SaslClientFactory calss " + className + " - skipping");
+ }
+ }
}
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml b/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml
index 587271acd1..93e6f38074 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml
@@ -1,86 +1,36 @@
<?xml version="1.0" encoding="ISO-8859-1" ?>
<qpidClientConfig>
-<security>
- <saslClientFactoryTypes>
- <saslClientFactory type="AMQPLAIN">org.apache.qpid.client.security.amqplain.AmqPlainSaslClientFactory</saslClientFactory>
- </saslClientFactoryTypes>
- <securityMechanisms>
- <securityMechanismHandler type="PLAIN">org.apache.qpid.client.security.UsernamePasswordCallbackHandler</securityMechanismHandler>
- <securityMechanismHandler type="CRAM_MD5">org.apache.qpid.client.security.UsernamePasswordCallbackHandler</securityMechanismHandler>
- </securityMechanisms>
-</security>
-
-<!-- Transport Layer properties -->
-<useSharedReadWritePool>true</useSharedReadWritePool>
-<enableDirectBuffers>true</enableDirectBuffers>
-<enablePooledAllocator>false</enablePooledAllocator>
-<tcpNoDelay>true</tcpNoDelay>
-<sendBufferSizeInKb>32</sendBufferSizeInKb>
-<reciveBufferSizeInKb>32</reciveBufferSizeInKb>
-<qpidVMBrokerClass>org.apache.qpid.server.protocol.AMQPFastProtocolHandler</qpidVMBrokerClass>
-
-<!-- Execution Layer properties -->
-<maxAccumilatedResponses>20</maxAccumilatedResponses>
-
-<!-- Model Phase properties -->
-<serverTimeoutInMilliSeconds>1000</serverTimeoutInMilliSeconds>
-<maxAccumilatedResponses>20</maxAccumilatedResponses>
-<stateManager></stateManager>
-<stateListerners>
- <stateType class="org.apache.qpid.nclient.model.state.AMQPStateType.CONNECTION_STATE">
- <stateListerner></stateListerner>
- </stateType>
- <stateType class="org.apache.qpid.nclient.model.state.AMQPStateType.CHANNEL_STATE">
- <stateListerner></stateListerner>
- </stateType>
-</stateListerners>
-
-<methodListeners>
- <methodListener class="org.apache.qpid.nclient.amqp.AMQPConnection">
- <methodClass>org.apache.qpid.framing.ConnectionStartBody</methodClass>
- <methodClass>org.apache.qpid.framing.ConnectionSecureBody</methodClass>
- <methodClass>org.apache.qpid.framing.ConnectionTuneBody</methodClass>
- <methodClass>org.apache.qpid.framing.ConnectionOpenOkBody</methodClass>
- <methodClass>org.apache.qpid.framing.ConnectionCloseBody</methodClass>
- <methodClass>org.apache.qpid.framing.ConnectionCloseOkBody</methodClass>
-
- <methodClass>org.apache.qpid.framing.ChannelOpenOkBody</methodClass>
- <methodClass>org.apache.qpid.framing.ChannelCloseBody</methodClass>
- <methodClass>org.apache.qpid.framing.ChannelCloseOkBody</methodClass>
- <methodClass>org.apache.qpid.framing.ChannelFlowBody</methodClass>
- <methodClass>org.apache.qpid.framing.ChannelFlowOkBody</methodClass>
- <methodClass>org.apache.qpid.framing.ChannelOkBody</methodClass>
-
- <methodClass>org.apache.qpid.framing.ExchangeDeclareOkBody</methodClass>
- <methodClass>org.apache.qpid.framing.ExchangeDeleteOkBody</methodClass>
-
- <methodClass>org.apache.qpid.framing.QueueDeclareOkBody</methodClass>
- <methodClass>org.apache.qpid.framing.QueueBindOkBody</methodClass>
- <methodClass>org.apache.qpid.framing.QueueUnbindOkBody</methodClass>
- <methodClass>org.apache.qpid.framing.QueuePurgeOkBody</methodClass>
- <methodClass>org.apache.qpid.framing.QueueDeleteOkBody</methodClass>
-
- <methodClass>org.apache.qpid.framing.MessageAppendBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageCancelBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageCheckpointBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageCloseBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageGetBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageOffsetBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageOkBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageOpenBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageQosBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageRecoverBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageRejectBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageResumeBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageTransferBody</methodClass>
- </methodListener>
-</methodListeners>
-
-<phasePipe>
- <phase index="0">org.apache.qpid.nclient.transport.TransportPhase<phase>
- <phase index="1">org.apache.qpid.nclient.execution.ExecutionPhase<phase>
- <phase index="2">org.apache.qpid.nclient.model.ModelPhase<phase>
-</phasePipe>
+ <security>
+ <saslClientFactoryTypes>
+ <saslClientFactory type="AMQPLAIN">org.apache.qpid.nclient.security.amqplain.AmqPlainSaslClientFactory</saslClientFactory>
+ </saslClientFactoryTypes>
+ <securityMechanisms>
+ <securityMechanismHandler type="PLAIN">org.apache.qpid.nclient.security.UsernamePasswordCallbackHandler</securityMechanismHandler>
+ <securityMechanismHandler type="CRAM_MD5">org.apache.qpid.nclient.security.UsernamePasswordCallbackHandler</securityMechanismHandler>
+ </securityMechanisms>
+ </security>
+
+ <!-- Transport Layer properties -->
+ <useSharedReadWritePool>false</useSharedReadWritePool>
+ <enableDirectBuffers>true</enableDirectBuffers>
+ <enablePooledAllocator>false</enablePooledAllocator>
+ <tcpNoDelay>true</tcpNoDelay>
+ <sendBufferSizeInKb>32</sendBufferSizeInKb>
+ <reciveBufferSizeInKb>32</reciveBufferSizeInKb>
+ <qpidVMBrokerClass>org.apache.qpid.server.protocol.AMQPFastProtocolHandler</qpidVMBrokerClass>
+
+ <!-- Execution Layer properties -->
+ <maxAccumilatedResponses>20</maxAccumilatedResponses>
+
+ <!-- Model Phase properties -->
+ <serverTimeoutInMilliSeconds>60000</serverTimeoutInMilliSeconds>
+ <maxAccumilatedResponses>20</maxAccumilatedResponses>
+
+ <phasePipe>
+ <phase index="0">org.apache.qpid.nclient.transport.TransportPhase</phase>
+ <phase index="1">org.apache.qpid.nclient.execution.ExecutionPhase</phase>
+ <phase index="2">org.apache.qpid.nclient.model.ModelPhase</phase>
+ </phasePipe>
</qpidClientConfig> \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
index 21602fec8e..9542aab344 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
@@ -24,19 +24,22 @@ public class PhaseFactory
*/
public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException
{
+ String key = QpidConstants.PHASE_PIPE + "." + QpidConstants.PHASE;
Map<Integer,Phase> phaseMap = new HashMap<Integer,Phase>();
- List<String> list = ClientConfiguration.get().getList(QpidConstants.PHASE_PIPE + "." + QpidConstants.PHASE);
+ List<String> list = ClientConfiguration.get().getList(key);
+ int index = 0;
for(String s:list)
{
try
{
- Phase temp = (Phase)Class.forName(ClientConfiguration.get().getString(s)).newInstance();
- phaseMap.put(ClientConfiguration.get().getInt(s + "." + QpidConstants.INDEX),temp) ;
+ Phase temp = (Phase)Class.forName(s).newInstance();
+ phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + QpidConstants.INDEX),temp) ;
}
catch(Exception e)
{
throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s),e);
}
+ index++;
}
Phase current = null;
@@ -46,7 +49,7 @@ public class PhaseFactory
for (int i=0; i<phaseMap.size();i++)
{
current = phaseMap.get(i);
- if (1+1 < phaseMap.size())
+ if (i+1 < phaseMap.size())
{
next = phaseMap.get(i+1);
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java
index 1a949b7270..9998fda6bb 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java
@@ -10,13 +10,7 @@ public interface QpidConstants {
// Phase Context properties
public final static String AMQP_BROKER_DETAILS = "AMQP_BROKER_DETAILS";
public final static String MINA_IO_CONNECTOR = "MINA_IO_CONNECTOR";
- //public final static String AMQP_MAJOR_VERSION = "AMQP_MAJOR_VERSION";
- //public final static String AMQP_MINOR_VERSION = "AMQP_MINOR_VERSION";
- //public final static String AMQP_SASL_CLIENT = "AMQP_SASL_CLIENT";
- //public final static String AMQP_CLIENT_ID = "AMQP_CLIENT_ID";
- //public final static String AMQP_CONNECTION_TUNE_PARAMETERS = "AMQP_CONNECTION_TUNE_PARAMETERS";
- //public final static String AMQP_VIRTUAL_HOST = "AMQP_VIRTUAL_HOST";
- //public final static String AMQP_MESSAGE_STORE = "AMQP_MESSAGE_STORE";
+ public final static String EVENT_MANAGER = "EVENT_MANAGER";
/**---------------------------------------------------------------
* Configuration file properties
@@ -24,17 +18,7 @@ public interface QpidConstants {
*/
// Model Layer properties
- public final static String STATE_MANAGER = "stateManager";
- public final static String METHOD_LISTENERS = "methodListeners";
- public final static String METHOD_LISTENER = "methodListener";
- public final static String CLASS = "[@class]";
- public final static String METHOD_CLASS = "methodClass";
- public final static String STATE_LISTENERS = "stateListeners";
- public final static String STATE_LISTENER = "stateListener";
- public final static String STATE_TYPE = "stateType";
-
- public final static String AMQP_MESSAGE_STORE_CLASS = "AMQP_MESSAGE_STORE_CLASS";
public final static String SERVER_TIMEOUT_IN_MILLISECONDS = "serverTimeoutInMilliSeconds";
// MINA properties
@@ -50,6 +34,7 @@ public interface QpidConstants {
public final static String AMQP_SECURITY_SASL_CLIENT_FACTORY = "saslClientFactory";
public final static String TYPE = "[@type]";
+ public final static String AMQP_SECURITY = "security";
public final static String AMQP_SECURITY_MECHANISMS = "securityMechanisms";
public final static String AMQP_SECURITY_MECHANISM_HANDLER = "securityMechanismHandler";
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java
index 8db955afbc..1305500439 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java
@@ -10,142 +10,179 @@ import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQRequestBody;
import org.apache.qpid.framing.AMQResponseBody;
-import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.AbstractPhase;
import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
-import org.apache.qpid.protocol.AMQMethodEvent;
/**
* Corressponds to the Layer 2 in AMQP.
* This phase handles the correlation of amqp messages
* This class implements the 0.9 spec (request/response)
*/
-public class ExecutionPhase extends AbstractPhase{
+public class ExecutionPhase extends AbstractPhase
+{
+
+ protected static final Logger _logger = Logger.getLogger(ExecutionPhase.class);
- protected static final Logger _logger = Logger.getLogger(ExecutionPhase.class);
protected ConcurrentMap _channelId2RequestMgrMap = new ConcurrentHashMap();
+
protected ConcurrentMap _channelId2ResponseMgrMap = new ConcurrentHashMap();
-
- /**
- * --------------------------------------------------
- * Phase related methods
- * --------------------------------------------------
- */
-
+ /**
+ * --------------------------------------------------
+ * Phase related methods
+ * --------------------------------------------------
+ */
+
// should add these in the init method
//_channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false));
//_channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false));
-
- public void messageReceived(Object msg) throws AMQPException
- {
- AMQFrame frame = (AMQFrame) msg;
- final AMQBody bodyFrame = frame.getBodyFrame();
-
- if (bodyFrame instanceof AMQRequestBody)
- {
- AMQPMethodEvent event;
- try
- {
- event = messageRequestBodyReceived(frame.getChannel(), (AMQRequestBody)bodyFrame);
- super.messageReceived(event);
- }
- catch (Exception e)
- {
- _logger.error("Error handling request",e);
- }
-
- }
- else if (bodyFrame instanceof AMQResponseBody)
- {
- List<AMQPMethodEvent> events;
- try
- {
- events = messageResponseBodyReceived(frame.getChannel(), (AMQResponseBody)bodyFrame);
- for (AMQPMethodEvent event: events)
- {
- super.messageReceived(event);
- }
- }
- catch (Exception e)
- {
- _logger.error("Error handling response",e);
- }
- }
- }
+ public void messageReceived(Object msg) throws AMQPException
+ {
+ AMQFrame frame = (AMQFrame) msg;
+ final AMQBody bodyFrame = frame.getBodyFrame();
- /**
- * Need to figure out if the message is a request or a response
- * that needs to be sent and then delegate it to the Request or response manager
- * to prepare it.
- */
- public void messageSent(Object msg) throws AMQPException
+ if (bodyFrame instanceof AMQRequestBody)
{
- AMQPMethodEvent evt = (AMQPMethodEvent)msg;
- if(evt.getCorrelationId() == QpidConstants.EMPTY_CORRELATION_ID)
+ AMQPMethodEvent event;
+ try
+ {
+ event = messageRequestBodyReceived(frame.getChannel(), (AMQRequestBody) bodyFrame);
+ super.messageReceived(event);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error handling request", e);
+ }
+
+ }
+ else if (bodyFrame instanceof AMQResponseBody)
+ {
+ List<AMQPMethodEvent> events;
+ try
+ {
+ events = messageResponseBodyReceived(frame.getChannel(), (AMQResponseBody) bodyFrame);
+ for (AMQPMethodEvent event : events)
{
- // This is a request
- AMQFrame frame = handleRequest(evt);
- super.messageSent(frame);
+ super.messageReceived(event);
}
- else
- {
-// This is a response
- List<AMQFrame> frames = handleResponse(evt);
- for(AMQFrame frame: frames)
- {
- super.messageSent(frame);
- }
- }
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error handling response", e);
+ }
+ }
+ }
+
+ /**
+ * Need to figure out if the message is a request or a response
+ * that needs to be sent and then delegate it to the Request or response manager
+ * to prepare it.
+ */
+ public void messageSent(Object msg) throws AMQPException
+ {
+ AMQPMethodEvent evt = (AMQPMethodEvent) msg;
+ if (evt.getCorrelationId() == QpidConstants.EMPTY_CORRELATION_ID)
+ {
+ // This is a request
+ AMQFrame frame = handleRequest(evt);
+ super.messageSent(frame);
}
+ else
+ {
+ // This is a response
+ List<AMQFrame> frames = handleResponse(evt);
+ for (AMQFrame frame : frames)
+ {
+ super.messageSent(frame);
+ }
+ }
+ }
- /**
- * ------------------------------------------------
- * Methods to handle request response
- * -----------------------------------------------
- */
+ /**
+ * ------------------------------------------------
+ * Methods to handle request response
+ * -----------------------------------------------
+ */
private AMQPMethodEvent messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Request frame received: " + requestBody);
- }
- ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(channelId);
- if (responseManager == null)
- throw new AMQException("Unable to find ResponseManager for channel " + channelId);
- return responseManager.requestReceived(requestBody);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Request frame received: " + requestBody);
+ }
+
+ ResponseManager responseManager;
+ if(_channelId2ResponseMgrMap.containsKey(channelId))
+ {
+ responseManager = (ResponseManager) _channelId2ResponseMgrMap.get(channelId);
+ }
+ else
+ {
+ responseManager = new ResponseManager(0,channelId,false);
+ _channelId2ResponseMgrMap.put(channelId, responseManager);
+ }
+ return responseManager.requestReceived(requestBody);
}
-
- private List<AMQPMethodEvent> messageResponseBodyReceived(int channelId, AMQResponseBody responseBody) throws Exception
+
+ private List<AMQPMethodEvent> messageResponseBodyReceived(int channelId, AMQResponseBody responseBody)
+ throws Exception
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Response frame received: " + responseBody);
- }
- RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(channelId);
- if (requestManager == null)
- throw new AMQException("Unable to find RequestManager for channel " + channelId);
- return requestManager.responseReceived(responseBody);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Response frame received: " + responseBody);
+ }
+
+ RequestManager requestManager;
+ if (_channelId2RequestMgrMap.containsKey(channelId))
+ {
+ requestManager = (RequestManager) _channelId2RequestMgrMap.get(channelId);
+ }
+ else
+ {
+ requestManager = new RequestManager(0,channelId,false);
+ _channelId2RequestMgrMap.put(channelId, requestManager);
+ }
+
+ return requestManager.responseReceived(responseBody);
}
-
+
private AMQFrame handleRequest(AMQPMethodEvent evt)
{
- RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(evt.getChannelId());
- return requestManager.sendRequest(evt);
+ int channelId = evt.getChannelId();
+ RequestManager requestManager;
+ if (_channelId2RequestMgrMap.containsKey(channelId))
+ {
+ requestManager = (RequestManager) _channelId2RequestMgrMap.get(channelId);
+ }
+ else
+ {
+ requestManager = new RequestManager(0,channelId,false);
+ _channelId2RequestMgrMap.put(channelId, requestManager);
+ }
+ return requestManager.sendRequest(evt);
}
-
+
private List<AMQFrame> handleResponse(AMQPMethodEvent evt) throws AMQPException
{
- ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(evt.getChannelId());
- try
- {
- return responseManager.sendResponse(evt);
- }
- catch(Exception e)
- {
- throw new AMQPException("Error handling response",e);
- }
+ int channelId = evt.getChannelId();
+ ResponseManager responseManager;
+ if(_channelId2ResponseMgrMap.containsKey(channelId))
+ {
+ responseManager = (ResponseManager) _channelId2ResponseMgrMap.get(channelId);
+ }
+ else
+ {
+ responseManager = new ResponseManager(0,channelId,false);
+ _channelId2ResponseMgrMap.put(channelId, responseManager);
+ }
+ try
+ {
+ return responseManager.sendResponse(evt);
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error handling response", e);
+ }
}
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java
index 761ec1b050..0084f27717 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java
@@ -29,7 +29,7 @@ import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQRequestBody;
import org.apache.qpid.framing.AMQResponseBody;
import org.apache.qpid.framing.RequestResponseMappingException;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
public class RequestManager
{
@@ -56,7 +56,7 @@ public class RequestManager
*/
private long lastProcessedResponseId;
- private ConcurrentHashMap<Long, Long> requestSentMap;
+ private ConcurrentHashMap<Long, CorrelationID> requestSentMap;
public RequestManager(long connectionId, int channel, boolean serverFlag)
{
@@ -65,7 +65,7 @@ public class RequestManager
this.connectionId = connectionId;
requestIdCount = 1L;
lastProcessedResponseId = 0L;
- requestSentMap = new ConcurrentHashMap<Long, Long>();
+ requestSentMap = new ConcurrentHashMap<Long, CorrelationID>();
}
// *** Functions to originate a request ***
@@ -80,7 +80,7 @@ public class RequestManager
logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
"] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + evt.getMethod());
}
- requestSentMap.put(requestId, evt.getCorrelationId());
+ requestSentMap.put(requestId, new CorrelationID(evt.getCorrelationId(), evt.getLocalCorrelationId()));
return requestFrame;
}
@@ -103,9 +103,9 @@ public class RequestManager
throw new RequestResponseMappingException(requestId,
"Failed to locate requestId " + requestId + " in requestSentMap.");
}
- long localCorrelationId = requestSentMap.get(requestId);
+ CorrelationID correlationID = requestSentMap.get(requestId);
AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel, responseBody.getMethodPayload(),
- requestId,localCorrelationId);
+ correlationID.getSystemCorrelationID(),correlationID.getLocalCorrelationID());
events.add(methodEvent);
requestSentMap.remove(requestId);
}
@@ -126,4 +126,40 @@ public class RequestManager
{
return requestIdCount++;
}
+
+ private class CorrelationID
+ {
+ // Use for the request/response stuff
+ private long _systemCorrelationID;
+ // used internally to track callbacks
+ private long _localCorrelationID;
+
+ CorrelationID(long systemCorrelationID,long localCorrelationID)
+ {
+ _localCorrelationID = localCorrelationID;
+ _systemCorrelationID = systemCorrelationID;
+ }
+
+ public long getLocalCorrelationID()
+ {
+ return _localCorrelationID;
+ }
+
+ public void setLocalCorrelationID(long correlationID)
+ {
+ _localCorrelationID = correlationID;
+ }
+
+ public long getSystemCorrelationID()
+ {
+ return _systemCorrelationID;
+ }
+
+ public void setSystemCorrelationID(long correlationID)
+ {
+ _systemCorrelationID = correlationID;
+ }
+
+
+ }
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java
index 97d9576a4e..c5a75d242f 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java
@@ -31,9 +31,9 @@ import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQRequestBody;
import org.apache.qpid.framing.AMQResponseBody;
import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
public class ResponseManager
{
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
index e7b762b77a..d79525c5b2 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
@@ -24,8 +24,6 @@ package org.apache.qpid.nclient.message;
import java.util.LinkedList;
import java.util.List;
-import org.apache.qpid.client.message.MessageHeaders;
-
public class AMQPApplicationMessage {
private int bytesReceived = 0;
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java
new file mode 100644
index 0000000000..562aa7b06e
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java
@@ -0,0 +1,679 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.qpid.nclient.message;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQPInvalidClassException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import java.util.Enumeration;
+
+public class MessageHeaders
+{
+ private static final Logger _logger = Logger.getLogger(MessageHeaders.class);
+
+ private AMQShortString _contentType;
+
+ private AMQShortString _encoding;
+
+ private AMQShortString _destination;
+
+ private AMQShortString _exchange;
+
+ private FieldTable _jmsHeaders;
+
+ private short _deliveryMode;
+
+ private short _priority;
+
+ private AMQShortString _correlationId;
+
+ private AMQShortString _replyTo;
+
+ private long _expiration;
+
+ private AMQShortString _messageId;
+
+ private long _timestamp;
+
+ private AMQShortString _type;
+
+ private AMQShortString _userId;
+
+ private AMQShortString _appId;
+
+ private AMQShortString _transactionId;
+
+ private AMQShortString _routingKey;
+
+ private int _size;
+
+ public int getSize()
+ {
+ return _size;
+ }
+
+ public void setSize(int size)
+ {
+ this._size = size;
+ }
+
+ public MessageHeaders()
+ {
+ }
+
+ public AMQShortString getContentType()
+ {
+ return _contentType;
+ }
+
+ public void setContentType(AMQShortString contentType)
+ {
+ _contentType = contentType;
+ }
+
+ public AMQShortString getEncoding()
+ {
+ return _encoding;
+ }
+
+ public void setEncoding(AMQShortString encoding)
+ {
+ _encoding = encoding;
+ }
+
+ public FieldTable getJMSHeaders()
+ {
+ if (_jmsHeaders == null)
+ {
+ setJMSHeaders(FieldTableFactory.newFieldTable());
+ }
+
+ return _jmsHeaders;
+ }
+
+ public void setJMSHeaders(FieldTable headers)
+ {
+ _jmsHeaders = headers;
+ }
+
+
+ public short getDeliveryMode()
+ {
+ return _deliveryMode;
+ }
+
+ public void setDeliveryMode(short deliveryMode)
+ {
+ _deliveryMode = deliveryMode;
+ }
+
+ public short getPriority()
+ {
+ return _priority;
+ }
+
+ public void setPriority(short priority)
+ {
+ _priority = priority;
+ }
+
+ public AMQShortString getCorrelationId()
+ {
+ return _correlationId;
+ }
+
+ public void setCorrelationId(AMQShortString correlationId)
+ {
+ _correlationId = correlationId;
+ }
+
+ public AMQShortString getReplyTo()
+ {
+ return _replyTo;
+ }
+
+ public void setReplyTo(AMQShortString replyTo)
+ {
+ _replyTo = replyTo;
+ }
+
+ public long getExpiration()
+ {
+ return _expiration;
+ }
+
+ public void setExpiration(long expiration)
+ {
+ _expiration = expiration;
+ }
+
+
+ public AMQShortString getMessageId()
+ {
+ return _messageId;
+ }
+
+ public void setMessageId(AMQShortString messageId)
+ {
+ _messageId = messageId;
+ }
+
+ public long getTimestamp()
+ {
+ return _timestamp;
+ }
+
+ public void setTimestamp(long timestamp)
+ {
+ _timestamp = timestamp;
+ }
+
+ public AMQShortString getType()
+ {
+ return _type;
+ }
+
+ public void setType(AMQShortString type)
+ {
+ _type = type;
+ }
+
+ public AMQShortString getUserId()
+ {
+ return _userId;
+ }
+
+ public void setUserId(AMQShortString userId)
+ {
+ _userId = userId;
+ }
+
+ public AMQShortString getAppId()
+ {
+ return _appId;
+ }
+
+ public void setAppId(AMQShortString appId)
+ {
+ _appId = appId;
+ }
+
+ // MapMessage Interface
+
+ public boolean getBoolean(AMQShortString string) throws JMSException
+ {
+ Boolean b = getJMSHeaders().getBoolean(string);
+
+ if (b == null)
+ {
+ if (getJMSHeaders().containsKey(string))
+ {
+ Object str = getJMSHeaders().getObject(string);
+
+ if (str == null || !(str instanceof AMQShortString))
+ {
+ throw new MessageFormatException("getBoolean can't use " + string + " item.");
+ }
+ else
+ {
+ return Boolean.valueOf(((AMQShortString)str).asString());
+ }
+ }
+ else
+ {
+ b = Boolean.valueOf(null);
+ }
+ }
+
+ return b;
+ }
+
+ public char getCharacter(AMQShortString string) throws JMSException
+ {
+ Character c = getJMSHeaders().getCharacter(string);
+
+ if (c == null)
+ {
+ if (getJMSHeaders().isNullStringValue(string.asString()))
+ {
+ throw new NullPointerException("Cannot convert null char");
+ }
+ else
+ {
+ throw new MessageFormatException("getChar can't use " + string + " item.");
+ }
+ }
+ else
+ {
+ return (char) c;
+ }
+ }
+
+ public byte[] getBytes(AMQShortString string) throws JMSException
+ {
+ byte[] bs = getJMSHeaders().getBytes(string);
+
+ if (bs == null)
+ {
+ throw new MessageFormatException("getBytes can't use " + string + " item.");
+ }
+ else
+ {
+ return bs;
+ }
+ }
+
+ public byte getByte(AMQShortString string) throws JMSException
+ {
+ Byte b = getJMSHeaders().getByte(string);
+ if (b == null)
+ {
+ if (getJMSHeaders().containsKey(string))
+ {
+ Object str = getJMSHeaders().getObject(string);
+
+ if (str == null || !(str instanceof AMQShortString))
+ {
+ throw new MessageFormatException("getByte can't use " + string + " item.");
+ }
+ else
+ {
+ return Byte.valueOf(((AMQShortString)str).asString());
+ }
+ }
+ else
+ {
+ b = Byte.valueOf(null);
+ }
+ }
+
+ return b;
+ }
+
+ public short getShort(AMQShortString string) throws JMSException
+ {
+ Short s = getJMSHeaders().getShort(string);
+
+ if (s == null)
+ {
+ s = Short.valueOf(getByte(string));
+ }
+
+ return s;
+ }
+
+ public int getInteger(AMQShortString string) throws JMSException
+ {
+ Integer i = getJMSHeaders().getInteger(string);
+
+ if (i == null)
+ {
+ i = Integer.valueOf(getShort(string));
+ }
+
+ return i;
+ }
+
+ public long getLong(AMQShortString string) throws JMSException
+ {
+ Long l = getJMSHeaders().getLong(string);
+
+ if (l == null)
+ {
+ l = Long.valueOf(getInteger(string));
+ }
+
+ return l;
+ }
+
+ public float getFloat(AMQShortString string) throws JMSException
+ {
+ Float f = getJMSHeaders().getFloat(string);
+
+ if (f == null)
+ {
+ if (getJMSHeaders().containsKey(string))
+ {
+ Object str = getJMSHeaders().getObject(string);
+
+ if (str == null || !(str instanceof AMQShortString))
+ {
+ throw new MessageFormatException("getFloat can't use " + string + " item.");
+ }
+ else
+ {
+ return Float.valueOf(((AMQShortString)str).asString());
+ }
+ }
+ else
+ {
+ f = Float.valueOf(null);
+ }
+
+ }
+
+ return f;
+ }
+
+ public double getDouble(AMQShortString string) throws JMSException
+ {
+ Double d = getJMSHeaders().getDouble(string);
+
+ if (d == null)
+ {
+ d = Double.valueOf(getFloat(string));
+ }
+
+ return d;
+ }
+
+ public AMQShortString getString(AMQShortString string) throws JMSException
+ {
+ AMQShortString s = new AMQShortString(getJMSHeaders().getString(string.asString()));
+
+ if (s == null)
+ {
+ if (getJMSHeaders().containsKey(string))
+ {
+ Object o = getJMSHeaders().getObject(string);
+ if (o instanceof byte[])
+ {
+ throw new MessageFormatException("getObject couldn't find " + string + " item.");
+ }
+ else
+ {
+ if (o == null)
+ {
+ return null;
+ }
+ else
+ {
+ s = (AMQShortString) o;
+ }
+ }
+ }
+ }
+
+ return s;
+ }
+
+ public Object getObject(AMQShortString string) throws JMSException
+ {
+ return getJMSHeaders().getObject(string);
+ }
+
+ public void setBoolean(AMQShortString string, boolean b) throws JMSException
+ {
+ checkPropertyName(string);
+ getJMSHeaders().setBoolean(string, b);
+ }
+
+ public void setChar(AMQShortString string, char c) throws JMSException
+ {
+ checkPropertyName(string);
+ getJMSHeaders().setChar(string, c);
+ }
+
+ public Object setBytes(AMQShortString string, byte[] bytes)
+ {
+ return getJMSHeaders().setBytes(string, bytes);
+ }
+
+ public Object setBytes(AMQShortString string, byte[] bytes, int start, int length)
+ {
+ return getJMSHeaders().setBytes(string, bytes, start, length);
+ }
+
+ public void setByte(AMQShortString string, byte b) throws JMSException
+ {
+ checkPropertyName(string);
+ getJMSHeaders().setByte(string, b);
+ }
+
+ public void setShort(AMQShortString string, short i) throws JMSException
+ {
+ checkPropertyName(string);
+ getJMSHeaders().setShort(string, i);
+ }
+
+ public void setInteger(AMQShortString string, int i) throws JMSException
+ {
+ checkPropertyName(string);
+ getJMSHeaders().setInteger(string, i);
+ }
+
+ public void setLong(AMQShortString string, long l) throws JMSException
+ {
+ checkPropertyName(string);
+ getJMSHeaders().setLong(string, l);
+ }
+
+ public void setFloat(AMQShortString string, float v) throws JMSException
+ {
+ checkPropertyName(string);
+ getJMSHeaders().setFloat(string, v);
+ }
+
+ public void setDouble(AMQShortString string, double v) throws JMSException
+ {
+ checkPropertyName(string);
+ getJMSHeaders().setDouble(string, v);
+ }
+
+ public void setString(AMQShortString string, AMQShortString string1) throws JMSException
+ {
+ checkPropertyName(string);
+ getJMSHeaders().setString(string.asString(), string1.asString());
+ }
+
+ public void setObject(AMQShortString string, Object object) throws JMSException
+ {
+ checkPropertyName(string);
+ try
+ {
+ getJMSHeaders().setObject(string, object);
+ }
+ catch (AMQPInvalidClassException aice)
+ {
+ throw new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
+ }
+ }
+
+ public boolean itemExists(AMQShortString string) throws JMSException
+ {
+ return getJMSHeaders().containsKey(string);
+ }
+
+ public Enumeration getPropertyNames()
+ {
+ return getJMSHeaders().getPropertyNames();
+ }
+
+ public void clear()
+ {
+ getJMSHeaders().clear();
+ }
+
+ public boolean propertyExists(AMQShortString propertyName)
+ {
+ return getJMSHeaders().propertyExists(propertyName);
+ }
+
+ public Object put(Object key, Object value)
+ {
+ return getJMSHeaders().setObject(key.toString(), value);
+ }
+
+ public Object remove(AMQShortString propertyName)
+ {
+ return getJMSHeaders().remove(propertyName);
+ }
+
+ public boolean isEmpty()
+ {
+ return getJMSHeaders().isEmpty();
+ }
+
+ public void writeToBuffer(ByteBuffer data)
+ {
+ getJMSHeaders().writeToBuffer(data);
+ }
+
+ public Enumeration getMapNames()
+ {
+ return getPropertyNames();
+ }
+
+ protected static void checkPropertyName(CharSequence propertyName)
+ {
+ if (propertyName == null)
+ {
+ throw new IllegalArgumentException("Property name must not be null");
+ }
+ else if (propertyName.length() == 0)
+ {
+ throw new IllegalArgumentException("Property name must not be the empty string");
+ }
+
+ checkIdentiferFormat(propertyName);
+ }
+
+ protected static void checkIdentiferFormat(CharSequence propertyName)
+ {
+// JMS requirements 3.5.1 Property Names
+// Identifiers:
+// - An identifier is an unlimited-length character sequence that must begin
+// with a Java identifier start character; all following characters must be Java
+// identifier part characters. An identifier start character is any character for
+// which the method Character.isJavaIdentifierStart returns true. This includes
+// '_' and '$'. An identifier part character is any character for which the
+// method Character.isJavaIdentifierPart returns true.
+// - Identifiers cannot be the names NULL, TRUE, or FALSE.
+// – Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or
+// ESCAPE.
+// – Identifiers are either header field references or property references. The
+// type of a property value in a message selector corresponds to the type
+// used to set the property. If a property that does not exist in a message is
+// referenced, its value is NULL. The semantics of evaluating NULL values
+// in a selector are described in Section 3.8.1.2, “Null Values.”
+// – The conversions that apply to the get methods for properties do not
+// apply when a property is used in a message selector expression. For
+// example, suppose you set a property as a string value, as in the
+// following:
+// myMessage.setStringProperty("NumberOfOrders", "2");
+// The following expression in a message selector would evaluate to false,
+// because a string cannot be used in an arithmetic expression:
+// "NumberOfOrders > 1"
+// – Identifiers are case sensitive.
+// – Message header field references are restricted to JMSDeliveryMode,
+// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and
+// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be
+// null and if so are treated as a NULL value.
+
+ if (Boolean.getBoolean("strict-jms"))
+ {
+ // JMS start character
+ if (!(Character.isJavaIdentifierStart(propertyName.charAt(0))))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character");
+ }
+
+ // JMS part character
+ int length = propertyName.length();
+ for (int c = 1; c < length; c++)
+ {
+ if (!(Character.isJavaIdentifierPart(propertyName.charAt(c))))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character");
+ }
+ }
+
+
+
+
+ // JMS invalid names
+ if ((propertyName.equals("NULL")
+ || propertyName.equals("TRUE")
+ || propertyName.equals("FALSE")
+ || propertyName.equals("NOT")
+ || propertyName.equals("AND")
+ || propertyName.equals("OR")
+ || propertyName.equals("BETWEEN")
+ || propertyName.equals("LIKE")
+ || propertyName.equals("IN")
+ || propertyName.equals("IS")
+ || propertyName.equals("ESCAPE")))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS");
+ }
+ }
+
+ }
+
+ public AMQShortString getTransactionId()
+ {
+ return _transactionId;
+ }
+
+ public void setTransactionId(AMQShortString id)
+ {
+ _transactionId = id;
+ }
+
+ public AMQShortString getDestination()
+ {
+ return _destination;
+ }
+
+ public void setDestination(AMQShortString destination)
+ {
+ this._destination = destination;
+ }
+
+ public AMQShortString getExchange()
+ {
+ return _exchange;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ this._exchange = exchange;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return _routingKey;
+ }
+
+ public void setRoutingKey(AMQShortString routingKey)
+ {
+ this._routingKey = routingKey;
+ }
+}
+
+
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java
index 93eecdc0cc..efd7264f96 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java
@@ -1,7 +1,6 @@
package org.apache.qpid.nclient.message;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.MessageHeaders;
public interface MessageStore {
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java
index 26cf2327d7..eb5a9c1778 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java
@@ -4,7 +4,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.MessageHeaders;
public class TransientMessageStore implements MessageStore {
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java b/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java
deleted file mode 100644
index c33c087da8..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package org.apache.qpid.nclient.model;
-
-import org.apache.qpid.framing.AMQMethodBody;
-
-/**
- * This class is exactly the same as the AMQMethod event.
- * Except I renamed requestId to corelationId, so I could use it both ways.
- *
- * I didn't want to modify anything in common so that there is no
- * impact on the existing code.
- *
- */
-public class AMQPMethodEvent <M extends AMQMethodBody> {
-
- private final M _method;
- private final int _channelId;
- private final long _correlationId;
- private long _localCorrletionId = 0;
-
- public AMQPMethodEvent(int channelId, M method, long correlationId,long localCorrletionId)
- {
- _channelId = channelId;
- _method = method;
- _correlationId = correlationId;
- _localCorrletionId = localCorrletionId;
- }
-
- public AMQPMethodEvent(int channelId, M method, long correlationId)
- {
- _channelId = channelId;
- _method = method;
- _correlationId = correlationId;
- }
-
- public M getMethod()
- {
- return _method;
- }
-
- public int getChannelId()
- {
- return _channelId;
- }
-
- public long getCorrelationId()
- {
- return _correlationId;
- }
-
- public long getLocalCorrelationId()
- {
- return _localCorrletionId;
- }
-
- public String toString()
- {
- StringBuilder buf = new StringBuilder("Method event: \n");
- buf.append("Channel id: \n").append(_channelId);
- buf.append("Method: \n").append(_method);
- buf.append("Request Id: ").append(_correlationId);
- buf.append("Local Correlation Id: ").append(_localCorrletionId);
- return buf.toString();
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java
index 77003b4d21..d845059ee7 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java
@@ -1,13 +1,12 @@
package org.apache.qpid.nclient.model;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
-import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
-import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.AbstractPhase;
import org.apache.qpid.nclient.core.Phase;
@@ -31,15 +30,7 @@ public class ModelPhase extends AbstractPhase {
*/
public void init(PhaseContext ctx, Phase nextInFlowPhase, Phase nextOutFlowPhase)
{
- super.init(ctx, nextInFlowPhase, nextOutFlowPhase);
- try
- {
- loadMethodListeners();
- }
- catch(Exception e)
- {
- _logger.fatal("Error loading method listeners", e);
- }
+ super.init(ctx, nextInFlowPhase, nextOutFlowPhase);
}
public void messageReceived(Object msg) throws AMQPException
@@ -68,66 +59,13 @@ public class ModelPhase extends AbstractPhase {
public void notifyMethodListerners(AMQPMethodEvent event) throws AMQPException
{
- if (_methodListners.containsKey(event.getMethod().getClass()))
- {
- List<AMQPMethodListener> listeners = _methodListners.get(event.getMethod().getClass());
-
- if(listeners.size()>0)
- {
- throw new AMQPException("There are no registered listeners for this method");
- }
-
- for(AMQPMethodListener l : listeners)
- {
- try
- {
- l.methodReceived(event);
- }
- catch (Exception e)
- {
- _logger.error("Error handling method event " + event, e);
- }
- }
- }
+ AMQPEventManager eventManager = (AMQPEventManager)_ctx.getProperty(QpidConstants.EVENT_MANAGER);
+ eventManager.notifyEvent(event);
}
/**
* ------------------------------------------------
* Configuration
* ------------------------------------------------
- */
-
- /**
- * This method loads method listeners from the client.xml file
- * For each method class there is a list of listeners
- */
- private void loadMethodListeners() throws Exception
- {
- int count = ClientConfiguration.get().getMaxIndex(QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER);
- System.out.println(count);
-
- for(int i=0 ;i<count;i++)
- {
- String methodListener = QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER + "(" + i + ")";
- String className = ClientConfiguration.get().getString(methodListener + "." + QpidConstants.CLASS);
- Class listenerClass = Class.forName(className);
- List<String> list = ClientConfiguration.get().getList(methodListener + "." + QpidConstants.METHOD_CLASS);
- for(String s:list)
- {
- List listeners;
- Class methodClass = Class.forName(s);
- if (_methodListners.containsKey(methodClass))
- {
- listeners = _methodListners.get(methodClass);
- }
- else
- {
- listeners = new ArrayList();
- _methodListners.put(methodClass,listeners);
- }
- listeners.add(listenerClass);
- }
- }
- }
-
+ */
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java
index 28ba2e355c..428cd6753d 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java
@@ -62,11 +62,16 @@ public class CallbackHandlerRegistry
private void parseProperties()
{
- List<String> mechanisms = ClientConfiguration.get().getList(QpidConstants.AMQP_SECURITY_MECHANISMS);
-
- for (String mechanism : mechanisms)
+ String key = QpidConstants.AMQP_SECURITY + "." +
+ QpidConstants.AMQP_SECURITY_MECHANISMS + "." +
+ QpidConstants.AMQP_SECURITY_MECHANISM_HANDLER;
+
+ int index = ClientConfiguration.get().getMaxIndex(key);
+
+ for (int i=0; i<index+1;i++)
{
- String className = ClientConfiguration.get().getString(QpidConstants.AMQP_SECURITY_MECHANISM_HANDLER + "_" + mechanism);
+ String mechanism = ClientConfiguration.get().getString(key + "(" + i + ")[@type]");
+ String className = ClientConfiguration.get().getString(key + "(" + i + ")" );
Class clazz = null;
try
{
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java
new file mode 100644
index 0000000000..1097346c1d
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security.amqplain;
+
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.Callback;
+
+/**
+ * Implements the "AMQPlain" authentication protocol that uses FieldTables to send username and pwd.
+ *
+ */
+public class AmqPlainSaslClient implements SaslClient
+{
+ /**
+ * The name of this mechanism
+ */
+ public static final String MECHANISM = "AMQPLAIN";
+
+ private CallbackHandler _cbh;
+
+ public AmqPlainSaslClient(CallbackHandler cbh)
+ {
+ _cbh = cbh;
+ }
+
+ public String getMechanismName()
+ {
+ return "AMQPLAIN";
+ }
+
+ public boolean hasInitialResponse()
+ {
+ return true;
+ }
+
+ public byte[] evaluateChallenge(byte[] challenge) throws SaslException
+ {
+ // we do not care about the prompt or the default name
+ NameCallback nameCallback = new NameCallback("prompt", "defaultName");
+ PasswordCallback pwdCallback = new PasswordCallback("prompt", false);
+ Callback[] callbacks = new Callback[]{nameCallback, pwdCallback};
+ try
+ {
+ _cbh.handle(callbacks);
+ }
+ catch (Exception e)
+ {
+ throw new SaslException("Error handling SASL callbacks: " + e, e);
+ }
+ FieldTable table = FieldTableFactory.newFieldTable();
+ table.setString("LOGIN", nameCallback.getName());
+ table.setString("PASSWORD", new String(pwdCallback.getPassword()));
+ return table.getDataAsBytes();
+ }
+
+ public boolean isComplete()
+ {
+ return true;
+ }
+
+ public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException
+ {
+ throw new SaslException("Not supported");
+ }
+
+ public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException
+ {
+ throw new SaslException("Not supported");
+ }
+
+ public Object getNegotiatedProperty(String propName)
+ {
+ return null;
+ }
+
+ public void dispose() throws SaslException
+ {
+ _cbh = null;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java
new file mode 100644
index 0000000000..f98c1e3a58
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security.amqplain;
+
+import javax.security.sasl.SaslClientFactory;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.Sasl;
+import javax.security.auth.callback.CallbackHandler;
+import java.util.Map;
+
+public class AmqPlainSaslClientFactory implements SaslClientFactory
+{
+ public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName, Map props, CallbackHandler cbh) throws SaslException
+ {
+ for (int i = 0; i < mechanisms.length; i++)
+ {
+ if (mechanisms[i].equals(AmqPlainSaslClient.MECHANISM))
+ {
+ if (cbh == null)
+ {
+ throw new SaslException("CallbackHandler must not be null");
+ }
+ return new AmqPlainSaslClient(cbh);
+ }
+ }
+ return null;
+ }
+
+ public String[] getMechanismNames(Map props)
+ {
+ if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+ props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+ props.containsKey(Sasl.POLICY_NOACTIVE))
+ {
+ // returned array must be non null according to interface documentation
+ return new String[0];
+ }
+ else
+ {
+ return new String[]{AmqPlainSaslClient.MECHANISM};
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java
index aae3677f8b..734aa68a9d 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java
@@ -22,10 +22,12 @@ public class TCPConnection implements TransportConnection
private BrokerDetails _brokerDetails;
private IoConnector _ioConnector;
private Phase _phase;
+ private PhaseContext _ctx;
- protected TCPConnection(ConnectionURL url)
+ protected TCPConnection(ConnectionURL url, PhaseContext ctx)
{
_brokerDetails = url.getBrokerDetails(0);
+ _ctx = ctx;
ByteBuffer.setUseDirectBuffers(ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_DIRECT_BUFFERS));
@@ -41,8 +43,8 @@ public class TCPConnection implements TransportConnection
ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
}
- final IoConnector ioConnector = new SocketConnector();
- SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
+ _ioConnector = new SocketConnector();
+ SocketConnectorConfig cfg = (SocketConnectorConfig) _ioConnector.getDefaultConfig();
// if we do not use our own thread model we get the MINA default which is to use
// its own leader-follower model
@@ -59,12 +61,11 @@ public class TCPConnection implements TransportConnection
// Returns the phase pipe
public Phase connect() throws AMQPException
- {
- PhaseContext ctx = new DefaultPhaseContext();
- ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails);
- ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector);
+ {
+ _ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails);
+ _ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector);
- _phase = PhaseFactory.createPhasePipe(ctx);
+ _phase = PhaseFactory.createPhasePipe(_ctx);
_phase.start();
return _phase;
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java
index 5c97100bdc..ce2145c08b 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java
@@ -2,6 +2,8 @@ package org.apache.qpid.nclient.transport;
import java.net.URISyntaxException;
+import org.apache.qpid.nclient.core.PhaseContext;
+
public class TransportConnectionFactory
{
public enum ConnectionType
@@ -9,36 +11,26 @@ public class TransportConnectionFactory
TCP,VM
}
- public static TransportConnection createTransportConnection(String url,ConnectionType type) throws URISyntaxException
+ public static TransportConnection createTransportConnection(String url,ConnectionType type, PhaseContext ctx) throws URISyntaxException
{
- return createTransportConnection(new AMQPConnectionURL(url),type);
+ return createTransportConnection(new AMQPConnectionURL(url),type,ctx);
}
- public static TransportConnection createTransportConnection(ConnectionURL url,ConnectionType type)
+ public static TransportConnection createTransportConnection(ConnectionURL url,ConnectionType type, PhaseContext ctx)
{
switch (type)
{
case TCP : default:
{
- return createTCPConnection(url);
+ return new TCPConnection(url,ctx);
}
case VM :
{
- return createVMConnection(url);
+ return new VMConnection(url,ctx);
}
}
}
-
- private static TransportConnection createTCPConnection(ConnectionURL url)
- {
- return new TCPConnection(url);
- }
-
- private static TransportConnection createVMConnection(ConnectionURL url)
- {
- return null;
- }
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java
index b1bc7b4c8c..911e855d4f 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java
@@ -108,7 +108,6 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol
public void messageSent(Object frame) throws AMQPException
{
-
_ioSession.write(frame);
}
@@ -120,7 +119,7 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol
public void sessionIdle(IoSession session, IdleStatus status)
throws Exception
{
- _logger.debug("Protocol Session [" + this + ":" + session + "] idle: "
+ _logger.debug("Protocol Session for [ " + this + " : " + session + "] idle: "
+ status);
if (IdleStatus.WRITER_IDLE.equals(status))
{
@@ -148,9 +147,8 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol
_logger.debug("Received heartbeat");
} else
{
- messageReceived(bodyFrame);
- }
- // _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
+ messageReceived(frame);
+ }
}
public void messageSent(IoSession session, Object message) throws Exception
@@ -162,18 +160,19 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol
throws Exception
{
// Need to handle failover
- sessionClosed(session);
+ _logger.info("Exception caught for [ " + this + " : Session " + System.identityHashCode(session) + "]",cause);
+ //sessionClosed(session);
}
public void sessionClosed(IoSession session) throws Exception
{
// Need to handle failover
- _logger.info("Protocol Session [" + this + "] closed");
+ _logger.info("Protocol Session for [ " + this + " : " + System.identityHashCode(session) + "] closed");
}
public void sessionCreated(IoSession session) throws Exception
{
- _logger.debug("Protocol session created for session "
+ _logger.info("Protocol session created for " + this + " session : "
+ System.identityHashCode(session));
final ProtocolCodecFilter pcf = new ProtocolCodecFilter(
@@ -184,7 +183,8 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol
{
session.getFilterChain().addBefore("AsynchronousWriteFilter",
"protocolFilter", pcf);
- } else
+ }
+ else
{
session.getFilterChain().addLast("protocolFilter", pcf);
}
@@ -213,12 +213,13 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol
e.printStackTrace();
}
+ _ioSession = session;
doAMQPConnectionNegotiation();
}
public void sessionOpened(IoSession session) throws Exception
{
- _logger.debug("Protocol session opened for session "
+ _logger.info("Protocol session opened for " + this + " : session "
+ System.identityHashCode(session));
}
@@ -230,6 +231,7 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol
private void doAMQPConnectionNegotiation()
{
int i = pv.length - 1;
+ _logger.debug("Engaging in connection negotiation");
writeFrame(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
}
@@ -257,7 +259,8 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol
}
/**
- * ----------------------------------------------------------- Failover
- * section -----------------------------------------------------------
+ * -----------------------------------------------------------
+ * Failover section
+ * -----------------------------------------------------------
*/
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java
index c13bb873a7..ba38848149 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java
@@ -26,10 +26,13 @@ public class VMConnection implements TransportConnection
private BrokerDetails _brokerDetails;
private IoConnector _ioConnector;
private Phase _phase;
+ private PhaseContext _ctx;
- protected VMConnection(ConnectionURL url)
+ protected VMConnection(ConnectionURL url,PhaseContext ctx)
{
_brokerDetails = url.getBrokerDetails(0);
+ _ctx = ctx;
+
_ioConnector = new VmPipeConnector();
final IoServiceConfig cfg = _ioConnector.getDefaultConfig();
ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance();
@@ -45,11 +48,10 @@ public class VMConnection implements TransportConnection
{
createVMBroker();
- PhaseContext ctx = new DefaultPhaseContext();
- ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails);
- ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector);
+ _ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails);
+ _ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector);
- _phase = PhaseFactory.createPhasePipe(ctx);
+ _phase = PhaseFactory.createPhasePipe(_ctx);
_phase.start();
return _phase;