summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-03-27 16:31:05 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-03-27 16:31:05 +0000
commit0f9044243547ded8521af0c8d0ff81d791d8048d (patch)
tree7ea962842e9af041430d3a892ea1292b22b95f39
parentc3b33ead3e0028b44020bdb02cd139c8a85f409e (diff)
downloadqpid-python-0f9044243547ded8521af0c8d0ff81d791d8048d.tar.gz
This is the initial checkup for the new client
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@522988 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/newclient/.project17
-rw-r--r--java/newclient/pom.xml188
-rw-r--r--java/newclient/src/main/java/client.log4j28
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java23
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java64
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java284
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java348
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java88
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java232
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java78
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java117
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/EventManager.java129
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java54
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java93
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java59
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java8
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java26
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java11
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java50
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java47
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java84
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml86
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java55
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java97
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java20
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java38
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java35
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java60
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java66
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java19
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java151
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java129
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java240
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java124
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java64
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java17
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java40
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java64
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java11
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java133
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java30
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java98
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java75
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java46
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java60
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java344
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java412
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java73
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java77
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java82
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java34
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java44
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java263
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java133
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java14
55 files changed, 5262 insertions, 0 deletions
diff --git a/java/newclient/.project b/java/newclient/.project
new file mode 100644
index 0000000000..0522e6b5dc
--- /dev/null
+++ b/java/newclient/.project
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>qpid-newclient</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription>
diff --git a/java/newclient/pom.xml b/java/newclient/pom.xml
new file mode 100644
index 0000000000..d2d27f3fd7
--- /dev/null
+++ b/java/newclient/pom.xml
@@ -0,0 +1,188 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-newclient</artifactId>
+ <packaging>jar</packaging>
+ <version>1.0-incubating-M2-SNAPSHOT</version>
+ <name>Qpid New Client</name>
+ <url>http://cwiki.apache.org/confluence/display/qpid</url>
+
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid</artifactId>
+ <version>1.0-incubating-M2-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <topDirectoryLocation>..</topDirectoryLocation>
+ <java.source.version>1.5</java.source.version>
+ <qpid.version>${pom.version}</qpid.version>
+ <qpid.targetDir>${project.build.directory}</qpid.targetDir>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-common</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.mina</groupId>
+ <artifactId>mina-filter-ssl</artifactId>
+ </dependency>
+
+ <!-- Test Dependencies -->
+ <dependency> <!-- for inVm Broker -->
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>jmscts</groupId>
+ <artifactId>jmscts</artifactId>
+ <version>0.5-b2</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymockclassextension</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemProperties>
+ <property>
+ <name>amqj.noAutoCreateVMBroker</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>amqj.logging.level</name>
+ <value>${amqj.logging.level}</value>
+ </property>
+ <property>
+ <name>log4j.configuration</name>
+ <value>file:///${basedir}/src/main/java/client.log4j</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+
+<!-- The inclusion of this resource causes the build to hang. -->
+ <!--resources>
+ <resource>
+ <targetPath>META-INF/</targetPath>
+ <filtering>false</filtering>
+ <directory>../resources/META-INF</directory>
+ <includes>
+ <include>**</include>
+ </includes>
+ </resource>
+ </resources-->
+
+ <testResources>
+ <testResource>
+ <targetPath>META-INF/</targetPath>
+ <filtering>false</filtering>
+ <directory>../resources/META-INF</directory>
+ <includes>
+ <include>**</include>
+ </includes>
+ </testResource>
+ <testResource>
+ <targetPath>src/</targetPath>
+ <filtering>false</filtering>
+ <directory>src/test/java</directory>
+ <includes>
+ <include>**/*.java</include>
+ </includes>
+ </testResource>
+
+ <testResource>
+ <targetPath></targetPath>
+ <filtering>false</filtering>
+ <directory>src/main/java</directory>
+ <includes>
+ <include>client.log4j</include>
+ </includes>
+ </testResource>
+ </testResources>
+
+ </build>
+
+</project>
diff --git a/java/newclient/src/main/java/client.log4j b/java/newclient/src/main/java/client.log4j
new file mode 100644
index 0000000000..525433e9a9
--- /dev/null
+++ b/java/newclient/src/main/java/client.log4j
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+log4j.rootLogger=${root.logging.level}
+
+
+log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+log4j.additivity.org.apache.qpid=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=all
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java
new file mode 100644
index 0000000000..49575be8d2
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java
@@ -0,0 +1,23 @@
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+
+public abstract class AMQPCallBack
+{
+ private boolean _isComplete = false;
+
+ public abstract void brokerResponded(AMQMethodBody body);
+
+ public abstract void brokerRespondedWithError(AMQException e);
+
+ public void setIsComplete(boolean isComplete)
+ {
+ _isComplete = isComplete;
+ }
+
+ public boolean isComplete()
+ {
+ return _isComplete;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java
new file mode 100644
index 0000000000..890b0dd6eb
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java
@@ -0,0 +1,64 @@
+package org.apache.qpid.nclient.amqp;
+
+import java.security.SecureRandom;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.model.AMQPMethodEvent;
+
+public abstract class AMQPCallBackSupport
+{
+ private SecureRandom _localCorrelationIdGenerator = new SecureRandom();
+ protected ConcurrentHashMap<Long,AMQPCallBack> _cbMap = new ConcurrentHashMap<Long,AMQPCallBack>();
+
+ //the channelId assigned for this instance
+ protected int _channelId;
+
+ public AMQPCallBackSupport(int channelId)
+ {
+ _channelId = channelId;
+ }
+
+ private long getNextCorrelationId()
+ {
+ return _localCorrelationIdGenerator.nextLong();
+ }
+
+
+ // For methods that still use nowait, hopefully they will remove nowait
+ protected AMQPMethodEvent handleNoWait(boolean noWait,AMQMethodBody methodBody,AMQPCallBack cb)
+ {
+ if(noWait)
+ {
+ // u only need to register if u are expecting a response
+ long localCorrelationId = getNextCorrelationId();
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId);
+ _cbMap.put(localCorrelationId, cb);
+ return msg;
+ }
+ else
+ {
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID);
+ return msg;
+ }
+ }
+
+ protected AMQPMethodEvent handleAsynchronousCall(AMQMethodBody methodBody,AMQPCallBack cb)
+ {
+ long localCorrelationId = getNextCorrelationId();
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId);
+ _cbMap.put(localCorrelationId, cb);
+ return msg;
+ }
+
+ protected void invokeCallBack(long localCorrelationId, AMQMethodBody methodBody)
+ {
+ if(_cbMap.contains(localCorrelationId))
+ {
+ AMQPCallBack cb = (AMQPCallBack)_cbMap.get(localCorrelationId);
+ cb.brokerResponded(methodBody);
+ }
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
new file mode 100644
index 0000000000..d86f948e28
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
@@ -0,0 +1,284 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.nclient.amqp;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ChannelOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ChannelResumeBody;
+import org.apache.qpid.nclient.amqp.state.AMQPState;
+import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.model.AMQPMethodEvent;
+import org.apache.qpid.nclient.model.AMQPMethodListener;
+import org.apache.qpid.nclient.util.AMQPValidator;
+
+/**
+ * This represents the Channel class defined in the AMQP protocol.
+ * This class is a finite state machine and is thread safe by design.
+ * Only valid state changes are allowed or else an IllegalStateTransitionException will be thrown.
+ * Only one thread can enter the methods that change state, at a given time.
+ * The AMQP protocol recommends one thread per channel by design.
+ *
+ * A JMS Session can wrap an instance of this class.
+ */
+
+public class AMQPChannel extends AMQPStateMachine implements AMQPMethodListener
+{
+private static final Logger _logger = Logger.getLogger(AMQPChannel.class);
+
+ //the channelId assigned for this channel
+ private int _channelId;
+ private Phase _phase;
+ private AMQPState _currentState;
+ private final AMQPState[] _validCloseStates = new AMQPState[]{AMQPState.CHANNEL_OPENED,AMQPState.CHANNEL_SUSPEND};
+ private final AMQPState[] _validResumeStates = new AMQPState[]{AMQPState.CHANNEL_CLOSED,AMQPState.CHANNEL_NOT_OPENED};
+
+ // The wait period until a server sends a respond
+ private long _serverTimeOut = 1000;
+ private final Lock _lock = new ReentrantLock();
+ private final Condition _channelNotOpend = _lock.newCondition();
+ private final Condition _channelNotClosed = _lock.newCondition();
+ private final Condition _channelFlowNotResponded = _lock.newCondition();
+ private final Condition _channelNotResumed = _lock.newCondition();
+
+ private ChannelOpenOkBody _channelOpenOkBody;
+ private ChannelCloseOkBody _channelCloseOkBody;
+ private ChannelFlowOkBody _channelFlowOkBody;
+ private ChannelOkBody _channelOkBody;
+
+ public AMQPChannel(int channelId)
+ {
+ _channelId = channelId;
+ _currentState = AMQPState.CHANNEL_NOT_OPENED;
+ _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+ }
+
+ /**-------------------------------------------
+ * API Methods
+ *--------------------------------------------
+ */
+
+ /**
+ * Opens the channel
+ */
+ public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException
+ {
+ _lock.lock();
+ try {
+ _channelOpenOkBody = null;
+ checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED,_currentState,AMQPState.CHANNEL_OPENED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelOpenBody,QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+ _channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time");
+ _currentState = AMQPState.CHANNEL_OPENED;
+ return _channelOpenOkBody;
+ }
+ catch(Exception e)
+ {
+ throw new AMQPException("XXX");
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * Close the channel
+ */
+ public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException
+ {
+ _lock.lock();
+ try {
+ _channelCloseOkBody = null;
+ checkIfValidStateTransition(_validCloseStates,_currentState,AMQPState.CHANNEL_CLOSED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelCloseBody,QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+ _channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time");
+ _currentState = AMQPState.CHANNEL_CLOSED;
+ return _channelCloseOkBody;
+ }
+ catch(Exception e)
+ {
+ throw new AMQPException("XXX");
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * Channel Flow
+ */
+ public ChannelFlowOkBody close(ChannelFlowBody channelFlowBody) throws AMQPException
+ {
+ _lock.lock();
+ try {
+ _channelFlowOkBody = null;
+ if(channelFlowBody.active)
+ {
+ checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND,_currentState,AMQPState.CHANNEL_OPENED);
+ }
+ else
+ {
+ checkIfValidStateTransition(AMQPState.CHANNEL_OPENED,_currentState,AMQPState.CHANNEL_SUSPEND);
+ }
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelFlowBody,QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+ _channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time");
+ handleChannelFlowState(_channelFlowOkBody.active);
+ return _channelFlowOkBody;
+ }
+ catch(Exception e)
+ {
+ throw new AMQPException("XXX");
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * Close the channel
+ */
+ public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException
+ {
+ _lock.lock();
+ try {
+ _channelOkBody = null;
+ checkIfValidStateTransition(_validResumeStates,_currentState,AMQPState.CHANNEL_OPENED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelResumeBody,QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+ _channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ AMQPValidator.throwExceptionOnNull(_channelOkBody, "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time");
+ _currentState = AMQPState.CHANNEL_OPENED;
+ return _channelOkBody;
+ }
+ catch(Exception e)
+ {
+ throw new AMQPException("XXX");
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**-------------------------------------------
+ * AMQPMethodListener methods
+ *--------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ if (evt.getMethod() instanceof ChannelOpenOkBody)
+ {
+ _channelOpenOkBody = (ChannelOpenOkBody)evt.getMethod();
+ _channelNotOpend.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelCloseOkBody)
+ {
+ _channelCloseOkBody = (ChannelCloseOkBody)evt.getMethod();
+ _channelNotClosed.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelCloseBody)
+ {
+ handleChannelClose((ChannelCloseBody)evt.getMethod());
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelFlowOkBody)
+ {
+ _channelFlowOkBody = (ChannelFlowOkBody)evt.getMethod();
+ _channelFlowNotResponded.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelFlowBody)
+ {
+ handleChannelFlow((ChannelFlowBody)evt.getMethod());
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelOkBody)
+ {
+ _channelOkBody = (ChannelOkBody)evt.getMethod();
+ //In this case the only method expecting channel-ok is channel-resume
+ // haven't implemented ping and pong.
+ _channelNotResumed.signal();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ private void handleChannelClose(ChannelCloseBody channelCloseBody)
+ {
+ try
+ {
+ _lock.lock();
+ _currentState = AMQPState.CHANNEL_CLOSED;
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ private void handleChannelFlow(ChannelFlowBody channelFlowBody)
+ {
+ _lock.lock();
+ try
+ {
+ handleChannelFlowState(channelFlowBody.active);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ private void handleChannelFlowState(boolean flow)
+ {
+ _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
new file mode 100644
index 0000000000..9c9e913cd3
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
@@ -0,0 +1,348 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionStartOkBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.nclient.amqp.state.AMQPState;
+import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.model.AMQPMethodEvent;
+import org.apache.qpid.nclient.model.AMQPMethodListener;
+import org.apache.qpid.nclient.transport.TransportConnection;
+import org.apache.qpid.nclient.util.AMQPValidator;
+
+/**
+ * This maps directly to the Connection class defined in the AMQP protocol This class is a finite state machine and is
+ * thread safe by design A particular method (state changing) can only be invoked once and only in sequence or else an
+ * IllegalStateTransitionException will be thrown Also only one thread can enter those methods at a given time.
+ */
+public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(AMQPConnection.class);
+
+ private Phase _phase;
+
+ private TransportConnection _connection;
+
+ private long _correlationId;
+
+ private AMQPState _currentState;
+
+ private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CONNECTION_NOT_STARTED,
+ AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED,
+ AMQPState.CONNECTION_OPEN, };
+
+ // The wait period until a server sends a respond
+ private long _serverTimeOut = 1000;
+
+ private final Lock _lock = new ReentrantLock();
+
+ private final Condition _connectionNotStarted = _lock.newCondition();
+
+ private final Condition _connectionNotSecure = _lock.newCondition();
+
+ private final Condition _connectionNotTuned = _lock.newCondition();
+
+ private final Condition _connectionNotOpened = _lock.newCondition();
+
+ private final Condition _connectionNotClosed = _lock.newCondition();
+
+ private ConnectionStartBody _connectionStartBody;
+
+ private ConnectionSecureBody _connectionSecureBody;
+
+ private ConnectionTuneBody _connectionTuneBody;
+
+ private ConnectionOpenOkBody _connectionOpenOkBody;
+
+ private ConnectionCloseOkBody _connectionCloseOkBody;
+
+ public AMQPConnection(TransportConnection connection)
+ {
+ _connection = connection;
+ _currentState = AMQPState.CONNECTION_UNDEFINED;
+ _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+ }
+
+ /**
+ * ------------------------------------------- API Methods --------------------------------------------
+ */
+
+ /**
+ * Opens the TCP connection and let the formalities begin.
+ */
+ public ConnectionStartBody openTCPConnection() throws AMQPException
+ {
+ _lock.lock();
+ // open the TCP connection
+ try
+ {
+ _connectionStartBody = null;
+ checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED);
+ _phase = _connection.connect();
+
+ // waiting for ConnectionStartBody or error in connection
+ _connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ AMQPValidator.throwExceptionOnNull(_connectionStartBody,
+ "The broker didn't send the ConnectionStartBody in time");
+ _currentState = AMQPState.CONNECTION_NOT_STARTED;
+ return _connectionStartBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("XXX");
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ public ConnectionSecureBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _connectionSecureBody = null;
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState,
+ AMQPState.CONNECTION_NOT_SECURE);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId);
+ _phase.messageSent(msg);
+ _connectionNotSecure.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ AMQPValidator.throwExceptionOnNull(_connectionSecureBody,
+ "The broker didn't send the ConnectionSecureBody in time");
+ _currentState = AMQPState.CONNECTION_NOT_SECURE;
+ return _connectionSecureBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("XXX");
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could
+ * issue a new challenge
+ */
+ public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _connectionTuneBody = null;
+ _connectionSecureBody = null;
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED);
+ _connectionSecureBody = null; // The server could send a fresh challenge
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody,
+ _correlationId);
+ _phase.messageSent(msg);
+ _connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ if (_connectionTuneBody != null)
+ {
+ _currentState = AMQPState.CONNECTION_NOT_TUNED;
+ return _connectionTuneBody;
+ }
+ else if (_connectionSecureBody != null)
+ { // oops the server sent another challenge
+ _currentState = AMQPState.CONNECTION_NOT_SECURE;
+ return _connectionSecureBody;
+ }
+ else
+ {
+ throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
+ }
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("XXX");
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED);
+ _connectionSecureBody = null;
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId);
+ _phase.messageSent(msg);
+ _currentState = AMQPState.CONNECTION_NOT_OPENED;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("XXX");
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _connectionOpenOkBody = null;
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody,
+ QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+ _connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody,
+ "The broker didn't send the ConnectionOpenOkBody in time");
+ _currentState = AMQPState.CONNECTION_OPEN;
+ return _connectionOpenOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("XXX");
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _connectionCloseOkBody = null;
+ checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody,
+ QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+ _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody,
+ "The broker didn't send the ConnectionCloseOkBody in time");
+ _currentState = AMQPState.CONNECTION_CLOSED;
+ return _connectionCloseOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("XXX");
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * ------------------------------------------- AMQMethodListener methods
+ * --------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ _correlationId = evt.getCorrelationId();
+
+ if (evt.getMethod() instanceof ConnectionStartBody)
+ {
+ _connectionStartBody = (ConnectionStartBody) evt.getMethod();
+ _connectionNotStarted.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionSecureBody)
+ {
+ _connectionSecureBody = (ConnectionSecureBody) evt.getMethod();
+ _connectionNotSecure.signal();
+ _connectionNotTuned.signal(); // in case the server has sent another chanllenge
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionTuneBody)
+ {
+ _connectionTuneBody = (ConnectionTuneBody) evt.getMethod();
+ _connectionNotTuned.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionOpenOkBody)
+ {
+ _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod();
+ _connectionNotOpened.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionCloseOkBody)
+ {
+ _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod();
+ _connectionNotClosed.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionCloseBody)
+ {
+ handleClose();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public void handleClose() throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ checkIfValidStateTransition(AMQPState.CONNECTION_OPEN, _currentState, AMQPState.CONNECTION_CLOSING);
+ _currentState = AMQPState.CONNECTION_CLOSING;
+ // do the required cleanup and send a ConnectionCloseOkBody
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("XXX");
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java
new file mode 100644
index 0000000000..5315f7f318
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.ExchangeDeleteBody;
+import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.model.AMQPMethodEvent;
+import org.apache.qpid.nclient.model.AMQPMethodListener;
+
+/**
+ *
+ * This class represents the Exchange class defined in AMQP.
+ * Each method takes an @see AMQPCallBack object if it wants to know
+ * the response from the broker to particular method.
+ * Clients can handle the reponse asynchronously or block for a response
+ * using AMQPCallBack.isComplete() periodically using a loop.
+ */
+public class AMQPExchange extends AMQPCallBackSupport implements AMQPMethodListener
+{
+ private Phase _phase;
+
+ public AMQPExchange(int channelId,Phase phase)
+ {
+ super(channelId);
+ _phase = phase;
+ }
+
+ /**
+ * -----------------------------------------------
+ * API Methods
+ * -----------------------------------------------
+ */
+ public void declare(ExchangeDeclareBody exchangeDeclareBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleNoWait(exchangeDeclareBody.nowait,exchangeDeclareBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void delete(ExchangeDeleteBody exchangeDeleteBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleNoWait(exchangeDeleteBody.nowait,exchangeDeleteBody,cb);
+ _phase.messageSent(msg);
+ }
+
+
+ /**-------------------------------------------
+ * AMQPMethodListener methods
+ *--------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ long localCorrelationId = evt.getLocalCorrelationId();
+ AMQMethodBody methodBody = evt.getMethod();
+ if ( methodBody instanceof ExchangeDeclareOkBody || methodBody instanceof ExchangeDeleteOkBody)
+ {
+ invokeCallBack(localCorrelationId,methodBody);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java
new file mode 100644
index 0000000000..e3ad9d6306
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java
@@ -0,0 +1,232 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageEmptyBody;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.framing.MessageOffsetBody;
+import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageQosBody;
+import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageRejectBody;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.model.AMQPMethodEvent;
+import org.apache.qpid.nclient.model.AMQPMethodListener;
+
+/**
+ * This class represents the AMQP Message class.
+ * You need an instance of this class per channel.
+ * A @see AMQPMessageCallBack class is taken as an argument in the constructor.
+ * A client can use this class to issue Message class methods on the broker.
+ * When the broker issues Message class methods on the client, the client is notified
+ * via the AMQPMessageCallBack interface.
+ *
+ * A JMS Message producer implementation can wrap an instance if this and map
+ * JMS method calls to the appropriate AMQP methods.
+ *
+ * AMQPMessageCallBack can be implemented by the JMS MessageConsumer implementation.
+ *
+ */
+public class AMQPMessage extends AMQPCallBackSupport implements AMQPMethodListener
+{
+ private Phase _phase;
+ private AMQPMessageCallBack _messageCb;
+
+ public AMQPMessage(int channelId,Phase phase,AMQPMessageCallBack messageCb)
+ {
+ super(channelId);
+ _phase = phase;
+ _messageCb = messageCb;
+ }
+
+ /**
+ * -----------------------------------------------
+ * API Methods
+ * -----------------------------------------------
+ */
+
+ public void transfer(MessageTransferBody messageTransferBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageTransferBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void consume(MessageCancelBody messageCancelBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageCancelBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void cancel(MessageCancelBody messageCancelBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageCancelBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void get(MessageGetBody messageGetBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageGetBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void recover(MessageRecoverBody messageRecoverBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageRecoverBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void open(MessageOpenBody messageOpenBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageOpenBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void close(MessageCloseBody messageCloseBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageCloseBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void append(MessageAppendBody messageAppendBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageAppendBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void checkpoint(MessageCheckpointBody messageCheckpointBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageCheckpointBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void resume(MessageResumeBody messageResumeBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageResumeBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void qos(MessageQosBody messageQosBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageQosBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /**
+ * The correlationId from the request.
+ * For example if a message.transfer is sent with correlationId "ABCD"
+ * then u need to pass that in. This correlation id is used by the execution layer
+ * to handle the correlation of method requests and responses
+ */
+ public void ok(MessageOkBody messageOkBody,long correlationId) throws AMQPException
+ {
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOkBody,correlationId);
+ _phase.messageSent(msg);
+ }
+
+ /**
+ * The correlationId from the request.
+ * For example if a message.transfer is sent with correlationId "ABCD"
+ * then u need to pass that in. This correlation id is used by the execution layer
+ * to handle the correlation of method requests and responses
+ */
+ public void reject(MessageRejectBody messageRejectBody,long correlationId) throws AMQPException
+ {
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageRejectBody,correlationId);
+ _phase.messageSent(msg);
+ }
+
+ /**
+ * The correlationId from the request.
+ * For example if a message.resume is sent with correlationId "ABCD"
+ * then u need to pass that in. This correlation id is used by the execution layer
+ * to handle the correlation of method requests and responses
+ */
+ public void offset(MessageOffsetBody messageOffsetBody,long correlationId) throws AMQPException
+ {
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOffsetBody,correlationId);
+ _phase.messageSent(msg);
+ }
+
+ /**-------------------------------------------
+ * AMQPMethodListener methods
+ *--------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ long localCorrelationId = evt.getLocalCorrelationId();
+ AMQMethodBody methodBody = evt.getMethod();
+ if ( methodBody instanceof MessageOkBody ||
+ methodBody instanceof MessageRejectBody ||
+ methodBody instanceof MessageEmptyBody)
+ {
+ invokeCallBack(localCorrelationId,methodBody);
+ return true;
+ }
+ else if (methodBody instanceof MessageTransferBody)
+ {
+ _messageCb.transfer((MessageTransferBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else if (methodBody instanceof MessageAppendBody)
+ {
+ _messageCb.append((MessageAppendBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else if (methodBody instanceof MessageOpenBody)
+ {
+ _messageCb.open((MessageOpenBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else if (methodBody instanceof MessageCloseBody)
+ {
+ _messageCb.close((MessageCloseBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else if (methodBody instanceof MessageCheckpointBody)
+ {
+ _messageCb.checkpoint((MessageCheckpointBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else if (methodBody instanceof MessageRecoverBody)
+ {
+ _messageCb.recover((MessageRecoverBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else if (methodBody instanceof MessageResumeBody)
+ {
+ _messageCb.resume((MessageResumeBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java
new file mode 100644
index 0000000000..183ed9dba8
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.nclient.core.AMQPException;
+
+/**
+ * This class also represents the AMQP Message class.
+ * You need an instance per channel.
+ * This is passed in as an argument in the constructor of an AMQPMessage instance.
+ * A client who implements this interface is notified When the broker issues
+ * Message class methods on the client.
+ *
+ * A Client should use the AMQPMessage class when it wants to issue Message class
+ * methods on the broker.
+ *
+ * A JMS MessageConsumer implementation can implement this interface and map
+ * AMQP Method notifications to the appropriate JMS methods.
+ *
+ * Simillarly a JMS MessageProducer implementation can wrap an AMQPMessage instance.
+ *
+ */
+
+public interface AMQPMessageCallBack
+{
+ /**
+ * -----------------------------------------------------------------------
+ * This provides Notifications for broker initiated Message class methods.
+ * All methods have a correlationId that u need to pass into
+ * the corresponding Message methods when responding to the broker.
+ *
+ * For example the correlationID passed in from Message.trasnfer
+ * should be passed back when u call Message.ok in AMQPMessage
+ * -----------------------------------------------------------------------
+ */
+
+
+ public void transfer(MessageTransferBody messageTransferBody,long correlationId) throws AMQPException;
+
+ public void recover(MessageRecoverBody messageRecoverBody,long correlationId) throws AMQPException;
+
+ public void open(MessageOpenBody messageOpenBody,long correlationId) throws AMQPException ;
+
+ public void close(MessageCloseBody messageCloseBody,long correlationId) throws AMQPException;
+
+ public void append(MessageAppendBody messageAppendBody,long correlationId) throws AMQPException;
+
+ public void checkpoint(MessageCheckpointBody messageCheckpointBody,long correlationId) throws AMQPException;
+
+ public void resume(MessageResumeBody messageResumeBody,long correlationId) throws AMQPException;
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java
new file mode 100644
index 0000000000..a5fe6de298
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.QueuePurgeBody;
+import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.framing.QueueUnbindBody;
+import org.apache.qpid.framing.QueueUnbindOkBody;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.model.AMQPMethodEvent;
+import org.apache.qpid.nclient.model.AMQPMethodListener;
+
+/**
+ *
+ * This class represents the Queue class defined in AMQP.
+ * Each method takes an @see AMQPCallBack object if it wants to know
+ * the response from the broker to a particular method.
+ * Clients can handle the reponse asynchronously or block for a response
+ * using AMQPCallBack.isComplete() periodically using a loop.
+ */
+public class AMQPQueue extends AMQPCallBackSupport implements AMQPMethodListener
+{
+ private Phase _phase;
+
+ public AMQPQueue(int channelId,Phase phase)
+ {
+ super(channelId);
+ _phase = phase;
+ }
+
+ /**
+ * -----------------------------------------------
+ * API Methods
+ * -----------------------------------------------
+ */
+ public void declare(QueueDeclareBody queueDeclareBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleNoWait(queueDeclareBody.nowait,queueDeclareBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void bind(QueueBindBody queueBindBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleNoWait(queueBindBody.nowait,queueBindBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ // Queue.unbind doesn't have nowait
+ public void unbind(QueueUnbindBody queueUnbindBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(queueUnbindBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void purge(QueuePurgeBody queuePurgeBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleNoWait(queuePurgeBody.nowait,queuePurgeBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void delete(QueueDeleteBody queueDeleteBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleNoWait(queueDeleteBody.nowait,queueDeleteBody,cb);
+ _phase.messageSent(msg);
+ }
+
+
+ /**-------------------------------------------
+ * AMQPMethodListener methods
+ *--------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ long localCorrelationId = evt.getLocalCorrelationId();
+ AMQMethodBody methodBody = evt.getMethod();
+ if ( methodBody instanceof QueueDeclareOkBody ||
+ methodBody instanceof QueueBindOkBody ||
+ methodBody instanceof QueueUnbindOkBody ||
+ methodBody instanceof QueuePurgeOkBody ||
+ methodBody instanceof QueueDeleteOkBody
+ )
+ {
+ invokeCallBack(localCorrelationId,methodBody);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/EventManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/EventManager.java
new file mode 100644
index 0000000000..38421cfca3
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/EventManager.java
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.model.AMQPMethodEvent;
+import org.apache.qpid.nclient.model.AMQPMethodListener;
+
+/**
+ * This class registeres with the ModelPhase as a AMQMethodListener,
+ * to receive method events and then it distributes methods to other listerners
+ * using a filtering criteria. The criteria is channel id and method body class.
+ * The method listeners are added and removed dynamically
+ *
+ * <p/>
+ */
+public class EventManager implements AMQPMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(EventManager.class);
+
+ private Map <Integer,Map> _channelMap = new ConcurrentHashMap<Integer,Map>();
+
+ /**
+ * ------------------------------------------------
+ * methods introduced by AMQPMethodEventManager
+ * ------------------------------------------------
+ */
+ public void addMethodEventListener(int channelId,Class clazz,AMQPMethodListener l)
+ {
+ Map<Class,List> _methodListenerMap;
+ if (_channelMap.containsKey(channelId))
+ {
+ _methodListenerMap = _channelMap.get(channelId);
+
+ }
+ else
+ {
+ _methodListenerMap = new ConcurrentHashMap<Class,List>();
+ _channelMap.put(channelId, _methodListenerMap);
+ }
+
+ List<AMQPMethodListener> _listeners;
+ if (_methodListenerMap.containsKey(clazz))
+ {
+ _listeners = _methodListenerMap.get(clazz);
+
+ }
+ else
+ {
+ _listeners = new ArrayList<AMQPMethodListener>();
+ _methodListenerMap.put(clazz, _listeners);
+ }
+
+ _listeners.add(l);
+
+ }
+
+ public void removeMethodEventListener(int channelId,Class clazz,AMQPMethodListener l)
+ {
+ if (_channelMap.containsKey(channelId))
+ {
+ Map<Class,List> _methodListenerMap = _channelMap.get(channelId);
+
+ if (_methodListenerMap.containsKey(clazz))
+ {
+ List<AMQPMethodListener> _listeners = _methodListenerMap.get(clazz);
+ _listeners.remove(l);
+ }
+
+ }
+ }
+
+
+ /**
+ * ------------------------------------------------
+ * methods introduced by AMQMethodListener
+ * ------------------------------------------------
+ */
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.model.AMQStateManager#methodReceived(org.apache.qpid.protocol.AMQMethodEvent)
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ if (_channelMap.containsKey(evt.getChannelId()))
+ {
+ Map<Class,List> _methodListenerMap = _channelMap.get(evt.getChannelId());
+
+ if (_methodListenerMap.containsKey(evt.getMethod().getClass()))
+ {
+
+ List<AMQPMethodListener> _listeners = _methodListenerMap.get(evt.getMethod().getClass());
+ for (AMQPMethodListener l:_listeners)
+ {
+ l.methodReceived(evt);
+ }
+
+ return (_listeners.size()>0);
+ }
+
+ }
+
+ return false;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java
new file mode 100644
index 0000000000..8c4cb56971
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java
@@ -0,0 +1,54 @@
+package org.apache.qpid.nclient.amqp.sample;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.StringTokenizer;
+
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.security.AMQPCallbackHandler;
+import org.apache.qpid.nclient.security.CallbackHandlerRegistry;
+import org.apache.qpid.nclient.transport.ConnectionURL;
+
+public class SecurityHelper
+{
+ public static String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException
+ {
+ final String mechanisms = new String(availableMechanisms, "utf8");
+ StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
+ HashSet mechanismSet = new HashSet();
+ while (tokenizer.hasMoreTokens())
+ {
+ mechanismSet.add(tokenizer.nextToken());
+ }
+
+ String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms();
+ StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " ");
+ while (prefTokenizer.hasMoreTokens())
+ {
+ String mech = prefTokenizer.nextToken();
+ if (mechanismSet.contains(mech))
+ {
+ return mech;
+ }
+ }
+ return null;
+ }
+
+ public static AMQPCallbackHandler createCallbackHandler(String mechanism, ConnectionURL url)
+ throws AMQPException
+ {
+ Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism);
+ try
+ {
+ Object instance = mechanismClass.newInstance();
+ AMQPCallbackHandler cbh = (AMQPCallbackHandler) instance;
+ cbh.initialise(url);
+ return cbh;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Unable to create callback handler: " + e, e);
+ }
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
new file mode 100644
index 0000000000..69d2564112
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
@@ -0,0 +1,93 @@
+package org.apache.qpid.nclient.amqp.sample;
+
+import java.util.StringTokenizer;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionStartOkBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.nclient.amqp.AMQPConnection;
+import org.apache.qpid.nclient.transport.AMQPConnectionURL;
+import org.apache.qpid.nclient.transport.ConnectionURL;
+import org.apache.qpid.nclient.transport.TransportConnection;
+import org.apache.qpid.nclient.transport.TransportConnectionFactory;
+import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
+
+/**
+ * This class illustrates the usage of the API
+ * Notes this is just a simple demo.
+ *
+ * I have used Helper classes to keep the code cleaner.
+ */
+public class TestClient
+{
+ private byte major;
+ private byte minor;
+ private ConnectionURL _url;
+
+ public AMQPConnection openConnection() throws Exception
+ {
+ _url = new AMQPConnectionURL("");
+ TransportConnection conn = TransportConnectionFactory.createTransportConnection(_url, ConnectionType.VM);
+ return new AMQPConnection(conn);
+ }
+
+ public void handleProtocolNegotiation(AMQPConnection con) throws Exception
+ {
+ // ConnectionStartBody
+ ConnectionStartBody connectionStartBody = con.openTCPConnection();
+ major = connectionStartBody.getMajor();
+ minor = connectionStartBody.getMajor();
+
+ FieldTable clientProperties = FieldTableFactory.newFieldTable();
+ clientProperties.put(new AMQShortString(ClientProperties.instance.toString()),"Test"); // setting only the client id
+
+ final String locales = new String(connectionStartBody.getLocales(), "utf8");
+ final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
+
+ final String mechanism = SecurityHelper.chooseMechanism(connectionStartBody.getMechanisms());
+
+ SaslClient sc = Sasl.createSaslClient(new String[]{mechanism},
+ null, "AMQP", "localhost",
+ null, SecurityHelper.createCallbackHandler(mechanism,_url));
+
+ ConnectionStartOkBody connectionStartOkBody =
+ ConnectionStartOkBody.createMethodBody(major, minor, clientProperties,
+ new AMQShortString(tokenizer.nextToken()),
+ new AMQShortString(mechanism),
+ (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null));
+ // ConnectionSecureBody
+ ConnectionSecureBody connectionSecureBody = con.startOk(connectionStartOkBody);
+
+ ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(
+ major,minor,sc.evaluateChallenge(connectionSecureBody.getChallenge()));
+
+ // Assuming the server is not going to send another challenge
+ ConnectionTuneBody connectionTuneBody = (ConnectionTuneBody)con.secureOk(connectionSecureOkBody);
+
+ // Using broker supplied values
+ ConnectionTuneOkBody connectionTuneOkBody =
+ ConnectionTuneOkBody.createMethodBody(major,minor,
+ connectionTuneBody.getChannelMax(),
+ connectionTuneBody.getFrameMax(),
+ connectionTuneBody.getHeartbeat());
+ con.tuneOk(connectionTuneOkBody);
+ }
+
+ public static void main(String[] args)
+ {
+ TestClient test = new TestClient();
+ AMQPConnection con = test.openConnection();
+
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
new file mode 100644
index 0000000000..3555704c4f
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.state;
+
+/**
+ * States used in the AMQ protocol. Used by the finite state machine to determine
+ * valid responses.
+ */
+public class AMQPState
+{
+ private final int _id;
+
+ private final String _name;
+
+ private AMQPState(int id, String name)
+ {
+ _id = id;
+ _name = name;
+ }
+
+ public String toString()
+ {
+ return "AMQState: id = " + _id + " name: " + _name;
+ }
+
+ // Connection state
+ public static final AMQPState CONNECTION_UNDEFINED = new AMQPState(0, "CONNECTION_UNDEFINED");
+ public static final AMQPState CONNECTION_NOT_STARTED = new AMQPState(1, "CONNECTION_NOT_STARTED");
+ public static final AMQPState CONNECTION_NOT_SECURE = new AMQPState(2, "CONNECTION_NOT_SECURE");
+ public static final AMQPState CONNECTION_NOT_TUNED = new AMQPState(2, "CONNECTION_NOT_TUNED");
+ public static final AMQPState CONNECTION_NOT_OPENED = new AMQPState(3, "CONNECTION_NOT_OPENED");
+ public static final AMQPState CONNECTION_OPEN = new AMQPState(4, "CONNECTION_OPEN");
+ public static final AMQPState CONNECTION_CLOSING = new AMQPState(5, "CONNECTION_CLOSING");
+ public static final AMQPState CONNECTION_CLOSED = new AMQPState(6, "CONNECTION_CLOSED");
+
+ // Channel state
+ public static final AMQPState CHANNEL_NOT_OPENED = new AMQPState(10, "CHANNEL_NOT_OPENED");
+ public static final AMQPState CHANNEL_OPENED = new AMQPState(11, "CHANNEL_OPENED");
+ public static final AMQPState CHANNEL_CLOSED = new AMQPState(11, "CHANNEL_CLOSED");
+ public static final AMQPState CHANNEL_SUSPEND = new AMQPState(11, "CHANNEL_SUSPEND");
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java
new file mode 100644
index 0000000000..67f854caf9
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java
@@ -0,0 +1,8 @@
+package org.apache.qpid.nclient.amqp.state;
+
+import org.apache.qpid.nclient.core.AMQPException;
+
+public interface AMQPStateListener
+{
+ public void stateChanged(AMQPState oldState, AMQPState newState) throws AMQPException;
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java
new file mode 100644
index 0000000000..c1fde7181d
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java
@@ -0,0 +1,26 @@
+package org.apache.qpid.nclient.amqp.state;
+
+import org.apache.qpid.nclient.core.AMQPException;
+
+public class AMQPStateMachine
+{
+ protected void checkIfValidStateTransition(AMQPState correctState,AMQPState currentState,AMQPState requiredState) throws IllegalStateTransitionException
+ {
+ if (currentState != correctState)
+ {
+ throw new IllegalStateTransitionException(currentState,requiredState);
+ }
+ }
+
+ protected void checkIfValidStateTransition(AMQPState[] correctStates,AMQPState currentState,AMQPState requiredState) throws IllegalStateTransitionException
+ {
+ for(AMQPState correctState :correctStates)
+ {
+ if (currentState == correctState)
+ {
+ return;
+ }
+ }
+ throw new IllegalStateTransitionException(currentState,requiredState);
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java
new file mode 100644
index 0000000000..2956a19e66
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java
@@ -0,0 +1,11 @@
+package org.apache.qpid.nclient.amqp.state;
+
+import org.apache.qpid.AMQException;
+
+public interface AMQPStateManager
+{
+
+ public void addListener(AMQPStateListener l)throws AMQException;
+
+ public void removeListener(AMQPStateListener l)throws AMQException;
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java
new file mode 100644
index 0000000000..cbae4aafee
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.state;
+
+/**
+ * The Type of States used in the AMQ protocol.
+ * This allows to partition listeners by the type of states they want
+ * to listen rather than all.
+ * For example an Object might only be interested in Channel state
+ */
+public class AMQPStateType
+{
+ private final int _typeId;
+
+ private final String _typeName;
+
+ private AMQPStateType(int id, String name)
+ {
+ _typeId = id;
+ _typeName = name;
+ }
+
+ public String toString()
+ {
+ return "AMQState: id = " + _typeId + " name: " + _typeName;
+ }
+
+ // Connection state
+ public static final AMQPStateType CONNECTION_STATE = new AMQPStateType(0, "CONNECTION_STATE");
+ public static final AMQPStateType CHANNEL_STATE = new AMQPStateType(1, "CHANNEL_STATE");
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java
new file mode 100644
index 0000000000..fdc24d1d2f
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.state;
+
+import org.apache.qpid.nclient.core.AMQPException;
+
+public class IllegalStateTransitionException extends AMQPException
+{
+ private AMQPState _currentState;
+ private AMQPState _desiredState;
+
+ public IllegalStateTransitionException(AMQPState currentState, AMQPState desiredState)
+ {
+ super("No valid state transition defined from state " + currentState +
+ " to state " + desiredState);
+ _currentState = currentState;
+ _desiredState = desiredState;
+ }
+
+ public AMQPState getCurrentState()
+ {
+ return _currentState;
+ }
+
+ public AMQPState getDesiredState()
+ {
+ return _desiredState;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java b/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java
new file mode 100644
index 0000000000..1c3bb788a0
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java
@@ -0,0 +1,84 @@
+package org.apache.qpid.nclient.config;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.configuration.CombinedConfiguration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.SystemConfiguration;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.nclient.core.QpidConstants;
+
+/**
+ * Loads a properties file from classpath.
+ * These values can be overwritten using system properties
+ */
+public class ClientConfiguration extends CombinedConfiguration {
+
+ private static final Logger _logger = Logger.getLogger(ClientConfiguration.class);
+ private static ClientConfiguration _instance = new ClientConfiguration();
+
+ ClientConfiguration()
+ {
+ super();
+ addConfiguration(new SystemConfiguration());
+ try
+ {
+ XMLConfiguration config = new XMLConfiguration();
+ config.load(getInputStream());
+ addConfiguration(config);
+ }
+ catch (ConfigurationException e)
+ {
+ _logger.warn("Client Properties missing, using defaults",e);
+ }
+ }
+
+ public static ClientConfiguration get()
+ {
+ return _instance;
+ }
+
+ private InputStream getInputStream()
+ {
+ if (System.getProperty(QpidConstants.CONFIG_FILE_PATH) != null)
+ {
+ try
+ {
+ return new FileInputStream((String)System.getProperty(QpidConstants.CONFIG_FILE_PATH));
+ }
+ catch(Exception e)
+ {
+ return this.getClass().getResourceAsStream("client.xml");
+ }
+ }
+ else
+ {
+ return this.getClass().getResourceAsStream("client.xml");
+ }
+
+ }
+
+ public static void main(String[] args)
+ {
+ System.out.println(ClientConfiguration.get().getString(QpidConstants.USE_SHARED_READ_WRITE_POOL));
+
+ //System.out.println(ClientConfiguration.get().getString("methodListeners.methodListener(1).[@class]"));
+ int count = ClientConfiguration.get().getMaxIndex(QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER);
+ System.out.println(count);
+
+ for(int i=0 ;i<count;i++)
+ {
+ String methodListener = QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER + "(" + i + ")";
+ System.out.println("\n\n"+ClientConfiguration.get().getString(methodListener + QpidConstants.CLASS));
+ List<String> list = ClientConfiguration.get().getList(methodListener + "." + QpidConstants.METHOD_CLASS);
+ for(String s:list)
+ {
+ System.out.println(s);
+ }
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml b/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml
new file mode 100644
index 0000000000..587271acd1
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="ISO-8859-1" ?>
+<qpidClientConfig>
+
+<security>
+ <saslClientFactoryTypes>
+ <saslClientFactory type="AMQPLAIN">org.apache.qpid.client.security.amqplain.AmqPlainSaslClientFactory</saslClientFactory>
+ </saslClientFactoryTypes>
+ <securityMechanisms>
+ <securityMechanismHandler type="PLAIN">org.apache.qpid.client.security.UsernamePasswordCallbackHandler</securityMechanismHandler>
+ <securityMechanismHandler type="CRAM_MD5">org.apache.qpid.client.security.UsernamePasswordCallbackHandler</securityMechanismHandler>
+ </securityMechanisms>
+</security>
+
+<!-- Transport Layer properties -->
+<useSharedReadWritePool>true</useSharedReadWritePool>
+<enableDirectBuffers>true</enableDirectBuffers>
+<enablePooledAllocator>false</enablePooledAllocator>
+<tcpNoDelay>true</tcpNoDelay>
+<sendBufferSizeInKb>32</sendBufferSizeInKb>
+<reciveBufferSizeInKb>32</reciveBufferSizeInKb>
+<qpidVMBrokerClass>org.apache.qpid.server.protocol.AMQPFastProtocolHandler</qpidVMBrokerClass>
+
+<!-- Execution Layer properties -->
+<maxAccumilatedResponses>20</maxAccumilatedResponses>
+
+<!-- Model Phase properties -->
+<serverTimeoutInMilliSeconds>1000</serverTimeoutInMilliSeconds>
+<maxAccumilatedResponses>20</maxAccumilatedResponses>
+<stateManager></stateManager>
+<stateListerners>
+ <stateType class="org.apache.qpid.nclient.model.state.AMQPStateType.CONNECTION_STATE">
+ <stateListerner></stateListerner>
+ </stateType>
+ <stateType class="org.apache.qpid.nclient.model.state.AMQPStateType.CHANNEL_STATE">
+ <stateListerner></stateListerner>
+ </stateType>
+</stateListerners>
+
+<methodListeners>
+ <methodListener class="org.apache.qpid.nclient.amqp.AMQPConnection">
+ <methodClass>org.apache.qpid.framing.ConnectionStartBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ConnectionSecureBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ConnectionTuneBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ConnectionOpenOkBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ConnectionCloseBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ConnectionCloseOkBody</methodClass>
+
+ <methodClass>org.apache.qpid.framing.ChannelOpenOkBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ChannelCloseBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ChannelCloseOkBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ChannelFlowBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ChannelFlowOkBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ChannelOkBody</methodClass>
+
+ <methodClass>org.apache.qpid.framing.ExchangeDeclareOkBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ExchangeDeleteOkBody</methodClass>
+
+ <methodClass>org.apache.qpid.framing.QueueDeclareOkBody</methodClass>
+ <methodClass>org.apache.qpid.framing.QueueBindOkBody</methodClass>
+ <methodClass>org.apache.qpid.framing.QueueUnbindOkBody</methodClass>
+ <methodClass>org.apache.qpid.framing.QueuePurgeOkBody</methodClass>
+ <methodClass>org.apache.qpid.framing.QueueDeleteOkBody</methodClass>
+
+ <methodClass>org.apache.qpid.framing.MessageAppendBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageCancelBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageCheckpointBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageCloseBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageGetBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageOffsetBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageOkBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageOpenBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageQosBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageRecoverBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageRejectBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageResumeBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageTransferBody</methodClass>
+ </methodListener>
+</methodListeners>
+
+<phasePipe>
+ <phase index="0">org.apache.qpid.nclient.transport.TransportPhase<phase>
+ <phase index="1">org.apache.qpid.nclient.execution.ExecutionPhase<phase>
+ <phase index="2">org.apache.qpid.nclient.model.ModelPhase<phase>
+</phasePipe>
+
+</qpidClientConfig> \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java
new file mode 100644
index 0000000000..a029f7d4ff
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java
@@ -0,0 +1,55 @@
+package org.apache.qpid.nclient.core;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
+
+public class AMQPException extends Exception
+{
+ private int _errorCode;
+
+ public AMQPException(String message)
+ {
+ super(message);
+ }
+
+ public AMQPException(String msg, Throwable t)
+ {
+ super(msg, t);
+ }
+
+ public AMQPException(int errorCode, String msg, Throwable t)
+ {
+ super(msg + " [error code " + errorCode + ']', t);
+ _errorCode = errorCode;
+ }
+
+ public AMQPException(int errorCode, String msg)
+ {
+ super(msg + " [error code " + errorCode + ']');
+ _errorCode = errorCode;
+ }
+
+ public AMQPException(Logger logger, String msg, Throwable t)
+ {
+ this(msg, t);
+ logger.error(getMessage(), this);
+ }
+
+ public AMQPException(Logger logger, String msg)
+ {
+ this(msg);
+ logger.error(getMessage(), this);
+ }
+
+ public AMQPException(Logger logger, int errorCode, String msg)
+ {
+ this(errorCode, msg);
+ logger.error(getMessage(), this);
+ }
+
+ public int getErrorCode()
+ {
+ return _errorCode;
+ }
+ }
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java
new file mode 100644
index 0000000000..bf6a19b920
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.core;
+
+public abstract class AbstractPhase implements Phase {
+
+ protected PhaseContext _ctx;
+ protected Phase _nextInFlowPhase;
+ protected Phase _nextOutFlowPhase;
+
+
+ /**
+ * ------------------------------------------------
+ * Phase - method introduced by Phase
+ * ------------------------------------------------
+ */
+ public void init(PhaseContext ctx,Phase nextInFlowPhase, Phase nextOutFlowPhase) {
+ _nextInFlowPhase = nextInFlowPhase;
+ _nextOutFlowPhase = nextOutFlowPhase;
+ _ctx = ctx;
+ }
+
+ /**
+ * The start is called from the top
+ * of the pipe and is propogated the
+ * bottom.
+ *
+ * Each phase can override this to do
+ * any phase specific logic related
+ * pipe.start()
+ */
+ public void start()throws AMQPException
+ {
+ if(_nextOutFlowPhase != null)
+ {
+ _nextOutFlowPhase.start();
+ }
+ }
+
+ /**
+ * Each phase can override this to do
+ * any phase specific cleanup
+ */
+ public void close()throws AMQPException
+ {
+
+ }
+
+ public void messageReceived(Object frame) throws AMQPException
+ {
+ if(_nextInFlowPhase != null)
+ {
+ _nextInFlowPhase.messageReceived(frame);
+ }
+ }
+
+ public void messageSent(Object frame) throws AMQPException
+ {
+ if (_nextOutFlowPhase != null)
+ {
+ _nextOutFlowPhase.messageSent(frame);
+ }
+ }
+
+ public PhaseContext getPhaseContext()
+ {
+ return _ctx;
+ }
+
+ public Phase getNextInFlowPhase() {
+ return _nextInFlowPhase;
+ }
+
+ public Phase getNextOutFlowPhase() {
+ return _nextOutFlowPhase;
+ }
+
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java
new file mode 100644
index 0000000000..a3455cdacd
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java
@@ -0,0 +1,20 @@
+package org.apache.qpid.nclient.core;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DefaultPhaseContext implements PhaseContext
+{
+ public Map<String,Object> _props = new ConcurrentHashMap<String,Object>();
+
+ public Object getProperty(String name)
+ {
+ return _props.get(name);
+ }
+
+ public void setProperty(String name, Object value)
+ {
+ _props.put(name, value);
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java
new file mode 100644
index 0000000000..7aa10b77ff
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java
@@ -0,0 +1,38 @@
+package org.apache.qpid.nclient.core;
+
+
+public interface Phase
+{
+
+ /**
+ * This method is used to initialize a phase
+ *
+ * @param ctx
+ * @param nextInFlowPhase
+ * @param nextOutFlowPhase
+ */
+ public void init(PhaseContext ctx,Phase nextInFlowPhase, Phase nextOutFlowPhase);
+
+ /**
+ *
+ * Implement logic related to physical opening
+ * of the pipe
+ */
+ public void start()throws AMQPException;
+
+ /**
+ * Implement cleanup in this method.
+ * This indicates the pipe is closing
+ */
+ public void close()throws AMQPException;
+
+ public void messageReceived(Object msg) throws AMQPException;
+
+ public void messageSent(Object msg) throws AMQPException;
+
+ public PhaseContext getPhaseContext();
+
+ public Phase getNextOutFlowPhase();
+
+ public Phase getNextInFlowPhase();
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java
new file mode 100644
index 0000000000..d5942fd785
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.core;
+
+/**
+ * This can be thought of as a session context associated
+ * with the pipe. This is transient and is scoped by the
+ * duration of the physical connection.
+ *
+ */
+public interface PhaseContext {
+
+ public Object getProperty(String name);
+
+ public void setProperty(String name, Object value);
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
new file mode 100644
index 0000000000..21602fec8e
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
@@ -0,0 +1,60 @@
+package org.apache.qpid.nclient.core;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.nclient.config.ClientConfiguration;
+
+public class PhaseFactory
+{
+ /**
+ * This method will create the pipe and return a reference
+ * to the top of the pipeline.
+ *
+ * The application can then use this (top most) phase and all
+ * calls will propogated down the pipe.
+ *
+ * Simillar calls orginating at the bottom of the pipeline
+ * will be propogated to the top.
+ *
+ * @param ctx
+ * @return
+ * @throws AMQPException
+ */
+ public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException
+ {
+ Map<Integer,Phase> phaseMap = new HashMap<Integer,Phase>();
+ List<String> list = ClientConfiguration.get().getList(QpidConstants.PHASE_PIPE + "." + QpidConstants.PHASE);
+ for(String s:list)
+ {
+ try
+ {
+ Phase temp = (Phase)Class.forName(ClientConfiguration.get().getString(s)).newInstance();
+ phaseMap.put(ClientConfiguration.get().getInt(s + "." + QpidConstants.INDEX),temp) ;
+ }
+ catch(Exception e)
+ {
+ throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s),e);
+ }
+ }
+
+ Phase current = null;
+ Phase prev = null;
+ Phase next = null;
+ //Lets build the phase pipe.
+ for (int i=0; i<phaseMap.size();i++)
+ {
+ current = phaseMap.get(i);
+ if (1+1 < phaseMap.size())
+ {
+ next = phaseMap.get(i+1);
+ }
+ current.init(ctx, next, prev);
+ prev = current;
+ next = null;
+ }
+
+ return current;
+ }
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java
new file mode 100644
index 0000000000..1a949b7270
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java
@@ -0,0 +1,66 @@
+package org.apache.qpid.nclient.core;
+
+public interface QpidConstants {
+
+ // Common properties
+ public static long EMPTY_CORRELATION_ID = -1;
+ public static int CHANNEL_ZERO = 0;
+ public static String CONFIG_FILE_PATH = "ConfigFilePath";
+
+ // Phase Context properties
+ public final static String AMQP_BROKER_DETAILS = "AMQP_BROKER_DETAILS";
+ public final static String MINA_IO_CONNECTOR = "MINA_IO_CONNECTOR";
+ //public final static String AMQP_MAJOR_VERSION = "AMQP_MAJOR_VERSION";
+ //public final static String AMQP_MINOR_VERSION = "AMQP_MINOR_VERSION";
+ //public final static String AMQP_SASL_CLIENT = "AMQP_SASL_CLIENT";
+ //public final static String AMQP_CLIENT_ID = "AMQP_CLIENT_ID";
+ //public final static String AMQP_CONNECTION_TUNE_PARAMETERS = "AMQP_CONNECTION_TUNE_PARAMETERS";
+ //public final static String AMQP_VIRTUAL_HOST = "AMQP_VIRTUAL_HOST";
+ //public final static String AMQP_MESSAGE_STORE = "AMQP_MESSAGE_STORE";
+
+ /**---------------------------------------------------------------
+ * Configuration file properties
+ * ------------------------------------------------------------
+ */
+
+ // Model Layer properties
+ public final static String STATE_MANAGER = "stateManager";
+ public final static String METHOD_LISTENERS = "methodListeners";
+ public final static String METHOD_LISTENER = "methodListener";
+ public final static String CLASS = "[@class]";
+ public final static String METHOD_CLASS = "methodClass";
+
+ public final static String STATE_LISTENERS = "stateListeners";
+ public final static String STATE_LISTENER = "stateListener";
+ public final static String STATE_TYPE = "stateType";
+
+ public final static String AMQP_MESSAGE_STORE_CLASS = "AMQP_MESSAGE_STORE_CLASS";
+ public final static String SERVER_TIMEOUT_IN_MILLISECONDS = "serverTimeoutInMilliSeconds";
+
+ // MINA properties
+ public final static String USE_SHARED_READ_WRITE_POOL = "useSharedReadWritePool";
+ public final static String ENABLE_DIRECT_BUFFERS = "enableDirectBuffers";
+ public final static String ENABLE_POOLED_ALLOCATOR = "enablePooledAllocator";
+ public final static String TCP_NO_DELAY = "tcpNoDelay";
+ public final static String SEND_BUFFER_SIZE_IN_KB = "sendBufferSizeInKb";
+ public final static String RECEIVE_BUFFER_SIZE_IN_KB = "reciveBufferSizeInKb";
+
+ // Security properties
+ public final static String AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES = "saslClientFactoryTypes";
+ public final static String AMQP_SECURITY_SASL_CLIENT_FACTORY = "saslClientFactory";
+ public final static String TYPE = "[@type]";
+
+ public final static String AMQP_SECURITY_MECHANISMS = "securityMechanisms";
+ public final static String AMQP_SECURITY_MECHANISM_HANDLER = "securityMechanismHandler";
+
+ // Execution Layer properties
+ public final static String MAX_ACCUMILATED_RESPONSES = "maxAccumilatedResponses";
+
+ //Transport Layer properties
+ public final static String QPID_VM_BROKER_CLASS = "qpidVMBrokerClass";
+
+ //Phase pipe properties
+ public final static String PHASE_PIPE = "phasePipe";
+ public final static String PHASE = "phase";
+ public final static String INDEX = "[@index]";
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java
new file mode 100644
index 0000000000..cc1503d414
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java
@@ -0,0 +1,19 @@
+package org.apache.qpid.nclient.core;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class TransientPhaseContext implements PhaseContext {
+
+ private Map map = new ConcurrentHashMap();
+
+ public Object getProperty(String name) {
+ return map.get(name);
+ }
+
+ public void setProperty(String name, Object value) {
+ map.put(name, value);
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java
new file mode 100644
index 0000000000..8db955afbc
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java
@@ -0,0 +1,151 @@
+package org.apache.qpid.nclient.execution;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQRequestBody;
+import org.apache.qpid.framing.AMQResponseBody;
+import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.AbstractPhase;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.model.AMQPMethodEvent;
+import org.apache.qpid.protocol.AMQMethodEvent;
+
+/**
+ * Corressponds to the Layer 2 in AMQP.
+ * This phase handles the correlation of amqp messages
+ * This class implements the 0.9 spec (request/response)
+ */
+public class ExecutionPhase extends AbstractPhase{
+
+ protected static final Logger _logger = Logger.getLogger(ExecutionPhase.class);
+ protected ConcurrentMap _channelId2RequestMgrMap = new ConcurrentHashMap();
+ protected ConcurrentMap _channelId2ResponseMgrMap = new ConcurrentHashMap();
+
+
+ /**
+ * --------------------------------------------------
+ * Phase related methods
+ * --------------------------------------------------
+ */
+
+ // should add these in the init method
+ //_channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false));
+ //_channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false));
+
+ public void messageReceived(Object msg) throws AMQPException
+ {
+ AMQFrame frame = (AMQFrame) msg;
+ final AMQBody bodyFrame = frame.getBodyFrame();
+
+ if (bodyFrame instanceof AMQRequestBody)
+ {
+ AMQPMethodEvent event;
+ try
+ {
+ event = messageRequestBodyReceived(frame.getChannel(), (AMQRequestBody)bodyFrame);
+ super.messageReceived(event);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error handling request",e);
+ }
+
+ }
+ else if (bodyFrame instanceof AMQResponseBody)
+ {
+ List<AMQPMethodEvent> events;
+ try
+ {
+ events = messageResponseBodyReceived(frame.getChannel(), (AMQResponseBody)bodyFrame);
+ for (AMQPMethodEvent event: events)
+ {
+ super.messageReceived(event);
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error handling response",e);
+ }
+ }
+ }
+
+ /**
+ * Need to figure out if the message is a request or a response
+ * that needs to be sent and then delegate it to the Request or response manager
+ * to prepare it.
+ */
+ public void messageSent(Object msg) throws AMQPException
+ {
+ AMQPMethodEvent evt = (AMQPMethodEvent)msg;
+ if(evt.getCorrelationId() == QpidConstants.EMPTY_CORRELATION_ID)
+ {
+ // This is a request
+ AMQFrame frame = handleRequest(evt);
+ super.messageSent(frame);
+ }
+ else
+ {
+// This is a response
+ List<AMQFrame> frames = handleResponse(evt);
+ for(AMQFrame frame: frames)
+ {
+ super.messageSent(frame);
+ }
+ }
+ }
+
+ /**
+ * ------------------------------------------------
+ * Methods to handle request response
+ * -----------------------------------------------
+ */
+ private AMQPMethodEvent messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Request frame received: " + requestBody);
+ }
+ ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(channelId);
+ if (responseManager == null)
+ throw new AMQException("Unable to find ResponseManager for channel " + channelId);
+ return responseManager.requestReceived(requestBody);
+ }
+
+ private List<AMQPMethodEvent> messageResponseBodyReceived(int channelId, AMQResponseBody responseBody) throws Exception
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Response frame received: " + responseBody);
+ }
+ RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(channelId);
+ if (requestManager == null)
+ throw new AMQException("Unable to find RequestManager for channel " + channelId);
+ return requestManager.responseReceived(responseBody);
+ }
+
+ private AMQFrame handleRequest(AMQPMethodEvent evt)
+ {
+ RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(evt.getChannelId());
+ return requestManager.sendRequest(evt);
+ }
+
+ private List<AMQFrame> handleResponse(AMQPMethodEvent evt) throws AMQPException
+ {
+ ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(evt.getChannelId());
+ try
+ {
+ return responseManager.sendResponse(evt);
+ }
+ catch(Exception e)
+ {
+ throw new AMQPException("Error handling response",e);
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java
new file mode 100644
index 0000000000..761ec1b050
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.execution;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQRequestBody;
+import org.apache.qpid.framing.AMQResponseBody;
+import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.nclient.model.AMQPMethodEvent;
+
+public class RequestManager
+{
+ private static final Logger logger = Logger.getLogger(RequestManager.class);
+
+ private int channel;
+
+ /**
+ * Used for logging and debugging only - allows the context of this instance
+ * to be known.
+ */
+ private boolean serverFlag;
+ private long connectionId;
+
+ /**
+ * Request and response frames must have a requestID and responseID which
+ * indepenedently increment from 0 on a per-channel basis. These are the
+ * counters, and contain the value of the next (not yet used) frame.
+ */
+ private long requestIdCount;
+
+ /**
+ * These keep track of the last requestId and responseId to be received.
+ */
+ private long lastProcessedResponseId;
+
+ private ConcurrentHashMap<Long, Long> requestSentMap;
+
+ public RequestManager(long connectionId, int channel, boolean serverFlag)
+ {
+ this.channel = channel;
+ this.serverFlag = serverFlag;
+ this.connectionId = connectionId;
+ requestIdCount = 1L;
+ lastProcessedResponseId = 0L;
+ requestSentMap = new ConcurrentHashMap<Long, Long>();
+ }
+
+ // *** Functions to originate a request ***
+
+ public AMQFrame sendRequest(AMQPMethodEvent evt)
+ {
+ long requestId = getNextRequestId(); // Get new request ID
+ AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId,
+ lastProcessedResponseId, evt.getMethod());
+ if (logger.isDebugEnabled())
+ {
+ logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
+ "] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + evt.getMethod());
+ }
+ requestSentMap.put(requestId, evt.getCorrelationId());
+ return requestFrame;
+ }
+
+ public List<AMQPMethodEvent> responseReceived(AMQResponseBody responseBody)
+ throws Exception
+ {
+ long requestIdStart = responseBody.getRequestId();
+ long requestIdStop = requestIdStart + responseBody.getBatchOffset();
+ if (logger.isDebugEnabled())
+ {
+ logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX RES: " +
+ responseBody + "; " + responseBody.getMethodPayload());
+ }
+
+ List<AMQPMethodEvent> events = new ArrayList<AMQPMethodEvent>();
+ for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++)
+ {
+ if (requestSentMap.get(requestId) == null)
+ {
+ throw new RequestResponseMappingException(requestId,
+ "Failed to locate requestId " + requestId + " in requestSentMap.");
+ }
+ long localCorrelationId = requestSentMap.get(requestId);
+ AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel, responseBody.getMethodPayload(),
+ requestId,localCorrelationId);
+ events.add(methodEvent);
+ requestSentMap.remove(requestId);
+ }
+ lastProcessedResponseId = responseBody.getResponseId();
+ return events;
+ }
+
+ // *** Management functions ***
+
+ public int requestsMapSize()
+ {
+ return requestSentMap.size();
+ }
+
+ // *** Private helper functions ***
+
+ private long getNextRequestId()
+ {
+ return requestIdCount++;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java
new file mode 100644
index 0000000000..97d9576a4e
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java
@@ -0,0 +1,240 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.execution;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQRequestBody;
+import org.apache.qpid.framing.AMQResponseBody;
+import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.model.AMQPMethodEvent;
+
+public class ResponseManager
+{
+ private static final Logger logger = Logger.getLogger(ResponseManager.class);
+
+ private int channel;
+
+ /**
+ * Used for logging and debugging only - allows the context of this instance
+ * to be known.
+ */
+ private boolean serverFlag;
+ private long connectionId;
+
+ private int maxAccumulatedResponses = 20; // Default
+
+ /**
+ * Request and response frames must have a requestID and responseID which
+ * indepenedently increment from 0 on a per-channel basis. These are the
+ * counters, and contain the value of the next (not yet used) frame.
+ */
+ private long responseIdCount;
+
+ /**
+ * These keep track of the last requestId and responseId to be received.
+ */
+ private long lastReceivedRequestId;
+
+ /**
+ * Last requestID sent in a response (for batching)
+ */
+ private long lastSentRequestId;
+
+ private class ResponseStatus implements Comparable<ResponseStatus>
+ {
+ private long requestId;
+ private AMQMethodBody responseMethodBody;
+
+ public ResponseStatus(long requestId)
+ {
+ this.requestId = requestId;
+ responseMethodBody = null;
+ }
+
+ public int compareTo(ResponseStatus o)
+ {
+ return (int)(requestId - o.requestId);
+ }
+
+ public String toString()
+ {
+ // Need to define this
+ return "";
+ }
+ }
+
+ private ConcurrentHashMap<Long, ResponseStatus> responseMap;
+
+ public ResponseManager(long connectionId, int channel, boolean serverFlag)
+ {
+ this.channel = channel;
+ this.serverFlag = serverFlag;
+ this.connectionId = connectionId;
+ responseIdCount = 1L;
+ lastReceivedRequestId = 0L;
+ maxAccumulatedResponses = ClientConfiguration.get().getInt(QpidConstants.MAX_ACCUMILATED_RESPONSES);
+ responseMap = new ConcurrentHashMap<Long, ResponseStatus>();
+ }
+
+ // *** Functions to handle an incoming request ***
+
+ public AMQPMethodEvent requestReceived(AMQRequestBody requestBody) throws Exception
+ {
+ long requestId = requestBody.getRequestId();
+ if (logger.isDebugEnabled())
+ {
+ logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX REQ: " +
+ requestBody + "; " + requestBody.getMethodPayload());
+ }
+ long responseMark = requestBody.getResponseMark();
+ lastReceivedRequestId = requestId;
+ responseMap.put(requestId, new ResponseStatus(requestId));
+
+ AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel,
+ requestBody.getMethodPayload(), requestId);
+
+ return methodEvent;
+ }
+
+ public List<AMQFrame> sendResponse(AMQPMethodEvent evt)
+ throws RequestResponseMappingException
+ {
+ long requestId = evt.getCorrelationId();
+ AMQMethodBody responseMethodBody = evt.getMethod();
+
+ if (logger.isDebugEnabled())
+ {
+ logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
+ "] TX RES: Res[# " + requestId + "]; " + responseMethodBody);
+ }
+
+ ResponseStatus responseStatus = responseMap.get(requestId);
+ if (responseStatus == null)
+ {
+ throw new RequestResponseMappingException(requestId,
+ "Failed to locate requestId " + requestId + " in responseMap." + responseMap);
+ }
+ if (responseStatus.responseMethodBody != null)
+ {
+ throw new RequestResponseMappingException(requestId, "RequestId " +
+ requestId + " already has a response in responseMap.");
+ }
+ responseStatus.responseMethodBody = responseMethodBody;
+ return doBatches();
+ }
+
+ // *** Management functions ***
+
+ /**
+ * Sends batched responses - i.e. all those members of responseMap that have
+ * received a response.
+ */
+ public synchronized List<AMQFrame> doBatches()
+ {
+ long startRequestId = 0;
+ int numAdditionalRequestIds = 0;
+ Class responseMethodBodyClass = null;
+ List<AMQFrame> frames = new ArrayList<AMQFrame>();
+ Iterator<Long> lItr = responseMap.keySet().iterator();
+ while (lItr.hasNext())
+ {
+ long requestId = lItr.next();
+ ResponseStatus responseStatus = responseMap.get(requestId);
+ if (responseStatus.responseMethodBody != null)
+ {
+ frames.add(sendResponseBatchFrame(requestId, 0, responseStatus.responseMethodBody));
+ lItr.remove();
+ }
+ }
+
+ return frames;
+ }
+
+ /**
+ * Total number of entries in the responseMap - including both those that
+ * are outstanding (i.e. no response has been received) and those that are
+ * batched (those for which responses have been received but have not yet
+ * been collected together and sent).
+ */
+ public int responsesMapSize()
+ {
+ return responseMap.size();
+ }
+
+ /**
+ * As the responseMap may contain both outstanding responses (those with
+ * ResponseStatus.responseMethodBody still null) and responses waiting to
+ * be batched (those with ResponseStatus.responseMethodBody not null), we
+ * need to count only those in the map with responseMethodBody null.
+ */
+ public int outstandingResponses()
+ {
+ int cnt = 0;
+ for (Long requestId : responseMap.keySet())
+ {
+ if (responseMap.get(requestId).responseMethodBody == null)
+ cnt++;
+ }
+ return cnt;
+ }
+
+ /**
+ * As the responseMap may contain both outstanding responses (those with
+ * ResponseStatus.responseMethodBody still null) and responses waiting to
+ * be batched (those with ResponseStatus.responseMethodBody not null), we
+ * need to count only those in the map with responseMethodBody not null.
+ */
+ public int batchedResponses()
+ {
+ int cnt = 0;
+ for (Long requestId : responseMap.keySet())
+ {
+ if (responseMap.get(requestId).responseMethodBody != null)
+ cnt++;
+ }
+ return cnt;
+ }
+
+ // *** Private helper functions ***
+
+ private long getNextResponseId()
+ {
+ return responseIdCount++;
+ }
+
+ private AMQFrame sendResponseBatchFrame(long firstRequestId, int numAdditionalRequests,
+ AMQMethodBody responseMethodBody)
+ {
+ long responseId = getNextResponseId(); // Get new response ID
+ AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId,
+ firstRequestId, numAdditionalRequests, responseMethodBody);
+ return responseFrame;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
new file mode 100644
index 0000000000..e7b762b77a
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.nclient.message;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.qpid.client.message.MessageHeaders;
+
+public class AMQPApplicationMessage {
+
+ private int bytesReceived = 0;
+ private int channelId;
+ private byte[] referenceId;
+ private List<byte[]> contents = new LinkedList<byte[]>();
+ private long deliveryTag;
+ private boolean redeliveredFlag;
+ private MessageHeaders messageHeaders;
+
+ public AMQPApplicationMessage(int channelId, byte[] referenceId)
+ {
+ this.channelId = channelId;
+ this.referenceId = referenceId;
+ }
+
+ public AMQPApplicationMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, boolean redeliveredFlag)
+ {
+ this.channelId = channelId;
+ this.deliveryTag = deliveryTag;
+ this.messageHeaders = messageHeaders;
+ this.redeliveredFlag = redeliveredFlag;
+ }
+
+ public AMQPApplicationMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, byte[] content, boolean redeliveredFlag)
+ {
+ this.channelId = channelId;
+ this.deliveryTag = deliveryTag;
+ this.messageHeaders = messageHeaders;
+ this.redeliveredFlag = redeliveredFlag;
+ addContent(content);
+ }
+
+ public void addContent(byte[] content)
+ {
+ contents.add(content);
+ bytesReceived += content.length;
+ }
+
+ public int getBytesReceived()
+ {
+ return bytesReceived;
+ }
+
+ public int getChannelId()
+ {
+ return channelId;
+ }
+
+ public byte[] getReferenceId()
+ {
+ return referenceId;
+ }
+
+ public List<byte[]> getContents()
+ {
+ return contents;
+ }
+
+ public long getDeliveryTag()
+ {
+ return deliveryTag;
+ }
+
+ public boolean getRedeliveredFlag()
+ {
+ return redeliveredFlag;
+ }
+
+ public MessageHeaders getMessageHeaders()
+ {
+ return messageHeaders;
+ }
+
+ public String toString()
+ {
+ return "UnprocessedMessage: ch=" + channelId + "; bytesReceived=" + bytesReceived + "; deliveryTag=" +
+ deliveryTag + "; MsgHdrs=" + messageHeaders + "Num contents=" + contents.size() + "; First content=" +
+ new String(contents.get(0));
+ }
+
+ public void setDeliveryTag(long deliveryTag)
+ {
+ this.deliveryTag = deliveryTag;
+ }
+
+ public void setMessageHeaders(MessageHeaders messageHeaders)
+ {
+ this.messageHeaders = messageHeaders;
+ }
+
+ public void setRedeliveredFlag(boolean redeliveredFlag)
+ {
+ this.redeliveredFlag = redeliveredFlag;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java
new file mode 100644
index 0000000000..53a2142718
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.message;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.AbstractPhase;
+import org.apache.qpid.nclient.model.ModelPhase;
+
+public class MessagePhase extends AbstractPhase {
+
+ private final BlockingQueue<AMQPApplicationMessage> _queue = new LinkedBlockingQueue<AMQPApplicationMessage>();
+ private static final Logger _logger = Logger.getLogger(ModelPhase.class);
+
+ public void messageReceived(Object msg) throws AMQPException
+ {
+ try
+ {
+ _queue.put((AMQPApplicationMessage)msg);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.error("Error adding message to queue", e);
+ }
+ super.messageReceived(msg);
+ }
+
+ public void messageSent(Object msg) throws AMQPException
+ {
+ super.messageSent(msg);
+ }
+
+ public AMQPApplicationMessage getNextMessage()
+ {
+ return _queue.poll();
+ }
+
+ public AMQPApplicationMessage getNextMessage(long timeout, TimeUnit tu) throws InterruptedException
+ {
+ return _queue.poll(timeout, tu);
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java
new file mode 100644
index 0000000000..93eecdc0cc
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java
@@ -0,0 +1,17 @@
+package org.apache.qpid.nclient.message;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.message.MessageHeaders;
+
+public interface MessageStore {
+
+ public void removeMessage(String identifier);
+
+ public void storeContentBodyChunk(String identifier,byte[] contentBody) throws AMQException;
+
+ public void storeMessageMetaData(String identifier, MessageHeaders messageHeaders) throws AMQException;
+
+ public AMQPApplicationMessage getMessage(String identifier) throws AMQException;
+
+ public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQException;
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java
new file mode 100644
index 0000000000..26cf2327d7
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java
@@ -0,0 +1,40 @@
+package org.apache.qpid.nclient.message;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.message.MessageHeaders;
+
+public class TransientMessageStore implements MessageStore {
+
+ private Map<String,AMQPApplicationMessage> messageMap = new ConcurrentHashMap<String,AMQPApplicationMessage>();
+
+ public AMQPApplicationMessage getMessage(String identifier)
+ throws AMQException
+ {
+ return messageMap.get(identifier);
+ }
+
+ public void removeMessage(String identifier)
+ {
+ messageMap.remove(identifier);
+ }
+
+ public void storeContentBodyChunk(String identifier, byte[] contentBody)
+ throws AMQException
+ {
+
+ }
+
+ public void storeMessageMetaData(String identifier,
+ MessageHeaders messageHeaders) throws AMQException
+ {
+
+ }
+
+ public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQException
+ {
+
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java b/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java
new file mode 100644
index 0000000000..c33c087da8
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java
@@ -0,0 +1,64 @@
+package org.apache.qpid.nclient.model;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+/**
+ * This class is exactly the same as the AMQMethod event.
+ * Except I renamed requestId to corelationId, so I could use it both ways.
+ *
+ * I didn't want to modify anything in common so that there is no
+ * impact on the existing code.
+ *
+ */
+public class AMQPMethodEvent <M extends AMQMethodBody> {
+
+ private final M _method;
+ private final int _channelId;
+ private final long _correlationId;
+ private long _localCorrletionId = 0;
+
+ public AMQPMethodEvent(int channelId, M method, long correlationId,long localCorrletionId)
+ {
+ _channelId = channelId;
+ _method = method;
+ _correlationId = correlationId;
+ _localCorrletionId = localCorrletionId;
+ }
+
+ public AMQPMethodEvent(int channelId, M method, long correlationId)
+ {
+ _channelId = channelId;
+ _method = method;
+ _correlationId = correlationId;
+ }
+
+ public M getMethod()
+ {
+ return _method;
+ }
+
+ public int getChannelId()
+ {
+ return _channelId;
+ }
+
+ public long getCorrelationId()
+ {
+ return _correlationId;
+ }
+
+ public long getLocalCorrelationId()
+ {
+ return _localCorrletionId;
+ }
+
+ public String toString()
+ {
+ StringBuilder buf = new StringBuilder("Method event: \n");
+ buf.append("Channel id: \n").append(_channelId);
+ buf.append("Method: \n").append(_method);
+ buf.append("Request Id: ").append(_correlationId);
+ buf.append("Local Correlation Id: ").append(_localCorrletionId);
+ return buf.toString();
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java b/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java
new file mode 100644
index 0000000000..52b9f6de91
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java
@@ -0,0 +1,11 @@
+package org.apache.qpid.nclient.model;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public interface AMQPMethodListener
+{
+
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException;
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java
new file mode 100644
index 0000000000..77003b4d21
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java
@@ -0,0 +1,133 @@
+package org.apache.qpid.nclient.model;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.AbstractPhase;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.PhaseContext;
+import org.apache.qpid.nclient.core.QpidConstants;
+
+/**
+ * This Phase handles Layer 3 functionality of the AMQP spec.
+ * This class acts as the interface between the API and the pipeline
+ */
+public class ModelPhase extends AbstractPhase {
+
+ private static final Logger _logger = Logger.getLogger(ModelPhase.class);
+
+ private Map <Class,List> _methodListners = new HashMap<Class,List>();
+
+ /**
+ * ------------------------------------------------
+ * Phase - methods introduced by Phase
+ * ------------------------------------------------
+ */
+ public void init(PhaseContext ctx, Phase nextInFlowPhase, Phase nextOutFlowPhase)
+ {
+ super.init(ctx, nextInFlowPhase, nextOutFlowPhase);
+ try
+ {
+ loadMethodListeners();
+ }
+ catch(Exception e)
+ {
+ _logger.fatal("Error loading method listeners", e);
+ }
+ }
+
+ public void messageReceived(Object msg) throws AMQPException
+ {
+ notifyMethodListerners((AMQPMethodEvent)msg);
+
+ // not doing super.methodReceived here, as this is the end of
+ // the pipeline
+ //super.messageReceived(msg);
+ }
+
+ /**
+ * This method should only except and pass messages
+ * of Type @see AMQPMethodEvent
+ */
+ public void messageSent(Object msg) throws AMQPException
+ {
+ super.messageSent(msg);
+ }
+
+ /**
+ * ------------------------------------------------
+ * Event Handling
+ * ------------------------------------------------
+ */
+
+ public void notifyMethodListerners(AMQPMethodEvent event) throws AMQPException
+ {
+ if (_methodListners.containsKey(event.getMethod().getClass()))
+ {
+ List<AMQPMethodListener> listeners = _methodListners.get(event.getMethod().getClass());
+
+ if(listeners.size()>0)
+ {
+ throw new AMQPException("There are no registered listeners for this method");
+ }
+
+ for(AMQPMethodListener l : listeners)
+ {
+ try
+ {
+ l.methodReceived(event);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error handling method event " + event, e);
+ }
+ }
+ }
+ }
+
+ /**
+ * ------------------------------------------------
+ * Configuration
+ * ------------------------------------------------
+ */
+
+ /**
+ * This method loads method listeners from the client.xml file
+ * For each method class there is a list of listeners
+ */
+ private void loadMethodListeners() throws Exception
+ {
+ int count = ClientConfiguration.get().getMaxIndex(QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER);
+ System.out.println(count);
+
+ for(int i=0 ;i<count;i++)
+ {
+ String methodListener = QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER + "(" + i + ")";
+ String className = ClientConfiguration.get().getString(methodListener + "." + QpidConstants.CLASS);
+ Class listenerClass = Class.forName(className);
+ List<String> list = ClientConfiguration.get().getList(methodListener + "." + QpidConstants.METHOD_CLASS);
+ for(String s:list)
+ {
+ List listeners;
+ Class methodClass = Class.forName(s);
+ if (_methodListners.containsKey(methodClass))
+ {
+ listeners = _methodListners.get(methodClass);
+ }
+ else
+ {
+ listeners = new ArrayList();
+ _methodListners.put(methodClass,listeners);
+ }
+ listeners.add(listenerClass);
+ }
+ }
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java
new file mode 100644
index 0000000000..fc5878f6ef
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security;
+
+import javax.security.auth.callback.CallbackHandler;
+
+import org.apache.qpid.nclient.transport.ConnectionURL;
+
+public interface AMQPCallbackHandler extends CallbackHandler
+{
+ void initialise(ConnectionURL url);
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java
new file mode 100644
index 0000000000..28ba2e355c
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.QpidConstants;
+
+public class CallbackHandlerRegistry
+{
+ private static final Logger _logger = Logger.getLogger(CallbackHandlerRegistry.class);
+
+ private static CallbackHandlerRegistry _instance = new CallbackHandlerRegistry();
+
+ private Map<String,Class> _mechanismToHandlerClassMap = new HashMap<String,Class>();
+
+ private String _mechanisms;
+
+ public static CallbackHandlerRegistry getInstance()
+ {
+ return _instance;
+ }
+
+ public Class getCallbackHandlerClass(String mechanism)
+ {
+ return _mechanismToHandlerClassMap.get(mechanism);
+ }
+
+ public String getMechanisms()
+ {
+ return _mechanisms;
+ }
+
+ private CallbackHandlerRegistry()
+ {
+ // first we register any Sasl client factories
+ DynamicSaslRegistrar.registerSaslProviders();
+ parseProperties();
+ }
+
+ private void parseProperties()
+ {
+ List<String> mechanisms = ClientConfiguration.get().getList(QpidConstants.AMQP_SECURITY_MECHANISMS);
+
+ for (String mechanism : mechanisms)
+ {
+ String className = ClientConfiguration.get().getString(QpidConstants.AMQP_SECURITY_MECHANISM_HANDLER + "_" + mechanism);
+ Class clazz = null;
+ try
+ {
+ clazz = Class.forName(className);
+ if (!AMQPCallbackHandler.class.isAssignableFrom(clazz))
+ {
+ _logger.warn("SASL provider " + clazz + " does not implement " + AMQPCallbackHandler.class +
+ ". Skipping");
+ continue;
+ }
+ _mechanismToHandlerClassMap.put(mechanism, clazz);
+ if (_mechanisms == null)
+ {
+ _mechanisms = mechanism;
+ }
+ else
+ {
+ // one time cost
+ _mechanisms = _mechanisms + " " + mechanism;
+ }
+ }
+ catch (ClassNotFoundException ex)
+ {
+ _logger.warn("Unable to load class " + className + ". Skipping that SASL provider");
+ continue;
+ }
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java
new file mode 100644
index 0000000000..958c6c4782
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security;
+
+import java.security.Security;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.security.sasl.SaslClientFactory;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.QpidConstants;
+
+public class DynamicSaslRegistrar
+{
+ private static final Logger _logger = Logger.getLogger(DynamicSaslRegistrar.class);
+
+ public static void registerSaslProviders()
+ {
+ Map<String, Class<? extends SaslClientFactory>> factories = parseProperties();
+ if (factories.size() > 0)
+ {
+ Security.addProvider(new JCAProvider(factories));
+ _logger.debug("Dynamic SASL provider added as a security provider");
+ }
+ }
+
+ private static Map<String, Class<? extends SaslClientFactory>> parseProperties()
+ {
+ List<String> mechanisms = ClientConfiguration.get().getList(QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES);
+ TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister =
+ new TreeMap<String, Class<? extends SaslClientFactory>>();
+ for (String mechanism: mechanisms)
+ {
+ String className = ClientConfiguration.get().getString(QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY + "_" + mechanism);
+ try
+ {
+ Class<?> clazz = Class.forName(className);
+ if (!(SaslClientFactory.class.isAssignableFrom(clazz)))
+ {
+ _logger.error("Class " + clazz + " does not implement " + SaslClientFactory.class + " - skipping");
+ continue;
+ }
+ factoriesToRegister.put(mechanism, (Class<? extends SaslClientFactory>) clazz);
+ }
+ catch (Exception ex)
+ {
+ _logger.error("Error instantiating SaslClientFactory calss " + className + " - skipping");
+ }
+ }
+ return factoriesToRegister;
+ }
+
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java
new file mode 100644
index 0000000000..10ccb88821
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security;
+
+import javax.security.sasl.SaslClientFactory;
+import java.security.Provider;
+import java.security.Security;
+import java.util.Map;
+
+public class JCAProvider extends Provider
+{
+ public JCAProvider(Map<String, Class<? extends SaslClientFactory>> providerMap)
+ {
+ super("AMQSASLProvider", 1.0, "A JCA provider that registers all " +
+ "AMQ SASL providers that want to be registered");
+ register(providerMap);
+ Security.addProvider(this);
+ }
+
+ private void register(Map<String, Class<? extends SaslClientFactory>> providerMap)
+ {
+ for (Map.Entry<String, Class<? extends SaslClientFactory>> me :
+ providerMap.entrySet())
+ {
+ put("SaslClientFactory." + me.getKey(), me.getValue().getName());
+ }
+ }
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java
new file mode 100644
index 0000000000..7297d07134
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security;
+
+import java.io.IOException;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+
+import org.apache.qpid.nclient.transport.ConnectionURL;
+
+public class UsernamePasswordCallbackHandler implements AMQPCallbackHandler
+{
+ private ConnectionURL _url;
+
+ public void initialise(ConnectionURL url)
+ {
+ _url = url;
+ }
+
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException
+ {
+ for (int i = 0; i < callbacks.length; i++)
+ {
+ Callback cb = callbacks[i];
+ if (cb instanceof NameCallback)
+ {
+ ((NameCallback)cb).setName(_url.getUsername());
+ }
+ else if (cb instanceof PasswordCallback)
+ {
+ ((PasswordCallback)cb).setPassword((_url.getPassword()).toCharArray());
+ }
+ else
+ {
+ throw new UnsupportedCallbackException(cb);
+ }
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java
new file mode 100644
index 0000000000..9e878fb839
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java
@@ -0,0 +1,344 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.url.URLSyntaxException;
+
+import java.util.HashMap;
+import java.net.URISyntaxException;
+import java.net.URI;
+
+public class AMQPBrokerDetails implements BrokerDetails
+{
+ private String _host;
+
+ private int _port;
+
+ private String _transport;
+
+ private HashMap<String, String> _options;
+
+ public AMQPBrokerDetails()
+ {
+ _options = new HashMap<String, String>();
+ }
+
+ public AMQPBrokerDetails(String url) throws URLSyntaxException
+ {
+ this();
+ // URL should be of format tcp://host:port?option='value',option='value'
+ try
+ {
+ URI connection = new URI(url);
+
+ String transport = connection.getScheme();
+
+ // Handles some defaults to minimise changes to existing broker URLS e.g. localhost
+ if (transport != null)
+ {
+ //todo this list of valid transports should be enumerated somewhere
+ if ((!(transport.equalsIgnoreCase("vm") || transport.equalsIgnoreCase("tcp"))))
+ {
+ if (transport.equalsIgnoreCase("localhost"))
+ {
+ connection = new URI(DEFAULT_TRANSPORT + "://" + url);
+ transport = connection.getScheme();
+ }
+ else
+ {
+ if (url.charAt(transport.length()) == ':' && url.charAt(transport.length() + 1) != '/')
+ {
+ //Then most likely we have a host:port value
+ connection = new URI(DEFAULT_TRANSPORT + "://" + url);
+ transport = connection.getScheme();
+ }
+ else
+ {
+ URLHelper.parseError(0, transport.length(), "Unknown transport", url);
+ }
+ }
+ }
+ }
+ else
+ {
+ //Default the transport
+ connection = new URI(DEFAULT_TRANSPORT + "://" + url);
+ transport = connection.getScheme();
+ }
+
+ if (transport == null)
+ {
+ URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" + " In broker URL:'" + url
+ + "' Format: " + URL_FORMAT_EXAMPLE, "");
+ }
+
+ setTransport(transport);
+
+ String host = connection.getHost();
+
+ // Fix for Java 1.5
+ if (host == null)
+ {
+ host = "";
+ }
+
+ setHost(host);
+
+ int port = connection.getPort();
+
+ if (port == -1)
+ {
+ // Fix for when there is port data but it is not automatically parseable by getPort().
+ String auth = connection.getAuthority();
+
+ if (auth != null && auth.contains(":"))
+ {
+ int start = auth.indexOf(":") + 1;
+ int end = start;
+ boolean looking = true;
+ boolean found = false;
+ //Walk the authority looking for a port value.
+ while (looking)
+ {
+ try
+ {
+ end++;
+ Integer.parseInt(auth.substring(start, end));
+
+ if (end >= auth.length())
+ {
+ looking = false;
+ found = true;
+ }
+ }
+ catch (NumberFormatException nfe)
+ {
+ looking = false;
+ }
+
+ }
+ if (found)
+ {
+ setPort(Integer.parseInt(auth.substring(start, end)));
+ }
+ else
+ {
+ URLHelper.parseError(connection.toString().indexOf(connection.getAuthority()) + end - 1,
+ "Illegal character in port number", connection.toString());
+ }
+
+ }
+ else
+ {
+ setPort(DEFAULT_PORT);
+ }
+ }
+ else
+ {
+ setPort(port);
+ }
+
+ String queryString = connection.getQuery();
+
+ URLHelper.parseOptions(_options, queryString);
+
+ //Fragment is #string (not used)
+ }
+ catch (URISyntaxException uris)
+ {
+ if (uris instanceof URLSyntaxException)
+ {
+ throw (URLSyntaxException) uris;
+ }
+
+ URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+ }
+ }
+
+ public AMQPBrokerDetails(String host, int port, boolean useSSL)
+ {
+ _host = host;
+ _port = port;
+
+ if (useSSL)
+ {
+ setOption(OPTIONS_SSL, "true");
+ }
+ }
+
+ public String getHost()
+ {
+ return _host;
+ }
+
+ public void setHost(String _host)
+ {
+ this._host = _host;
+ }
+
+ public int getPort()
+ {
+ return _port;
+ }
+
+ public void setPort(int _port)
+ {
+ this._port = _port;
+ }
+
+ public String getTransport()
+ {
+ return _transport;
+ }
+
+ public void setTransport(String _transport)
+ {
+ this._transport = _transport;
+ }
+
+ public String getOption(String key)
+ {
+ return _options.get(key);
+ }
+
+ public void setOption(String key, String value)
+ {
+ _options.put(key, value);
+ }
+
+ public long getTimeout()
+ {
+ if (_options.containsKey(OPTIONS_CONNECT_TIMEOUT))
+ {
+ try
+ {
+ return Long.parseLong(_options.get(OPTIONS_CONNECT_TIMEOUT));
+ }
+ catch (NumberFormatException nfe)
+ {
+ //Do nothing as we will use the default below.
+ }
+ }
+
+ return BrokerDetails.DEFAULT_CONNECT_TIMEOUT;
+ }
+
+ public void setTimeout(long timeout)
+ {
+ setOption(OPTIONS_CONNECT_TIMEOUT, Long.toString(timeout));
+ }
+
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer();
+
+ sb.append(_transport);
+ sb.append("://");
+
+ if (!(_transport.equalsIgnoreCase("vm")))
+ {
+ sb.append(_host);
+ }
+
+ sb.append(':');
+ sb.append(_port);
+
+ sb.append(printOptionsURL());
+
+ return sb.toString();
+ }
+
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof BrokerDetails))
+ {
+ return false;
+ }
+
+ BrokerDetails bd = (BrokerDetails) o;
+
+ return _host.equalsIgnoreCase(bd.getHost()) && (_port == bd.getPort())
+ && _transport.equalsIgnoreCase(bd.getTransport()) && (useSSL() == bd.useSSL());
+
+ //todo do we need to compare all the options as well?
+ }
+
+ private String printOptionsURL()
+ {
+ StringBuffer optionsURL = new StringBuffer();
+
+ optionsURL.append('?');
+
+ if (!(_options.isEmpty()))
+ {
+
+ for (String key : _options.keySet())
+ {
+ optionsURL.append(key);
+
+ optionsURL.append("='");
+
+ optionsURL.append(_options.get(key));
+
+ optionsURL.append("'");
+
+ optionsURL.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+ }
+
+ //removeKey the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options
+ optionsURL.deleteCharAt(optionsURL.length() - 1);
+
+ return optionsURL.toString();
+ }
+
+ public boolean useSSL()
+ {
+ // To be friendly to users we should be case insensitive.
+ // or simply force users to conform to OPTIONS_SSL
+ // todo make case insensitive by trying ssl Ssl sSl ssL SSl SsL sSL SSL
+
+ if (_options.containsKey(OPTIONS_SSL))
+ {
+ return _options.get(OPTIONS_SSL).equalsIgnoreCase("true");
+ }
+
+ return USE_SSL_DEFAULT;
+ }
+
+ public void useSSL(boolean ssl)
+ {
+ setOption(OPTIONS_SSL, Boolean.toString(ssl));
+ }
+
+ public static String checkTransport(String broker)
+ {
+ if ((!broker.contains("://")))
+ {
+ return "tcp://" + broker;
+ }
+ else
+ {
+ return broker;
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java
new file mode 100644
index 0000000000..d0259830c6
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java
@@ -0,0 +1,412 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.url.URLSyntaxException;
+
+import java.util.*;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+public class AMQPConnectionURL implements ConnectionURL
+{
+ private String _url;
+ private String _failoverMethod;
+ private HashMap<String, String> _failoverOptions;
+ private HashMap<String, String> _options;
+ private List<BrokerDetails> _brokers;
+ private String _clientName;
+ private String _username;
+ private String _password;
+ private String _virtualHost;
+
+ public AMQPConnectionURL(String fullURL) throws URLSyntaxException
+ {
+ _url = fullURL;
+ _options = new HashMap<String, String>();
+ _brokers = new LinkedList<BrokerDetails>();
+ _failoverOptions = new HashMap<String, String>();
+
+ try
+ {
+ URI connection = new URI(fullURL);
+
+ if (connection.getScheme() == null || !(connection.getScheme().equalsIgnoreCase(AMQ_PROTOCOL)))
+ {
+ throw new URISyntaxException(fullURL, "Not an AMQP URL");
+ }
+
+ if (connection.getHost() == null || connection.getHost().equals(""))
+ {
+ String uid = getUniqueClientID();
+ if (uid == null)
+ {
+ URLHelper.parseError(-1, "Client Name not specified", fullURL);
+ }
+ else
+ {
+ setClientName(uid);
+ }
+
+ }
+ else
+ {
+ setClientName(connection.getHost());
+ }
+
+ String userInfo = connection.getUserInfo();
+
+ if (userInfo == null)
+ {
+ //Fix for Java 1.5 which doesn't parse UserInfo for non http URIs
+ userInfo = connection.getAuthority();
+
+ if (userInfo != null)
+ {
+ int atIndex = userInfo.indexOf('@');
+
+ if (atIndex != -1)
+ {
+ userInfo = userInfo.substring(0, atIndex);
+ }
+ else
+ {
+ userInfo = null;
+ }
+ }
+
+ }
+
+ if (userInfo == null)
+ {
+ URLHelper.parseError(AMQ_PROTOCOL.length() + 3,
+ "User information not found on url", fullURL);
+ }
+ else
+ {
+ parseUserInfo(userInfo);
+ }
+ String virtualHost = connection.getPath();
+
+ if (virtualHost != null && (!virtualHost.equals("")))
+ {
+ setVirtualHost(virtualHost);
+ }
+ else
+ {
+ int authLength = connection.getAuthority().length();
+ int start = AMQ_PROTOCOL.length() + 3;
+ int testIndex = start + authLength;
+ if (testIndex < fullURL.length() && fullURL.charAt(testIndex) == '?')
+ {
+ URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL);
+ }
+ else
+ {
+ URLHelper.parseError(-1, "Virtual host not specified", fullURL);
+ }
+
+ }
+
+
+ URLHelper.parseOptions(_options, connection.getQuery());
+
+ processOptions();
+
+ //Fragment is #string (not used)
+ //System.out.println(connection.getFragment());
+
+ }
+ catch (URISyntaxException uris)
+ {
+ if (uris instanceof URLSyntaxException)
+ {
+ throw (URLSyntaxException) uris;
+ }
+
+ int slash = fullURL.indexOf("\\");
+
+ if (slash == -1)
+ {
+ URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+ }
+ else
+ {
+ if (slash != 0 && fullURL.charAt(slash - 1) == ':')
+ {
+ URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL);
+ }
+ else
+ {
+ URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL);
+ }
+ }
+
+ }
+ }
+
+ private String getUniqueClientID()
+ {
+ try
+ {
+ InetAddress addr = InetAddress.getLocalHost();
+ return addr.getHostName() + System.currentTimeMillis();
+ }
+ catch (UnknownHostException e)
+ {
+ return null;
+ }
+ }
+
+ private void parseUserInfo(String userinfo) throws URLSyntaxException
+ {
+ //user info = user:pass
+
+ int colonIndex = userinfo.indexOf(':');
+
+ if (colonIndex == -1)
+ {
+ URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(),
+ "Null password in user information not allowed.", _url);
+ }
+ else
+ {
+ setUsername(userinfo.substring(0, colonIndex));
+ setPassword(userinfo.substring(colonIndex + 1));
+ }
+
+ }
+
+ private void processOptions() throws URLSyntaxException
+ {
+ if (_options.containsKey(OPTIONS_BROKERLIST))
+ {
+ String brokerlist = _options.get(OPTIONS_BROKERLIST);
+
+ //brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'
+ StringTokenizer st = new StringTokenizer(brokerlist, "" + URLHelper.BROKER_SEPARATOR);
+
+ while (st.hasMoreTokens())
+ {
+ String broker = st.nextToken();
+
+ _brokers.add(new AMQPBrokerDetails(broker));
+ }
+
+ _options.remove(OPTIONS_BROKERLIST);
+ }
+
+ if (_options.containsKey(OPTIONS_FAILOVER))
+ {
+ String failover = _options.get(OPTIONS_FAILOVER);
+
+ // failover='method?option='value',option='value''
+
+ int methodIndex = failover.indexOf('?');
+
+ if (methodIndex > -1)
+ {
+ _failoverMethod = failover.substring(0, methodIndex);
+ URLHelper.parseOptions(_failoverOptions, failover.substring(methodIndex + 1));
+ }
+ else
+ {
+ _failoverMethod = failover;
+ }
+
+ _options.remove(OPTIONS_FAILOVER);
+ }
+ }
+
+ public String getURL()
+ {
+ return _url;
+ }
+
+ public String getFailoverMethod()
+ {
+ return _failoverMethod;
+ }
+
+ public String getFailoverOption(String key)
+ {
+ return _failoverOptions.get(key);
+ }
+
+ public void setFailoverOption(String key, String value)
+ {
+ _failoverOptions.put(key, value);
+ }
+
+ public int getBrokerCount()
+ {
+ return _brokers.size();
+ }
+
+ public BrokerDetails getBrokerDetails(int index)
+ {
+ if (index < _brokers.size())
+ {
+ return _brokers.get(index);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public void addBrokerDetails(BrokerDetails broker)
+ {
+ if (!(_brokers.contains(broker)))
+ {
+ _brokers.add(broker);
+ }
+ }
+
+ public List<BrokerDetails> getAllBrokerDetails()
+ {
+ return _brokers;
+ }
+
+ public String getClientName()
+ {
+ return _clientName;
+ }
+
+ public void setClientName(String clientName)
+ {
+ _clientName = clientName;
+ }
+
+ public String getUsername()
+ {
+ return _username;
+ }
+
+ public void setUsername(String username)
+ {
+ _username = username;
+ }
+
+ public String getPassword()
+ {
+ return _password;
+ }
+
+ public void setPassword(String password)
+ {
+ _password = password;
+ }
+
+ public String getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public void setVirtualHost(String virtuaHost)
+ {
+ _virtualHost = virtuaHost;
+ }
+
+ public String getOption(String key)
+ {
+ return _options.get(key);
+ }
+
+ public void setOption(String key, String value)
+ {
+ _options.put(key, value);
+ }
+
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer();
+
+ sb.append(AMQ_PROTOCOL);
+ sb.append("://");
+
+ if (_username != null)
+ {
+ sb.append(_username);
+
+ if (_password != null)
+ {
+ sb.append(':');
+ sb.append(_password);
+ }
+
+ sb.append('@');
+ }
+
+ sb.append(_clientName);
+
+ sb.append(_virtualHost);
+
+ sb.append(optionsToString());
+
+ return sb.toString();
+ }
+
+ private String optionsToString()
+ {
+ StringBuffer sb = new StringBuffer();
+
+ sb.append("?" + OPTIONS_BROKERLIST + "='");
+
+ for (BrokerDetails service : _brokers)
+ {
+ sb.append(service.toString());
+ sb.append(';');
+ }
+
+ sb.deleteCharAt(sb.length() - 1);
+ sb.append("'");
+
+ if (_failoverMethod != null)
+ {
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ sb.append(OPTIONS_FAILOVER + "='");
+ sb.append(_failoverMethod);
+ sb.append(URLHelper.printOptions(_failoverOptions));
+ sb.append("'");
+ }
+
+ return sb.toString();
+ }
+
+
+ public static void main(String[] args) throws URLSyntaxException
+ {
+
+ String url2 = "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'";
+ //"amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''";
+
+ //ConnectionURL connectionurl2 = new AMQConnectionURL(url2);
+
+ System.out.println(url2);
+ //System.out.println(connectionurl2);
+
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java
new file mode 100644
index 0000000000..fe20a1e8dd
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+public interface BrokerDetails
+{
+
+ /*
+ * Known URL Options
+ * @see ConnectionURL
+ */
+ public static final String OPTIONS_RETRY = "retries";
+ public static final String OPTIONS_SSL = ConnectionURL.OPTIONS_SSL;
+ public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
+ public static final int DEFAULT_PORT = 5672;
+
+ public static final String TCP = "tcp";
+ public static final String VM = "vm";
+
+ public static final String DEFAULT_TRANSPORT = TCP;
+
+ public static final String URL_FORMAT_EXAMPLE =
+ "<transport>://<hostname>[:<port Default=\"" + DEFAULT_PORT + "\">][?<option>='<value>'[,<option>='<value>']]";
+
+ public static final long DEFAULT_CONNECT_TIMEOUT = 30000L;
+ public static final boolean USE_SSL_DEFAULT = false;
+
+ String getHost();
+
+ void setHost(String host);
+
+ int getPort();
+
+ void setPort(int port);
+
+ String getTransport();
+
+ void setTransport(String transport);
+
+ boolean useSSL();
+
+ void useSSL(boolean ssl);
+
+ String getOption(String key);
+
+ void setOption(String key, String value);
+
+ long getTimeout();
+
+ void setTimeout(long timeout);
+
+ String toString();
+
+ boolean equals(Object o);
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java
new file mode 100644
index 0000000000..79653dc442
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+import java.util.List;
+
+/**
+ Connection URL format
+ For TCP:
+ amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\'&option=\'value\'
+
+ For VMBroker:
+ vm://:3/virtualpath?option=\'value\''&failover='method?option=\'value\'&option='value''"
+
+ Options are of course optional except for requiring a single broker in the broker list.
+ The option seperator is defined to be either '&' or ','
+ */
+public interface ConnectionURL
+{
+ public static final String AMQ_PROTOCOL = "amqp";
+ public static final String OPTIONS_BROKERLIST = "brokerlist";
+ public static final String OPTIONS_FAILOVER = "failover";
+ public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
+ public static final String OPTIONS_SSL = "ssl";
+
+ String getURL();
+
+ String getFailoverMethod();
+
+ String getFailoverOption(String key);
+
+ int getBrokerCount();
+
+ BrokerDetails getBrokerDetails(int index);
+
+ void addBrokerDetails(BrokerDetails broker);
+
+ List<BrokerDetails> getAllBrokerDetails();
+
+ String getClientName();
+
+ void setClientName(String clientName);
+
+ String getUsername();
+
+ void setUsername(String username);
+
+ String getPassword();
+
+ void setPassword(String password);
+
+ String getVirtualHost();
+
+ void setVirtualHost(String virtualHost);
+
+ String getOption(String key);
+
+ void setOption(String key, String value);
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java
new file mode 100644
index 0000000000..aae3677f8b
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java
@@ -0,0 +1,82 @@
+package org.apache.qpid.nclient.transport;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.DefaultPhaseContext;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.PhaseContext;
+import org.apache.qpid.nclient.core.PhaseFactory;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+
+public class TCPConnection implements TransportConnection
+{
+ private static final Logger _logger = Logger.getLogger(TCPConnection.class);
+ private BrokerDetails _brokerDetails;
+ private IoConnector _ioConnector;
+ private Phase _phase;
+
+ protected TCPConnection(ConnectionURL url)
+ {
+ _brokerDetails = url.getBrokerDetails(0);
+
+ ByteBuffer.setUseDirectBuffers(ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_DIRECT_BUFFERS));
+
+ // the MINA default is currently to use the pooled allocator although this may change in future
+ // once more testing of the performance of the simple allocator has been done
+ if (ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_POOLED_ALLOCATOR))
+ {
+ // Not sure what the original code wanted use :)
+ }
+ else
+ {
+ _logger.info("Using SimpleByteBufferAllocator");
+ ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+ }
+
+ final IoConnector ioConnector = new SocketConnector();
+ SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
+
+ // if we do not use our own thread model we get the MINA default which is to use
+ // its own leader-follower model
+ if (ClientConfiguration.get().getBoolean(QpidConstants.USE_SHARED_READ_WRITE_POOL))
+ {
+ cfg.setThreadModel(ReadWriteThreadModel.getInstance());
+ }
+
+ SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
+ scfg.setTcpNoDelay(ClientConfiguration.get().getBoolean(QpidConstants.TCP_NO_DELAY));
+ scfg.setSendBufferSize(ClientConfiguration.get().getInt(QpidConstants.SEND_BUFFER_SIZE_IN_KB)*1024);
+ scfg.setReceiveBufferSize(ClientConfiguration.get().getInt(QpidConstants.RECEIVE_BUFFER_SIZE_IN_KB)*1024);
+ }
+
+ // Returns the phase pipe
+ public Phase connect() throws AMQPException
+ {
+ PhaseContext ctx = new DefaultPhaseContext();
+ ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails);
+ ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector);
+
+ _phase = PhaseFactory.createPhasePipe(ctx);
+ _phase.start();
+
+ return _phase;
+ }
+
+ public void close() throws AMQPException
+ {
+
+ }
+
+ public Phase getPhasePipe()
+ {
+ return _phase;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java
new file mode 100644
index 0000000000..9df353a2df
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+
+public interface TransportConnection
+{
+ public Phase connect()throws AMQPException;
+
+ public void close()throws AMQPException;
+
+ public Phase getPhasePipe();
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java
new file mode 100644
index 0000000000..5c97100bdc
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java
@@ -0,0 +1,44 @@
+package org.apache.qpid.nclient.transport;
+
+import java.net.URISyntaxException;
+
+public class TransportConnectionFactory
+{
+ public enum ConnectionType
+ {
+ TCP,VM
+ }
+
+ public static TransportConnection createTransportConnection(String url,ConnectionType type) throws URISyntaxException
+ {
+ return createTransportConnection(new AMQPConnectionURL(url),type);
+
+ }
+
+ public static TransportConnection createTransportConnection(ConnectionURL url,ConnectionType type)
+ {
+ switch (type)
+ {
+ case TCP : default:
+ {
+ return createTCPConnection(url);
+ }
+
+ case VM :
+ {
+ return createVMConnection(url);
+ }
+ }
+
+ }
+
+ private static TransportConnection createTCPConnection(ConnectionURL url)
+ {
+ return new TCPConnection(url);
+ }
+
+ private static TransportConnection createVMConnection(ConnectionURL url)
+ {
+ return null;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java
new file mode 100644
index 0000000000..b1bc7b4c8c
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java
@@ -0,0 +1,263 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
+import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
+import org.apache.mina.transport.vmpipe.VmPipeConnector;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.AbstractPhase;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.ssl.BogusSSLContextFactory;
+
+/**
+ * The Transport Phase corresponds to the Layer 1 in AMQP It works at the Frame
+ * layer
+ *
+ */
+public class TransportPhase extends AbstractPhase implements IoHandler, ProtocolVersionList
+{
+
+ private static final Logger _logger = Logger
+ .getLogger(TransportPhase.class);
+
+ private IoSession _ioSession;
+ private BrokerDetails _brokerDetails;
+
+ protected WriteFuture _lastWriteFuture;
+
+ /**
+ * ------------------------------------------------
+ * Phase - methods introduced by Phase
+ * ------------------------------------------------
+ */
+
+ public void start()throws AMQPException
+ {
+ _brokerDetails = (BrokerDetails)_ctx.getProperty(QpidConstants.AMQP_BROKER_DETAILS);
+ IoConnector ioConnector = (IoConnector)_ctx.getProperty(QpidConstants.MINA_IO_CONNECTOR);
+
+ final SocketAddress address;
+ if (ioConnector instanceof VmPipeConnector)
+ {
+ address = new VmPipeAddress(_brokerDetails.getPort());
+ }
+ else
+ {
+ address = new InetSocketAddress(_brokerDetails.getHost(), _brokerDetails.getPort());
+ _logger.info("Attempting connection to " + address);
+
+ }
+
+ ConnectFuture future = ioConnector.connect(address,this);
+
+ // wait for connection to complete
+ if (future.join(_brokerDetails.getTimeout()))
+ {
+ // we call getSession which throws an IOException if there has been an error connecting
+ future.getSession();
+ }
+ else
+ {
+ throw new AMQPException("Timeout waiting for connection.");
+ }
+ }
+
+ public void messageReceived(Object frame) throws AMQPException
+ {
+ super.messageReceived(frame);
+ }
+
+ public void messageSent(Object frame) throws AMQPException
+ {
+
+ _ioSession.write(frame);
+ }
+
+ /**
+ * ------------------------------------------------
+ * IoHandler - methods introduced by IoHandler
+ * ------------------------------------------------
+ */
+ public void sessionIdle(IoSession session, IdleStatus status)
+ throws Exception
+ {
+ _logger.debug("Protocol Session [" + this + ":" + session + "] idle: "
+ + status);
+ if (IdleStatus.WRITER_IDLE.equals(status))
+ {
+ // write heartbeat frame:
+ _logger.debug("Sent heartbeat");
+ session.write(HeartbeatBody.FRAME);
+ // HeartbeatDiagnostics.sent();
+ } else if (IdleStatus.READER_IDLE.equals(status))
+ {
+ // failover:
+ // HeartbeatDiagnostics.timeout();
+ _logger.warn("Timed out while waiting for heartbeat from peer.");
+ session.close();
+ }
+ }
+
+ public void messageReceived(IoSession session, Object message)
+ throws Exception
+ {
+ AMQFrame frame = (AMQFrame) message;
+ final AMQBody bodyFrame = frame.getBodyFrame();
+
+ if (bodyFrame instanceof HeartbeatBody)
+ {
+ _logger.debug("Received heartbeat");
+ } else
+ {
+ messageReceived(bodyFrame);
+ }
+ // _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
+ }
+
+ public void messageSent(IoSession session, Object message) throws Exception
+ {
+ _logger.debug("Sent frame " + message);
+ }
+
+ public void exceptionCaught(IoSession session, Throwable cause)
+ throws Exception
+ {
+ // Need to handle failover
+ sessionClosed(session);
+ }
+
+ public void sessionClosed(IoSession session) throws Exception
+ {
+ // Need to handle failover
+ _logger.info("Protocol Session [" + this + "] closed");
+ }
+
+ public void sessionCreated(IoSession session) throws Exception
+ {
+ _logger.debug("Protocol session created for session "
+ + System.identityHashCode(session));
+
+ final ProtocolCodecFilter pcf = new ProtocolCodecFilter(
+ new AMQCodecFactory(false));
+
+ if (ClientConfiguration.get().getBoolean(
+ QpidConstants.USE_SHARED_READ_WRITE_POOL))
+ {
+ session.getFilterChain().addBefore("AsynchronousWriteFilter",
+ "protocolFilter", pcf);
+ } else
+ {
+ session.getFilterChain().addLast("protocolFilter", pcf);
+ }
+ // we only add the SSL filter where we have an SSL connection
+ if (_brokerDetails.useSSL())
+ {
+ // FIXME: Bogus context cannot be used in production.
+ SSLFilter sslFilter = new SSLFilter(BogusSSLContextFactory
+ .getInstance(false));
+ sslFilter.setUseClientMode(true);
+ session.getFilterChain().addBefore("protocolFilter", "ssl",
+ sslFilter);
+ }
+
+ try
+ {
+
+ ReadWriteThreadModel threadModel = ReadWriteThreadModel
+ .getInstance();
+ threadModel.getAsynchronousReadFilter().createNewJobForSession(
+ session);
+ threadModel.getAsynchronousWriteFilter().createNewJobForSession(
+ session);
+ } catch (RuntimeException e)
+ {
+ e.printStackTrace();
+ }
+
+ doAMQPConnectionNegotiation();
+ }
+
+ public void sessionOpened(IoSession session) throws Exception
+ {
+ _logger.debug("Protocol session opened for session "
+ + System.identityHashCode(session));
+ }
+
+ /**
+ * ----------------------------------------------------------
+ * Protocol related methods
+ * ----------------------------------------------------------
+ */
+ private void doAMQPConnectionNegotiation()
+ {
+ int i = pv.length - 1;
+ writeFrame(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
+ }
+
+ /**
+ * ----------------------------------------------------------
+ * Write Operations
+ * ----------------------------------------------------------
+ */
+ public void writeFrame(AMQDataBlock frame)
+ {
+ writeFrame(frame, false);
+ }
+
+ public void writeFrame(AMQDataBlock frame, boolean wait)
+ {
+ WriteFuture f = _ioSession.write(frame);
+ if (wait)
+ {
+ // fixme -- time out?
+ f.join();
+ } else
+ {
+ _lastWriteFuture = f;
+ }
+ }
+
+ /**
+ * ----------------------------------------------------------- Failover
+ * section -----------------------------------------------------------
+ */
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java
new file mode 100644
index 0000000000..c13bb873a7
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java
@@ -0,0 +1,133 @@
+package org.apache.qpid.nclient.transport;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
+import org.apache.mina.transport.vmpipe.VmPipeConnector;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.DefaultPhaseContext;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.PhaseContext;
+import org.apache.qpid.nclient.core.PhaseFactory;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.pool.PoolingFilter;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
+
+public class VMConnection implements TransportConnection
+{
+ private static final Logger _logger = Logger.getLogger(VMConnection.class);
+ private BrokerDetails _brokerDetails;
+ private IoConnector _ioConnector;
+ private Phase _phase;
+
+ protected VMConnection(ConnectionURL url)
+ {
+ _brokerDetails = url.getBrokerDetails(0);
+ _ioConnector = new VmPipeConnector();
+ final IoServiceConfig cfg = _ioConnector.getDefaultConfig();
+ ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance();
+ PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executorService,
+ "AsynchronousReadFilter");
+ cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead);
+ PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executorService,
+ "AsynchronousWriteFilter");
+ cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite);
+ }
+
+ public Phase connect() throws AMQPException
+ {
+ createVMBroker();
+
+ PhaseContext ctx = new DefaultPhaseContext();
+ ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails);
+ ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector);
+
+ _phase = PhaseFactory.createPhasePipe(ctx);
+ _phase.start();
+
+ return _phase;
+
+ }
+
+ private void createVMBroker()throws AMQPException
+ {
+ _logger.info("Creating InVM Qpid.AMQP listening on port " + _brokerDetails.getPort());
+
+ VmPipeAcceptor acceptor = new VmPipeAcceptor();
+ IoServiceConfig config = acceptor.getDefaultConfig();
+ config.setThreadModel(ReadWriteThreadModel.getInstance());
+
+ IoHandlerAdapter provider = null;
+ try
+ {
+ VmPipeAddress pipe = new VmPipeAddress(_brokerDetails.getPort());
+ provider = createBrokerInstance(_brokerDetails.getPort());
+ acceptor.bind(pipe, provider);
+ _logger.info("Created InVM Qpid.AMQP listening on port " + _brokerDetails.getPort());
+ }
+ catch (IOException e)
+ {
+ _logger.error(e);
+ VmPipeAddress pipe = new VmPipeAddress(_brokerDetails.getPort());
+ acceptor.unbind(pipe);
+
+ throw new AMQPException("Error creating VM broker",e);
+ }
+ }
+
+ private IoHandlerAdapter createBrokerInstance(int port) throws AMQPException
+ {
+ String protocolProviderClass = ClientConfiguration.get().getString(QpidConstants.QPID_VM_BROKER_CLASS);
+ _logger.info("Creating Qpid protocol provider: " + protocolProviderClass);
+
+ // can't use introspection to get Provider as it is a server class.
+ // need to go straight to IoHandlerAdapter but that requries the queues and exchange from the ApplicationRegistry which we can't access.
+
+ //get correct constructor and pass in instancec ID - "port"
+ IoHandlerAdapter provider;
+ try
+ {
+ Class[] cnstr = {Integer.class};
+ Object[] params = {port};
+ provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
+ //Give the broker a second to create
+ _logger.info("Created VMBroker Instance:" + port);
+ }
+ catch (Exception e)
+ {
+ _logger.info("Unable to create InVM Qpid broker on port " + port + ". due to : " + e.getCause());
+ _logger.error(e);
+ String because;
+ if (e.getCause() == null)
+ {
+ because = e.toString();
+ }
+ else
+ {
+ because = e.getCause().toString();
+ }
+
+
+ throw new AMQPException(port, because + " Stopped InVM Qpid.AMQP creation",e);
+ }
+
+ return provider;
+ }
+
+ public void close() throws AMQPException
+ {
+
+ }
+
+ public Phase getPhasePipe()
+ {
+ return _phase;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java b/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java
new file mode 100644
index 0000000000..e161e30227
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java
@@ -0,0 +1,14 @@
+package org.apache.qpid.nclient.util;
+
+import org.apache.qpid.nclient.core.AMQPException;
+
+public class AMQPValidator
+{
+ public static void throwExceptionOnNull(Object obj, String msg) throws AMQPException
+ {
+ if(obj == null)
+ {
+ throw new AMQPException(msg);
+ }
+ }
+}