diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-03-27 16:31:05 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-03-27 16:31:05 +0000 |
commit | 0f9044243547ded8521af0c8d0ff81d791d8048d (patch) | |
tree | 7ea962842e9af041430d3a892ea1292b22b95f39 | |
parent | c3b33ead3e0028b44020bdb02cd139c8a85f409e (diff) | |
download | qpid-python-0f9044243547ded8521af0c8d0ff81d791d8048d.tar.gz |
This is the initial checkup for the new client
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@522988 13f79535-47bb-0310-9956-ffa450edef68
55 files changed, 5262 insertions, 0 deletions
diff --git a/java/newclient/.project b/java/newclient/.project new file mode 100644 index 0000000000..0522e6b5dc --- /dev/null +++ b/java/newclient/.project @@ -0,0 +1,17 @@ +<?xml version="1.0" encoding="UTF-8"?> +<projectDescription> + <name>qpid-newclient</name> + <comment></comment> + <projects> + </projects> + <buildSpec> + <buildCommand> + <name>org.eclipse.jdt.core.javabuilder</name> + <arguments> + </arguments> + </buildCommand> + </buildSpec> + <natures> + <nature>org.eclipse.jdt.core.javanature</nature> + </natures> +</projectDescription> diff --git a/java/newclient/pom.xml b/java/newclient/pom.xml new file mode 100644 index 0000000000..d2d27f3fd7 --- /dev/null +++ b/java/newclient/pom.xml @@ -0,0 +1,188 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-newclient</artifactId> + <packaging>jar</packaging> + <version>1.0-incubating-M2-SNAPSHOT</version> + <name>Qpid New Client</name> + <url>http://cwiki.apache.org/confluence/display/qpid</url> + + <parent> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid</artifactId> + <version>1.0-incubating-M2-SNAPSHOT</version> + </parent> + + <properties> + <topDirectoryLocation>..</topDirectoryLocation> + <java.source.version>1.5</java.source.version> + <qpid.version>${pom.version}</qpid.version> + <qpid.targetDir>${project.build.directory}</qpid.targetDir> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-common</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jms_1.1_spec</artifactId> + </dependency> + + <dependency> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + </dependency> + + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.mina</groupId> + <artifactId>mina-filter-ssl</artifactId> + </dependency> + + <!-- Test Dependencies --> + <dependency> <!-- for inVm Broker --> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-broker</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>jmscts</groupId> + <artifactId>jmscts</artifactId> + <version>0.5-b2</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>jms</groupId> + <artifactId>jms</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.easymock</groupId> + <artifactId>easymockclassextension</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemProperties> + <property> + <name>amqj.noAutoCreateVMBroker</name> + <value>true</value> + </property> + <property> + <name>amqj.logging.level</name> + <value>${amqj.logging.level}</value> + </property> + <property> + <name>log4j.configuration</name> + <value>file:///${basedir}/src/main/java/client.log4j</value> + </property> + </systemProperties> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + + </plugins> + +<!-- The inclusion of this resource causes the build to hang. --> + <!--resources> + <resource> + <targetPath>META-INF/</targetPath> + <filtering>false</filtering> + <directory>../resources/META-INF</directory> + <includes> + <include>**</include> + </includes> + </resource> + </resources--> + + <testResources> + <testResource> + <targetPath>META-INF/</targetPath> + <filtering>false</filtering> + <directory>../resources/META-INF</directory> + <includes> + <include>**</include> + </includes> + </testResource> + <testResource> + <targetPath>src/</targetPath> + <filtering>false</filtering> + <directory>src/test/java</directory> + <includes> + <include>**/*.java</include> + </includes> + </testResource> + + <testResource> + <targetPath></targetPath> + <filtering>false</filtering> + <directory>src/main/java</directory> + <includes> + <include>client.log4j</include> + </includes> + </testResource> + </testResources> + + </build> + +</project> diff --git a/java/newclient/src/main/java/client.log4j b/java/newclient/src/main/java/client.log4j new file mode 100644 index 0000000000..525433e9a9 --- /dev/null +++ b/java/newclient/src/main/java/client.log4j @@ -0,0 +1,28 @@ +#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+log4j.rootLogger=${root.logging.level}
+
+
+log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+log4j.additivity.org.apache.qpid=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=all
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java new file mode 100644 index 0000000000..49575be8d2 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java @@ -0,0 +1,23 @@ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; + +public abstract class AMQPCallBack +{ + private boolean _isComplete = false; + + public abstract void brokerResponded(AMQMethodBody body); + + public abstract void brokerRespondedWithError(AMQException e); + + public void setIsComplete(boolean isComplete) + { + _isComplete = isComplete; + } + + public boolean isComplete() + { + return _isComplete; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java new file mode 100644 index 0000000000..890b0dd6eb --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java @@ -0,0 +1,64 @@ +package org.apache.qpid.nclient.amqp; + +import java.security.SecureRandom; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.model.AMQPMethodEvent; + +public abstract class AMQPCallBackSupport +{ + private SecureRandom _localCorrelationIdGenerator = new SecureRandom(); + protected ConcurrentHashMap<Long,AMQPCallBack> _cbMap = new ConcurrentHashMap<Long,AMQPCallBack>(); + + //the channelId assigned for this instance + protected int _channelId; + + public AMQPCallBackSupport(int channelId) + { + _channelId = channelId; + } + + private long getNextCorrelationId() + { + return _localCorrelationIdGenerator.nextLong(); + } + + + // For methods that still use nowait, hopefully they will remove nowait + protected AMQPMethodEvent handleNoWait(boolean noWait,AMQMethodBody methodBody,AMQPCallBack cb) + { + if(noWait) + { + // u only need to register if u are expecting a response + long localCorrelationId = getNextCorrelationId(); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId); + _cbMap.put(localCorrelationId, cb); + return msg; + } + else + { + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID); + return msg; + } + } + + protected AMQPMethodEvent handleAsynchronousCall(AMQMethodBody methodBody,AMQPCallBack cb) + { + long localCorrelationId = getNextCorrelationId(); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId); + _cbMap.put(localCorrelationId, cb); + return msg; + } + + protected void invokeCallBack(long localCorrelationId, AMQMethodBody methodBody) + { + if(_cbMap.contains(localCorrelationId)) + { + AMQPCallBack cb = (AMQPCallBack)_cbMap.get(localCorrelationId); + cb.brokerResponded(methodBody); + } + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java new file mode 100644 index 0000000000..d86f948e28 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java @@ -0,0 +1,284 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.nclient.amqp; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.framing.ChannelFlowOkBody; +import org.apache.qpid.framing.ChannelOkBody; +import org.apache.qpid.framing.ChannelOpenBody; +import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.ChannelResumeBody; +import org.apache.qpid.nclient.amqp.state.AMQPState; +import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.model.AMQPMethodEvent; +import org.apache.qpid.nclient.model.AMQPMethodListener; +import org.apache.qpid.nclient.util.AMQPValidator; + +/** + * This represents the Channel class defined in the AMQP protocol. + * This class is a finite state machine and is thread safe by design. + * Only valid state changes are allowed or else an IllegalStateTransitionException will be thrown. + * Only one thread can enter the methods that change state, at a given time. + * The AMQP protocol recommends one thread per channel by design. + * + * A JMS Session can wrap an instance of this class. + */ + +public class AMQPChannel extends AMQPStateMachine implements AMQPMethodListener +{ +private static final Logger _logger = Logger.getLogger(AMQPChannel.class); + + //the channelId assigned for this channel + private int _channelId; + private Phase _phase; + private AMQPState _currentState; + private final AMQPState[] _validCloseStates = new AMQPState[]{AMQPState.CHANNEL_OPENED,AMQPState.CHANNEL_SUSPEND}; + private final AMQPState[] _validResumeStates = new AMQPState[]{AMQPState.CHANNEL_CLOSED,AMQPState.CHANNEL_NOT_OPENED}; + + // The wait period until a server sends a respond + private long _serverTimeOut = 1000; + private final Lock _lock = new ReentrantLock(); + private final Condition _channelNotOpend = _lock.newCondition(); + private final Condition _channelNotClosed = _lock.newCondition(); + private final Condition _channelFlowNotResponded = _lock.newCondition(); + private final Condition _channelNotResumed = _lock.newCondition(); + + private ChannelOpenOkBody _channelOpenOkBody; + private ChannelCloseOkBody _channelCloseOkBody; + private ChannelFlowOkBody _channelFlowOkBody; + private ChannelOkBody _channelOkBody; + + public AMQPChannel(int channelId) + { + _channelId = channelId; + _currentState = AMQPState.CHANNEL_NOT_OPENED; + _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); + } + + /**------------------------------------------- + * API Methods + *-------------------------------------------- + */ + + /** + * Opens the channel + */ + public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException + { + _lock.lock(); + try { + _channelOpenOkBody = null; + checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED,_currentState,AMQPState.CHANNEL_OPENED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelOpenBody,QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + _channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS); + AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time"); + _currentState = AMQPState.CHANNEL_OPENED; + return _channelOpenOkBody; + } + catch(Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + + /** + * Close the channel + */ + public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException + { + _lock.lock(); + try { + _channelCloseOkBody = null; + checkIfValidStateTransition(_validCloseStates,_currentState,AMQPState.CHANNEL_CLOSED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelCloseBody,QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + _channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); + AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time"); + _currentState = AMQPState.CHANNEL_CLOSED; + return _channelCloseOkBody; + } + catch(Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + + /** + * Channel Flow + */ + public ChannelFlowOkBody close(ChannelFlowBody channelFlowBody) throws AMQPException + { + _lock.lock(); + try { + _channelFlowOkBody = null; + if(channelFlowBody.active) + { + checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND,_currentState,AMQPState.CHANNEL_OPENED); + } + else + { + checkIfValidStateTransition(AMQPState.CHANNEL_OPENED,_currentState,AMQPState.CHANNEL_SUSPEND); + } + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelFlowBody,QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + _channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS); + AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time"); + handleChannelFlowState(_channelFlowOkBody.active); + return _channelFlowOkBody; + } + catch(Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + + /** + * Close the channel + */ + public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException + { + _lock.lock(); + try { + _channelOkBody = null; + checkIfValidStateTransition(_validResumeStates,_currentState,AMQPState.CHANNEL_OPENED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelResumeBody,QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + _channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS); + AMQPValidator.throwExceptionOnNull(_channelOkBody, "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time"); + _currentState = AMQPState.CHANNEL_OPENED; + return _channelOkBody; + } + catch(Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + + /**------------------------------------------- + * AMQPMethodListener methods + *-------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + if (evt.getMethod() instanceof ChannelOpenOkBody) + { + _channelOpenOkBody = (ChannelOpenOkBody)evt.getMethod(); + _channelNotOpend.signal(); + return true; + } + else if (evt.getMethod() instanceof ChannelCloseOkBody) + { + _channelCloseOkBody = (ChannelCloseOkBody)evt.getMethod(); + _channelNotClosed.signal(); + return true; + } + else if (evt.getMethod() instanceof ChannelCloseBody) + { + handleChannelClose((ChannelCloseBody)evt.getMethod()); + return true; + } + else if (evt.getMethod() instanceof ChannelFlowOkBody) + { + _channelFlowOkBody = (ChannelFlowOkBody)evt.getMethod(); + _channelFlowNotResponded.signal(); + return true; + } + else if (evt.getMethod() instanceof ChannelFlowBody) + { + handleChannelFlow((ChannelFlowBody)evt.getMethod()); + return true; + } + else if (evt.getMethod() instanceof ChannelOkBody) + { + _channelOkBody = (ChannelOkBody)evt.getMethod(); + //In this case the only method expecting channel-ok is channel-resume + // haven't implemented ping and pong. + _channelNotResumed.signal(); + return true; + } + else + { + return false; + } + } + + private void handleChannelClose(ChannelCloseBody channelCloseBody) + { + try + { + _lock.lock(); + _currentState = AMQPState.CHANNEL_CLOSED; + } + finally + { + _lock.unlock(); + } + } + + private void handleChannelFlow(ChannelFlowBody channelFlowBody) + { + _lock.lock(); + try + { + handleChannelFlowState(channelFlowBody.active); + } + finally + { + _lock.unlock(); + } + } + + private void handleChannelFlowState(boolean flow) + { + _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java new file mode 100644 index 0000000000..9c9e913cd3 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java @@ -0,0 +1,348 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.framing.ConnectionOpenBody; +import org.apache.qpid.framing.ConnectionOpenOkBody; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.framing.ConnectionSecureOkBody; +import org.apache.qpid.framing.ConnectionStartBody; +import org.apache.qpid.framing.ConnectionStartOkBody; +import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.nclient.amqp.state.AMQPState; +import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.model.AMQPMethodEvent; +import org.apache.qpid.nclient.model.AMQPMethodListener; +import org.apache.qpid.nclient.transport.TransportConnection; +import org.apache.qpid.nclient.util.AMQPValidator; + +/** + * This maps directly to the Connection class defined in the AMQP protocol This class is a finite state machine and is + * thread safe by design A particular method (state changing) can only be invoked once and only in sequence or else an + * IllegalStateTransitionException will be thrown Also only one thread can enter those methods at a given time. + */ +public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListener +{ + private static final Logger _logger = Logger.getLogger(AMQPConnection.class); + + private Phase _phase; + + private TransportConnection _connection; + + private long _correlationId; + + private AMQPState _currentState; + + private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CONNECTION_NOT_STARTED, + AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, + AMQPState.CONNECTION_OPEN, }; + + // The wait period until a server sends a respond + private long _serverTimeOut = 1000; + + private final Lock _lock = new ReentrantLock(); + + private final Condition _connectionNotStarted = _lock.newCondition(); + + private final Condition _connectionNotSecure = _lock.newCondition(); + + private final Condition _connectionNotTuned = _lock.newCondition(); + + private final Condition _connectionNotOpened = _lock.newCondition(); + + private final Condition _connectionNotClosed = _lock.newCondition(); + + private ConnectionStartBody _connectionStartBody; + + private ConnectionSecureBody _connectionSecureBody; + + private ConnectionTuneBody _connectionTuneBody; + + private ConnectionOpenOkBody _connectionOpenOkBody; + + private ConnectionCloseOkBody _connectionCloseOkBody; + + public AMQPConnection(TransportConnection connection) + { + _connection = connection; + _currentState = AMQPState.CONNECTION_UNDEFINED; + _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); + } + + /** + * ------------------------------------------- API Methods -------------------------------------------- + */ + + /** + * Opens the TCP connection and let the formalities begin. + */ + public ConnectionStartBody openTCPConnection() throws AMQPException + { + _lock.lock(); + // open the TCP connection + try + { + _connectionStartBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED); + _phase = _connection.connect(); + + // waiting for ConnectionStartBody or error in connection + _connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS); + AMQPValidator.throwExceptionOnNull(_connectionStartBody, + "The broker didn't send the ConnectionStartBody in time"); + _currentState = AMQPState.CONNECTION_NOT_STARTED; + return _connectionStartBody; + } + catch (Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + + public ConnectionSecureBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException + { + _lock.lock(); + try + { + _connectionSecureBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, + AMQPState.CONNECTION_NOT_SECURE); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId); + _phase.messageSent(msg); + _connectionNotSecure.await(_serverTimeOut, TimeUnit.MILLISECONDS); + AMQPValidator.throwExceptionOnNull(_connectionSecureBody, + "The broker didn't send the ConnectionSecureBody in time"); + _currentState = AMQPState.CONNECTION_NOT_SECURE; + return _connectionSecureBody; + } + catch (Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + + /** + * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could + * issue a new challenge + */ + public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException + { + _lock.lock(); + try + { + _connectionTuneBody = null; + _connectionSecureBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED); + _connectionSecureBody = null; // The server could send a fresh challenge + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, + _correlationId); + _phase.messageSent(msg); + _connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS); + if (_connectionTuneBody != null) + { + _currentState = AMQPState.CONNECTION_NOT_TUNED; + return _connectionTuneBody; + } + else if (_connectionSecureBody != null) + { // oops the server sent another challenge + _currentState = AMQPState.CONNECTION_NOT_SECURE; + return _connectionSecureBody; + } + else + { + throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); + } + } + catch (Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + + public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException + { + _lock.lock(); + try + { + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED); + _connectionSecureBody = null; + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId); + _phase.messageSent(msg); + _currentState = AMQPState.CONNECTION_NOT_OPENED; + } + catch (Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + + public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException + { + _lock.lock(); + try + { + _connectionOpenOkBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, + QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + _connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS); + AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, + "The broker didn't send the ConnectionOpenOkBody in time"); + _currentState = AMQPState.CONNECTION_OPEN; + return _connectionOpenOkBody; + } + catch (Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + + public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException + { + _lock.lock(); + try + { + _connectionCloseOkBody = null; + checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, + QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); + AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, + "The broker didn't send the ConnectionCloseOkBody in time"); + _currentState = AMQPState.CONNECTION_CLOSED; + return _connectionCloseOkBody; + } + catch (Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + + /** + * ------------------------------------------- AMQMethodListener methods + * -------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + _correlationId = evt.getCorrelationId(); + + if (evt.getMethod() instanceof ConnectionStartBody) + { + _connectionStartBody = (ConnectionStartBody) evt.getMethod(); + _connectionNotStarted.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionSecureBody) + { + _connectionSecureBody = (ConnectionSecureBody) evt.getMethod(); + _connectionNotSecure.signal(); + _connectionNotTuned.signal(); // in case the server has sent another chanllenge + return true; + } + else if (evt.getMethod() instanceof ConnectionTuneBody) + { + _connectionTuneBody = (ConnectionTuneBody) evt.getMethod(); + _connectionNotTuned.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionOpenOkBody) + { + _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod(); + _connectionNotOpened.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionCloseOkBody) + { + _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod(); + _connectionNotClosed.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionCloseBody) + { + handleClose(); + return true; + } + else + { + return false; + } + } + + public void handleClose() throws AMQPException + { + _lock.lock(); + try + { + checkIfValidStateTransition(AMQPState.CONNECTION_OPEN, _currentState, AMQPState.CONNECTION_CLOSING); + _currentState = AMQPState.CONNECTION_CLOSING; + // do the required cleanup and send a ConnectionCloseOkBody + } + catch (Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java new file mode 100644 index 0000000000..5315f7f318 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.framing.ExchangeDeleteBody; +import org.apache.qpid.framing.ExchangeDeleteOkBody; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.model.AMQPMethodEvent; +import org.apache.qpid.nclient.model.AMQPMethodListener; + +/** + * + * This class represents the Exchange class defined in AMQP. + * Each method takes an @see AMQPCallBack object if it wants to know + * the response from the broker to particular method. + * Clients can handle the reponse asynchronously or block for a response + * using AMQPCallBack.isComplete() periodically using a loop. + */ +public class AMQPExchange extends AMQPCallBackSupport implements AMQPMethodListener +{ + private Phase _phase; + + public AMQPExchange(int channelId,Phase phase) + { + super(channelId); + _phase = phase; + } + + /** + * ----------------------------------------------- + * API Methods + * ----------------------------------------------- + */ + public void declare(ExchangeDeclareBody exchangeDeclareBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(exchangeDeclareBody.nowait,exchangeDeclareBody,cb); + _phase.messageSent(msg); + } + + public void delete(ExchangeDeleteBody exchangeDeleteBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(exchangeDeleteBody.nowait,exchangeDeleteBody,cb); + _phase.messageSent(msg); + } + + + /**------------------------------------------- + * AMQPMethodListener methods + *-------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + long localCorrelationId = evt.getLocalCorrelationId(); + AMQMethodBody methodBody = evt.getMethod(); + if ( methodBody instanceof ExchangeDeclareOkBody || methodBody instanceof ExchangeDeleteOkBody) + { + invokeCallBack(localCorrelationId,methodBody); + return true; + } + else + { + return false; + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java new file mode 100644 index 0000000000..e3ad9d6306 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java @@ -0,0 +1,232 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.framing.MessageCheckpointBody; +import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.framing.MessageEmptyBody; +import org.apache.qpid.framing.MessageGetBody; +import org.apache.qpid.framing.MessageOffsetBody; +import org.apache.qpid.framing.MessageOkBody; +import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.framing.MessageQosBody; +import org.apache.qpid.framing.MessageRecoverBody; +import org.apache.qpid.framing.MessageRejectBody; +import org.apache.qpid.framing.MessageResumeBody; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.model.AMQPMethodEvent; +import org.apache.qpid.nclient.model.AMQPMethodListener; + +/** + * This class represents the AMQP Message class. + * You need an instance of this class per channel. + * A @see AMQPMessageCallBack class is taken as an argument in the constructor. + * A client can use this class to issue Message class methods on the broker. + * When the broker issues Message class methods on the client, the client is notified + * via the AMQPMessageCallBack interface. + * + * A JMS Message producer implementation can wrap an instance if this and map + * JMS method calls to the appropriate AMQP methods. + * + * AMQPMessageCallBack can be implemented by the JMS MessageConsumer implementation. + * + */ +public class AMQPMessage extends AMQPCallBackSupport implements AMQPMethodListener +{ + private Phase _phase; + private AMQPMessageCallBack _messageCb; + + public AMQPMessage(int channelId,Phase phase,AMQPMessageCallBack messageCb) + { + super(channelId); + _phase = phase; + _messageCb = messageCb; + } + + /** + * ----------------------------------------------- + * API Methods + * ----------------------------------------------- + */ + + public void transfer(MessageTransferBody messageTransferBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageTransferBody,cb); + _phase.messageSent(msg); + } + + public void consume(MessageCancelBody messageCancelBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageCancelBody,cb); + _phase.messageSent(msg); + } + + public void cancel(MessageCancelBody messageCancelBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageCancelBody,cb); + _phase.messageSent(msg); + } + + public void get(MessageGetBody messageGetBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageGetBody,cb); + _phase.messageSent(msg); + } + + public void recover(MessageRecoverBody messageRecoverBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageRecoverBody,cb); + _phase.messageSent(msg); + } + + public void open(MessageOpenBody messageOpenBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageOpenBody,cb); + _phase.messageSent(msg); + } + + public void close(MessageCloseBody messageCloseBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageCloseBody,cb); + _phase.messageSent(msg); + } + + public void append(MessageAppendBody messageAppendBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageAppendBody,cb); + _phase.messageSent(msg); + } + + public void checkpoint(MessageCheckpointBody messageCheckpointBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageCheckpointBody,cb); + _phase.messageSent(msg); + } + + public void resume(MessageResumeBody messageResumeBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageResumeBody,cb); + _phase.messageSent(msg); + } + + public void qos(MessageQosBody messageQosBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageQosBody,cb); + _phase.messageSent(msg); + } + + /** + * The correlationId from the request. + * For example if a message.transfer is sent with correlationId "ABCD" + * then u need to pass that in. This correlation id is used by the execution layer + * to handle the correlation of method requests and responses + */ + public void ok(MessageOkBody messageOkBody,long correlationId) throws AMQPException + { + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOkBody,correlationId); + _phase.messageSent(msg); + } + + /** + * The correlationId from the request. + * For example if a message.transfer is sent with correlationId "ABCD" + * then u need to pass that in. This correlation id is used by the execution layer + * to handle the correlation of method requests and responses + */ + public void reject(MessageRejectBody messageRejectBody,long correlationId) throws AMQPException + { + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageRejectBody,correlationId); + _phase.messageSent(msg); + } + + /** + * The correlationId from the request. + * For example if a message.resume is sent with correlationId "ABCD" + * then u need to pass that in. This correlation id is used by the execution layer + * to handle the correlation of method requests and responses + */ + public void offset(MessageOffsetBody messageOffsetBody,long correlationId) throws AMQPException + { + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOffsetBody,correlationId); + _phase.messageSent(msg); + } + + /**------------------------------------------- + * AMQPMethodListener methods + *-------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + long localCorrelationId = evt.getLocalCorrelationId(); + AMQMethodBody methodBody = evt.getMethod(); + if ( methodBody instanceof MessageOkBody || + methodBody instanceof MessageRejectBody || + methodBody instanceof MessageEmptyBody) + { + invokeCallBack(localCorrelationId,methodBody); + return true; + } + else if (methodBody instanceof MessageTransferBody) + { + _messageCb.transfer((MessageTransferBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageAppendBody) + { + _messageCb.append((MessageAppendBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageOpenBody) + { + _messageCb.open((MessageOpenBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageCloseBody) + { + _messageCb.close((MessageCloseBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageCheckpointBody) + { + _messageCb.checkpoint((MessageCheckpointBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageRecoverBody) + { + _messageCb.recover((MessageRecoverBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageResumeBody) + { + _messageCb.resume((MessageResumeBody)methodBody, evt.getCorrelationId()); + return true; + } + else + { + return false; + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java new file mode 100644 index 0000000000..183ed9dba8 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.framing.MessageCheckpointBody; +import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.framing.MessageGetBody; +import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.framing.MessageRecoverBody; +import org.apache.qpid.framing.MessageResumeBody; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.nclient.core.AMQPException; + +/** + * This class also represents the AMQP Message class. + * You need an instance per channel. + * This is passed in as an argument in the constructor of an AMQPMessage instance. + * A client who implements this interface is notified When the broker issues + * Message class methods on the client. + * + * A Client should use the AMQPMessage class when it wants to issue Message class + * methods on the broker. + * + * A JMS MessageConsumer implementation can implement this interface and map + * AMQP Method notifications to the appropriate JMS methods. + * + * Simillarly a JMS MessageProducer implementation can wrap an AMQPMessage instance. + * + */ + +public interface AMQPMessageCallBack +{ + /** + * ----------------------------------------------------------------------- + * This provides Notifications for broker initiated Message class methods. + * All methods have a correlationId that u need to pass into + * the corresponding Message methods when responding to the broker. + * + * For example the correlationID passed in from Message.trasnfer + * should be passed back when u call Message.ok in AMQPMessage + * ----------------------------------------------------------------------- + */ + + + public void transfer(MessageTransferBody messageTransferBody,long correlationId) throws AMQPException; + + public void recover(MessageRecoverBody messageRecoverBody,long correlationId) throws AMQPException; + + public void open(MessageOpenBody messageOpenBody,long correlationId) throws AMQPException ; + + public void close(MessageCloseBody messageCloseBody,long correlationId) throws AMQPException; + + public void append(MessageAppendBody messageAppendBody,long correlationId) throws AMQPException; + + public void checkpoint(MessageCheckpointBody messageCheckpointBody,long correlationId) throws AMQPException; + + public void resume(MessageResumeBody messageResumeBody,long correlationId) throws AMQPException; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java new file mode 100644 index 0000000000..a5fe6de298 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java @@ -0,0 +1,117 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.QueueBindBody; +import org.apache.qpid.framing.QueueBindOkBody; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeclareOkBody; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.framing.QueuePurgeBody; +import org.apache.qpid.framing.QueuePurgeOkBody; +import org.apache.qpid.framing.QueueUnbindBody; +import org.apache.qpid.framing.QueueUnbindOkBody; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.model.AMQPMethodEvent; +import org.apache.qpid.nclient.model.AMQPMethodListener; + +/** + * + * This class represents the Queue class defined in AMQP. + * Each method takes an @see AMQPCallBack object if it wants to know + * the response from the broker to a particular method. + * Clients can handle the reponse asynchronously or block for a response + * using AMQPCallBack.isComplete() periodically using a loop. + */ +public class AMQPQueue extends AMQPCallBackSupport implements AMQPMethodListener +{ + private Phase _phase; + + public AMQPQueue(int channelId,Phase phase) + { + super(channelId); + _phase = phase; + } + + /** + * ----------------------------------------------- + * API Methods + * ----------------------------------------------- + */ + public void declare(QueueDeclareBody queueDeclareBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(queueDeclareBody.nowait,queueDeclareBody,cb); + _phase.messageSent(msg); + } + + public void bind(QueueBindBody queueBindBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(queueBindBody.nowait,queueBindBody,cb); + _phase.messageSent(msg); + } + + // Queue.unbind doesn't have nowait + public void unbind(QueueUnbindBody queueUnbindBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(queueUnbindBody,cb); + _phase.messageSent(msg); + } + + public void purge(QueuePurgeBody queuePurgeBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(queuePurgeBody.nowait,queuePurgeBody,cb); + _phase.messageSent(msg); + } + + public void delete(QueueDeleteBody queueDeleteBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(queueDeleteBody.nowait,queueDeleteBody,cb); + _phase.messageSent(msg); + } + + + /**------------------------------------------- + * AMQPMethodListener methods + *-------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + long localCorrelationId = evt.getLocalCorrelationId(); + AMQMethodBody methodBody = evt.getMethod(); + if ( methodBody instanceof QueueDeclareOkBody || + methodBody instanceof QueueBindOkBody || + methodBody instanceof QueueUnbindOkBody || + methodBody instanceof QueuePurgeOkBody || + methodBody instanceof QueueDeleteOkBody + ) + { + invokeCallBack(localCorrelationId,methodBody); + return true; + } + else + { + return false; + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/EventManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/EventManager.java new file mode 100644 index 0000000000..38421cfca3 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/EventManager.java @@ -0,0 +1,129 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.model.AMQPMethodEvent; +import org.apache.qpid.nclient.model.AMQPMethodListener; + +/** + * This class registeres with the ModelPhase as a AMQMethodListener, + * to receive method events and then it distributes methods to other listerners + * using a filtering criteria. The criteria is channel id and method body class. + * The method listeners are added and removed dynamically + * + * <p/> + */ +public class EventManager implements AMQPMethodListener +{ + private static final Logger _logger = Logger.getLogger(EventManager.class); + + private Map <Integer,Map> _channelMap = new ConcurrentHashMap<Integer,Map>(); + + /** + * ------------------------------------------------ + * methods introduced by AMQPMethodEventManager + * ------------------------------------------------ + */ + public void addMethodEventListener(int channelId,Class clazz,AMQPMethodListener l) + { + Map<Class,List> _methodListenerMap; + if (_channelMap.containsKey(channelId)) + { + _methodListenerMap = _channelMap.get(channelId); + + } + else + { + _methodListenerMap = new ConcurrentHashMap<Class,List>(); + _channelMap.put(channelId, _methodListenerMap); + } + + List<AMQPMethodListener> _listeners; + if (_methodListenerMap.containsKey(clazz)) + { + _listeners = _methodListenerMap.get(clazz); + + } + else + { + _listeners = new ArrayList<AMQPMethodListener>(); + _methodListenerMap.put(clazz, _listeners); + } + + _listeners.add(l); + + } + + public void removeMethodEventListener(int channelId,Class clazz,AMQPMethodListener l) + { + if (_channelMap.containsKey(channelId)) + { + Map<Class,List> _methodListenerMap = _channelMap.get(channelId); + + if (_methodListenerMap.containsKey(clazz)) + { + List<AMQPMethodListener> _listeners = _methodListenerMap.get(clazz); + _listeners.remove(l); + } + + } + } + + + /** + * ------------------------------------------------ + * methods introduced by AMQMethodListener + * ------------------------------------------------ + */ + /* (non-Javadoc) + * @see org.apache.qpid.nclient.model.AMQStateManager#methodReceived(org.apache.qpid.protocol.AMQMethodEvent) + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + if (_channelMap.containsKey(evt.getChannelId())) + { + Map<Class,List> _methodListenerMap = _channelMap.get(evt.getChannelId()); + + if (_methodListenerMap.containsKey(evt.getMethod().getClass())) + { + + List<AMQPMethodListener> _listeners = _methodListenerMap.get(evt.getMethod().getClass()); + for (AMQPMethodListener l:_listeners) + { + l.methodReceived(evt); + } + + return (_listeners.size()>0); + } + + } + + return false; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java new file mode 100644 index 0000000000..8c4cb56971 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java @@ -0,0 +1,54 @@ +package org.apache.qpid.nclient.amqp.sample; + +import java.io.UnsupportedEncodingException; +import java.util.HashSet; +import java.util.StringTokenizer; + +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.security.AMQPCallbackHandler; +import org.apache.qpid.nclient.security.CallbackHandlerRegistry; +import org.apache.qpid.nclient.transport.ConnectionURL; + +public class SecurityHelper +{ + public static String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException + { + final String mechanisms = new String(availableMechanisms, "utf8"); + StringTokenizer tokenizer = new StringTokenizer(mechanisms, " "); + HashSet mechanismSet = new HashSet(); + while (tokenizer.hasMoreTokens()) + { + mechanismSet.add(tokenizer.nextToken()); + } + + String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms(); + StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " "); + while (prefTokenizer.hasMoreTokens()) + { + String mech = prefTokenizer.nextToken(); + if (mechanismSet.contains(mech)) + { + return mech; + } + } + return null; + } + + public static AMQPCallbackHandler createCallbackHandler(String mechanism, ConnectionURL url) + throws AMQPException + { + Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism); + try + { + Object instance = mechanismClass.newInstance(); + AMQPCallbackHandler cbh = (AMQPCallbackHandler) instance; + cbh.initialise(url); + return cbh; + } + catch (Exception e) + { + throw new AMQPException("Unable to create callback handler: " + e, e); + } + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java new file mode 100644 index 0000000000..69d2564112 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java @@ -0,0 +1,93 @@ +package org.apache.qpid.nclient.amqp.sample; + +import java.util.StringTokenizer; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; + +import org.apache.qpid.common.ClientProperties; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.framing.ConnectionSecureOkBody; +import org.apache.qpid.framing.ConnectionStartBody; +import org.apache.qpid.framing.ConnectionStartOkBody; +import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; +import org.apache.qpid.nclient.amqp.AMQPConnection; +import org.apache.qpid.nclient.transport.AMQPConnectionURL; +import org.apache.qpid.nclient.transport.ConnectionURL; +import org.apache.qpid.nclient.transport.TransportConnection; +import org.apache.qpid.nclient.transport.TransportConnectionFactory; +import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType; + +/** + * This class illustrates the usage of the API + * Notes this is just a simple demo. + * + * I have used Helper classes to keep the code cleaner. + */ +public class TestClient +{ + private byte major; + private byte minor; + private ConnectionURL _url; + + public AMQPConnection openConnection() throws Exception + { + _url = new AMQPConnectionURL(""); + TransportConnection conn = TransportConnectionFactory.createTransportConnection(_url, ConnectionType.VM); + return new AMQPConnection(conn); + } + + public void handleProtocolNegotiation(AMQPConnection con) throws Exception + { + // ConnectionStartBody + ConnectionStartBody connectionStartBody = con.openTCPConnection(); + major = connectionStartBody.getMajor(); + minor = connectionStartBody.getMajor(); + + FieldTable clientProperties = FieldTableFactory.newFieldTable(); + clientProperties.put(new AMQShortString(ClientProperties.instance.toString()),"Test"); // setting only the client id + + final String locales = new String(connectionStartBody.getLocales(), "utf8"); + final StringTokenizer tokenizer = new StringTokenizer(locales, " "); + + final String mechanism = SecurityHelper.chooseMechanism(connectionStartBody.getMechanisms()); + + SaslClient sc = Sasl.createSaslClient(new String[]{mechanism}, + null, "AMQP", "localhost", + null, SecurityHelper.createCallbackHandler(mechanism,_url)); + + ConnectionStartOkBody connectionStartOkBody = + ConnectionStartOkBody.createMethodBody(major, minor, clientProperties, + new AMQShortString(tokenizer.nextToken()), + new AMQShortString(mechanism), + (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null)); + // ConnectionSecureBody + ConnectionSecureBody connectionSecureBody = con.startOk(connectionStartOkBody); + + ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody( + major,minor,sc.evaluateChallenge(connectionSecureBody.getChallenge())); + + // Assuming the server is not going to send another challenge + ConnectionTuneBody connectionTuneBody = (ConnectionTuneBody)con.secureOk(connectionSecureOkBody); + + // Using broker supplied values + ConnectionTuneOkBody connectionTuneOkBody = + ConnectionTuneOkBody.createMethodBody(major,minor, + connectionTuneBody.getChannelMax(), + connectionTuneBody.getFrameMax(), + connectionTuneBody.getHeartbeat()); + con.tuneOk(connectionTuneOkBody); + } + + public static void main(String[] args) + { + TestClient test = new TestClient(); + AMQPConnection con = test.openConnection(); + + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java new file mode 100644 index 0000000000..3555704c4f --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.state; + +/** + * States used in the AMQ protocol. Used by the finite state machine to determine + * valid responses. + */ +public class AMQPState +{ + private final int _id; + + private final String _name; + + private AMQPState(int id, String name) + { + _id = id; + _name = name; + } + + public String toString() + { + return "AMQState: id = " + _id + " name: " + _name; + } + + // Connection state + public static final AMQPState CONNECTION_UNDEFINED = new AMQPState(0, "CONNECTION_UNDEFINED"); + public static final AMQPState CONNECTION_NOT_STARTED = new AMQPState(1, "CONNECTION_NOT_STARTED"); + public static final AMQPState CONNECTION_NOT_SECURE = new AMQPState(2, "CONNECTION_NOT_SECURE"); + public static final AMQPState CONNECTION_NOT_TUNED = new AMQPState(2, "CONNECTION_NOT_TUNED"); + public static final AMQPState CONNECTION_NOT_OPENED = new AMQPState(3, "CONNECTION_NOT_OPENED"); + public static final AMQPState CONNECTION_OPEN = new AMQPState(4, "CONNECTION_OPEN"); + public static final AMQPState CONNECTION_CLOSING = new AMQPState(5, "CONNECTION_CLOSING"); + public static final AMQPState CONNECTION_CLOSED = new AMQPState(6, "CONNECTION_CLOSED"); + + // Channel state + public static final AMQPState CHANNEL_NOT_OPENED = new AMQPState(10, "CHANNEL_NOT_OPENED"); + public static final AMQPState CHANNEL_OPENED = new AMQPState(11, "CHANNEL_OPENED"); + public static final AMQPState CHANNEL_CLOSED = new AMQPState(11, "CHANNEL_CLOSED"); + public static final AMQPState CHANNEL_SUSPEND = new AMQPState(11, "CHANNEL_SUSPEND"); +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java new file mode 100644 index 0000000000..67f854caf9 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java @@ -0,0 +1,8 @@ +package org.apache.qpid.nclient.amqp.state; + +import org.apache.qpid.nclient.core.AMQPException; + +public interface AMQPStateListener +{ + public void stateChanged(AMQPState oldState, AMQPState newState) throws AMQPException; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java new file mode 100644 index 0000000000..c1fde7181d --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java @@ -0,0 +1,26 @@ +package org.apache.qpid.nclient.amqp.state; + +import org.apache.qpid.nclient.core.AMQPException; + +public class AMQPStateMachine +{ + protected void checkIfValidStateTransition(AMQPState correctState,AMQPState currentState,AMQPState requiredState) throws IllegalStateTransitionException + { + if (currentState != correctState) + { + throw new IllegalStateTransitionException(currentState,requiredState); + } + } + + protected void checkIfValidStateTransition(AMQPState[] correctStates,AMQPState currentState,AMQPState requiredState) throws IllegalStateTransitionException + { + for(AMQPState correctState :correctStates) + { + if (currentState == correctState) + { + return; + } + } + throw new IllegalStateTransitionException(currentState,requiredState); + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java new file mode 100644 index 0000000000..2956a19e66 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java @@ -0,0 +1,11 @@ +package org.apache.qpid.nclient.amqp.state; + +import org.apache.qpid.AMQException; + +public interface AMQPStateManager +{ + + public void addListener(AMQPStateListener l)throws AMQException; + + public void removeListener(AMQPStateListener l)throws AMQException; +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java new file mode 100644 index 0000000000..cbae4aafee --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.state; + +/** + * The Type of States used in the AMQ protocol. + * This allows to partition listeners by the type of states they want + * to listen rather than all. + * For example an Object might only be interested in Channel state + */ +public class AMQPStateType +{ + private final int _typeId; + + private final String _typeName; + + private AMQPStateType(int id, String name) + { + _typeId = id; + _typeName = name; + } + + public String toString() + { + return "AMQState: id = " + _typeId + " name: " + _typeName; + } + + // Connection state + public static final AMQPStateType CONNECTION_STATE = new AMQPStateType(0, "CONNECTION_STATE"); + public static final AMQPStateType CHANNEL_STATE = new AMQPStateType(1, "CHANNEL_STATE"); + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java new file mode 100644 index 0000000000..fdc24d1d2f --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.state; + +import org.apache.qpid.nclient.core.AMQPException; + +public class IllegalStateTransitionException extends AMQPException +{ + private AMQPState _currentState; + private AMQPState _desiredState; + + public IllegalStateTransitionException(AMQPState currentState, AMQPState desiredState) + { + super("No valid state transition defined from state " + currentState + + " to state " + desiredState); + _currentState = currentState; + _desiredState = desiredState; + } + + public AMQPState getCurrentState() + { + return _currentState; + } + + public AMQPState getDesiredState() + { + return _desiredState; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java b/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java new file mode 100644 index 0000000000..1c3bb788a0 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java @@ -0,0 +1,84 @@ +package org.apache.qpid.nclient.config; + +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Collection; +import java.util.List; + +import org.apache.commons.configuration.CombinedConfiguration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.SystemConfiguration; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.log4j.Logger; +import org.apache.qpid.nclient.core.QpidConstants; + +/** + * Loads a properties file from classpath. + * These values can be overwritten using system properties + */ +public class ClientConfiguration extends CombinedConfiguration { + + private static final Logger _logger = Logger.getLogger(ClientConfiguration.class); + private static ClientConfiguration _instance = new ClientConfiguration(); + + ClientConfiguration() + { + super(); + addConfiguration(new SystemConfiguration()); + try + { + XMLConfiguration config = new XMLConfiguration(); + config.load(getInputStream()); + addConfiguration(config); + } + catch (ConfigurationException e) + { + _logger.warn("Client Properties missing, using defaults",e); + } + } + + public static ClientConfiguration get() + { + return _instance; + } + + private InputStream getInputStream() + { + if (System.getProperty(QpidConstants.CONFIG_FILE_PATH) != null) + { + try + { + return new FileInputStream((String)System.getProperty(QpidConstants.CONFIG_FILE_PATH)); + } + catch(Exception e) + { + return this.getClass().getResourceAsStream("client.xml"); + } + } + else + { + return this.getClass().getResourceAsStream("client.xml"); + } + + } + + public static void main(String[] args) + { + System.out.println(ClientConfiguration.get().getString(QpidConstants.USE_SHARED_READ_WRITE_POOL)); + + //System.out.println(ClientConfiguration.get().getString("methodListeners.methodListener(1).[@class]")); + int count = ClientConfiguration.get().getMaxIndex(QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER); + System.out.println(count); + + for(int i=0 ;i<count;i++) + { + String methodListener = QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER + "(" + i + ")"; + System.out.println("\n\n"+ClientConfiguration.get().getString(methodListener + QpidConstants.CLASS)); + List<String> list = ClientConfiguration.get().getList(methodListener + "." + QpidConstants.METHOD_CLASS); + for(String s:list) + { + System.out.println(s); + } + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml b/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml new file mode 100644 index 0000000000..587271acd1 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml @@ -0,0 +1,86 @@ +<?xml version="1.0" encoding="ISO-8859-1" ?> +<qpidClientConfig> + +<security> + <saslClientFactoryTypes> + <saslClientFactory type="AMQPLAIN">org.apache.qpid.client.security.amqplain.AmqPlainSaslClientFactory</saslClientFactory> + </saslClientFactoryTypes> + <securityMechanisms> + <securityMechanismHandler type="PLAIN">org.apache.qpid.client.security.UsernamePasswordCallbackHandler</securityMechanismHandler> + <securityMechanismHandler type="CRAM_MD5">org.apache.qpid.client.security.UsernamePasswordCallbackHandler</securityMechanismHandler> + </securityMechanisms> +</security> + +<!-- Transport Layer properties --> +<useSharedReadWritePool>true</useSharedReadWritePool> +<enableDirectBuffers>true</enableDirectBuffers> +<enablePooledAllocator>false</enablePooledAllocator> +<tcpNoDelay>true</tcpNoDelay> +<sendBufferSizeInKb>32</sendBufferSizeInKb> +<reciveBufferSizeInKb>32</reciveBufferSizeInKb> +<qpidVMBrokerClass>org.apache.qpid.server.protocol.AMQPFastProtocolHandler</qpidVMBrokerClass> + +<!-- Execution Layer properties --> +<maxAccumilatedResponses>20</maxAccumilatedResponses> + +<!-- Model Phase properties --> +<serverTimeoutInMilliSeconds>1000</serverTimeoutInMilliSeconds> +<maxAccumilatedResponses>20</maxAccumilatedResponses> +<stateManager></stateManager> +<stateListerners> + <stateType class="org.apache.qpid.nclient.model.state.AMQPStateType.CONNECTION_STATE"> + <stateListerner></stateListerner> + </stateType> + <stateType class="org.apache.qpid.nclient.model.state.AMQPStateType.CHANNEL_STATE"> + <stateListerner></stateListerner> + </stateType> +</stateListerners> + +<methodListeners> + <methodListener class="org.apache.qpid.nclient.amqp.AMQPConnection"> + <methodClass>org.apache.qpid.framing.ConnectionStartBody</methodClass> + <methodClass>org.apache.qpid.framing.ConnectionSecureBody</methodClass> + <methodClass>org.apache.qpid.framing.ConnectionTuneBody</methodClass> + <methodClass>org.apache.qpid.framing.ConnectionOpenOkBody</methodClass> + <methodClass>org.apache.qpid.framing.ConnectionCloseBody</methodClass> + <methodClass>org.apache.qpid.framing.ConnectionCloseOkBody</methodClass> + + <methodClass>org.apache.qpid.framing.ChannelOpenOkBody</methodClass> + <methodClass>org.apache.qpid.framing.ChannelCloseBody</methodClass> + <methodClass>org.apache.qpid.framing.ChannelCloseOkBody</methodClass> + <methodClass>org.apache.qpid.framing.ChannelFlowBody</methodClass> + <methodClass>org.apache.qpid.framing.ChannelFlowOkBody</methodClass> + <methodClass>org.apache.qpid.framing.ChannelOkBody</methodClass> + + <methodClass>org.apache.qpid.framing.ExchangeDeclareOkBody</methodClass> + <methodClass>org.apache.qpid.framing.ExchangeDeleteOkBody</methodClass> + + <methodClass>org.apache.qpid.framing.QueueDeclareOkBody</methodClass> + <methodClass>org.apache.qpid.framing.QueueBindOkBody</methodClass> + <methodClass>org.apache.qpid.framing.QueueUnbindOkBody</methodClass> + <methodClass>org.apache.qpid.framing.QueuePurgeOkBody</methodClass> + <methodClass>org.apache.qpid.framing.QueueDeleteOkBody</methodClass> + + <methodClass>org.apache.qpid.framing.MessageAppendBody</methodClass> + <methodClass>org.apache.qpid.framing.MessageCancelBody</methodClass> + <methodClass>org.apache.qpid.framing.MessageCheckpointBody</methodClass> + <methodClass>org.apache.qpid.framing.MessageCloseBody</methodClass> + <methodClass>org.apache.qpid.framing.MessageGetBody</methodClass> + <methodClass>org.apache.qpid.framing.MessageOffsetBody</methodClass> + <methodClass>org.apache.qpid.framing.MessageOkBody</methodClass> + <methodClass>org.apache.qpid.framing.MessageOpenBody</methodClass> + <methodClass>org.apache.qpid.framing.MessageQosBody</methodClass> + <methodClass>org.apache.qpid.framing.MessageRecoverBody</methodClass> + <methodClass>org.apache.qpid.framing.MessageRejectBody</methodClass> + <methodClass>org.apache.qpid.framing.MessageResumeBody</methodClass> + <methodClass>org.apache.qpid.framing.MessageTransferBody</methodClass> + </methodListener> +</methodListeners> + +<phasePipe> + <phase index="0">org.apache.qpid.nclient.transport.TransportPhase<phase> + <phase index="1">org.apache.qpid.nclient.execution.ExecutionPhase<phase> + <phase index="2">org.apache.qpid.nclient.model.ModelPhase<phase> +</phasePipe> + +</qpidClientConfig>
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java new file mode 100644 index 0000000000..a029f7d4ff --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java @@ -0,0 +1,55 @@ +package org.apache.qpid.nclient.core; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; + + +public class AMQPException extends Exception +{ + private int _errorCode; + + public AMQPException(String message) + { + super(message); + } + + public AMQPException(String msg, Throwable t) + { + super(msg, t); + } + + public AMQPException(int errorCode, String msg, Throwable t) + { + super(msg + " [error code " + errorCode + ']', t); + _errorCode = errorCode; + } + + public AMQPException(int errorCode, String msg) + { + super(msg + " [error code " + errorCode + ']'); + _errorCode = errorCode; + } + + public AMQPException(Logger logger, String msg, Throwable t) + { + this(msg, t); + logger.error(getMessage(), this); + } + + public AMQPException(Logger logger, String msg) + { + this(msg); + logger.error(getMessage(), this); + } + + public AMQPException(Logger logger, int errorCode, String msg) + { + this(errorCode, msg); + logger.error(getMessage(), this); + } + + public int getErrorCode() + { + return _errorCode; + } + } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java new file mode 100644 index 0000000000..bf6a19b920 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java @@ -0,0 +1,97 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.core; + +public abstract class AbstractPhase implements Phase { + + protected PhaseContext _ctx; + protected Phase _nextInFlowPhase; + protected Phase _nextOutFlowPhase; + + + /** + * ------------------------------------------------ + * Phase - method introduced by Phase + * ------------------------------------------------ + */ + public void init(PhaseContext ctx,Phase nextInFlowPhase, Phase nextOutFlowPhase) { + _nextInFlowPhase = nextInFlowPhase; + _nextOutFlowPhase = nextOutFlowPhase; + _ctx = ctx; + } + + /** + * The start is called from the top + * of the pipe and is propogated the + * bottom. + * + * Each phase can override this to do + * any phase specific logic related + * pipe.start() + */ + public void start()throws AMQPException + { + if(_nextOutFlowPhase != null) + { + _nextOutFlowPhase.start(); + } + } + + /** + * Each phase can override this to do + * any phase specific cleanup + */ + public void close()throws AMQPException + { + + } + + public void messageReceived(Object frame) throws AMQPException + { + if(_nextInFlowPhase != null) + { + _nextInFlowPhase.messageReceived(frame); + } + } + + public void messageSent(Object frame) throws AMQPException + { + if (_nextOutFlowPhase != null) + { + _nextOutFlowPhase.messageSent(frame); + } + } + + public PhaseContext getPhaseContext() + { + return _ctx; + } + + public Phase getNextInFlowPhase() { + return _nextInFlowPhase; + } + + public Phase getNextOutFlowPhase() { + return _nextOutFlowPhase; + } + + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java new file mode 100644 index 0000000000..a3455cdacd --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java @@ -0,0 +1,20 @@ +package org.apache.qpid.nclient.core; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class DefaultPhaseContext implements PhaseContext +{ + public Map<String,Object> _props = new ConcurrentHashMap<String,Object>(); + + public Object getProperty(String name) + { + return _props.get(name); + } + + public void setProperty(String name, Object value) + { + _props.put(name, value); + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java new file mode 100644 index 0000000000..7aa10b77ff --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java @@ -0,0 +1,38 @@ +package org.apache.qpid.nclient.core; + + +public interface Phase +{ + + /** + * This method is used to initialize a phase + * + * @param ctx + * @param nextInFlowPhase + * @param nextOutFlowPhase + */ + public void init(PhaseContext ctx,Phase nextInFlowPhase, Phase nextOutFlowPhase); + + /** + * + * Implement logic related to physical opening + * of the pipe + */ + public void start()throws AMQPException; + + /** + * Implement cleanup in this method. + * This indicates the pipe is closing + */ + public void close()throws AMQPException; + + public void messageReceived(Object msg) throws AMQPException; + + public void messageSent(Object msg) throws AMQPException; + + public PhaseContext getPhaseContext(); + + public Phase getNextOutFlowPhase(); + + public Phase getNextInFlowPhase(); +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java new file mode 100644 index 0000000000..d5942fd785 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.core; + +/** + * This can be thought of as a session context associated + * with the pipe. This is transient and is scoped by the + * duration of the physical connection. + * + */ +public interface PhaseContext { + + public Object getProperty(String name); + + public void setProperty(String name, Object value); + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java new file mode 100644 index 0000000000..21602fec8e --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java @@ -0,0 +1,60 @@ +package org.apache.qpid.nclient.core; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.nclient.config.ClientConfiguration; + +public class PhaseFactory +{ + /** + * This method will create the pipe and return a reference + * to the top of the pipeline. + * + * The application can then use this (top most) phase and all + * calls will propogated down the pipe. + * + * Simillar calls orginating at the bottom of the pipeline + * will be propogated to the top. + * + * @param ctx + * @return + * @throws AMQPException + */ + public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException + { + Map<Integer,Phase> phaseMap = new HashMap<Integer,Phase>(); + List<String> list = ClientConfiguration.get().getList(QpidConstants.PHASE_PIPE + "." + QpidConstants.PHASE); + for(String s:list) + { + try + { + Phase temp = (Phase)Class.forName(ClientConfiguration.get().getString(s)).newInstance(); + phaseMap.put(ClientConfiguration.get().getInt(s + "." + QpidConstants.INDEX),temp) ; + } + catch(Exception e) + { + throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s),e); + } + } + + Phase current = null; + Phase prev = null; + Phase next = null; + //Lets build the phase pipe. + for (int i=0; i<phaseMap.size();i++) + { + current = phaseMap.get(i); + if (1+1 < phaseMap.size()) + { + next = phaseMap.get(i+1); + } + current.init(ctx, next, prev); + prev = current; + next = null; + } + + return current; + } +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java new file mode 100644 index 0000000000..1a949b7270 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java @@ -0,0 +1,66 @@ +package org.apache.qpid.nclient.core; + +public interface QpidConstants { + + // Common properties + public static long EMPTY_CORRELATION_ID = -1; + public static int CHANNEL_ZERO = 0; + public static String CONFIG_FILE_PATH = "ConfigFilePath"; + + // Phase Context properties + public final static String AMQP_BROKER_DETAILS = "AMQP_BROKER_DETAILS"; + public final static String MINA_IO_CONNECTOR = "MINA_IO_CONNECTOR"; + //public final static String AMQP_MAJOR_VERSION = "AMQP_MAJOR_VERSION"; + //public final static String AMQP_MINOR_VERSION = "AMQP_MINOR_VERSION"; + //public final static String AMQP_SASL_CLIENT = "AMQP_SASL_CLIENT"; + //public final static String AMQP_CLIENT_ID = "AMQP_CLIENT_ID"; + //public final static String AMQP_CONNECTION_TUNE_PARAMETERS = "AMQP_CONNECTION_TUNE_PARAMETERS"; + //public final static String AMQP_VIRTUAL_HOST = "AMQP_VIRTUAL_HOST"; + //public final static String AMQP_MESSAGE_STORE = "AMQP_MESSAGE_STORE"; + + /**--------------------------------------------------------------- + * Configuration file properties + * ------------------------------------------------------------ + */ + + // Model Layer properties + public final static String STATE_MANAGER = "stateManager"; + public final static String METHOD_LISTENERS = "methodListeners"; + public final static String METHOD_LISTENER = "methodListener"; + public final static String CLASS = "[@class]"; + public final static String METHOD_CLASS = "methodClass"; + + public final static String STATE_LISTENERS = "stateListeners"; + public final static String STATE_LISTENER = "stateListener"; + public final static String STATE_TYPE = "stateType"; + + public final static String AMQP_MESSAGE_STORE_CLASS = "AMQP_MESSAGE_STORE_CLASS"; + public final static String SERVER_TIMEOUT_IN_MILLISECONDS = "serverTimeoutInMilliSeconds"; + + // MINA properties + public final static String USE_SHARED_READ_WRITE_POOL = "useSharedReadWritePool"; + public final static String ENABLE_DIRECT_BUFFERS = "enableDirectBuffers"; + public final static String ENABLE_POOLED_ALLOCATOR = "enablePooledAllocator"; + public final static String TCP_NO_DELAY = "tcpNoDelay"; + public final static String SEND_BUFFER_SIZE_IN_KB = "sendBufferSizeInKb"; + public final static String RECEIVE_BUFFER_SIZE_IN_KB = "reciveBufferSizeInKb"; + + // Security properties + public final static String AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES = "saslClientFactoryTypes"; + public final static String AMQP_SECURITY_SASL_CLIENT_FACTORY = "saslClientFactory"; + public final static String TYPE = "[@type]"; + + public final static String AMQP_SECURITY_MECHANISMS = "securityMechanisms"; + public final static String AMQP_SECURITY_MECHANISM_HANDLER = "securityMechanismHandler"; + + // Execution Layer properties + public final static String MAX_ACCUMILATED_RESPONSES = "maxAccumilatedResponses"; + + //Transport Layer properties + public final static String QPID_VM_BROKER_CLASS = "qpidVMBrokerClass"; + + //Phase pipe properties + public final static String PHASE_PIPE = "phasePipe"; + public final static String PHASE = "phase"; + public final static String INDEX = "[@index]"; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java new file mode 100644 index 0000000000..cc1503d414 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java @@ -0,0 +1,19 @@ +package org.apache.qpid.nclient.core; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class TransientPhaseContext implements PhaseContext { + + private Map map = new ConcurrentHashMap(); + + public Object getProperty(String name) { + return map.get(name); + } + + public void setProperty(String name, Object value) { + map.put(name, value); + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java new file mode 100644 index 0000000000..8db955afbc --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java @@ -0,0 +1,151 @@ +package org.apache.qpid.nclient.execution; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQRequestBody; +import org.apache.qpid.framing.AMQResponseBody; +import org.apache.qpid.framing.RequestResponseMappingException; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.AbstractPhase; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.model.AMQPMethodEvent; +import org.apache.qpid.protocol.AMQMethodEvent; + +/** + * Corressponds to the Layer 2 in AMQP. + * This phase handles the correlation of amqp messages + * This class implements the 0.9 spec (request/response) + */ +public class ExecutionPhase extends AbstractPhase{ + + protected static final Logger _logger = Logger.getLogger(ExecutionPhase.class); + protected ConcurrentMap _channelId2RequestMgrMap = new ConcurrentHashMap(); + protected ConcurrentMap _channelId2ResponseMgrMap = new ConcurrentHashMap(); + + + /** + * -------------------------------------------------- + * Phase related methods + * -------------------------------------------------- + */ + + // should add these in the init method + //_channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false)); + //_channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false)); + + public void messageReceived(Object msg) throws AMQPException + { + AMQFrame frame = (AMQFrame) msg; + final AMQBody bodyFrame = frame.getBodyFrame(); + + if (bodyFrame instanceof AMQRequestBody) + { + AMQPMethodEvent event; + try + { + event = messageRequestBodyReceived(frame.getChannel(), (AMQRequestBody)bodyFrame); + super.messageReceived(event); + } + catch (Exception e) + { + _logger.error("Error handling request",e); + } + + } + else if (bodyFrame instanceof AMQResponseBody) + { + List<AMQPMethodEvent> events; + try + { + events = messageResponseBodyReceived(frame.getChannel(), (AMQResponseBody)bodyFrame); + for (AMQPMethodEvent event: events) + { + super.messageReceived(event); + } + } + catch (Exception e) + { + _logger.error("Error handling response",e); + } + } + } + + /** + * Need to figure out if the message is a request or a response + * that needs to be sent and then delegate it to the Request or response manager + * to prepare it. + */ + public void messageSent(Object msg) throws AMQPException + { + AMQPMethodEvent evt = (AMQPMethodEvent)msg; + if(evt.getCorrelationId() == QpidConstants.EMPTY_CORRELATION_ID) + { + // This is a request + AMQFrame frame = handleRequest(evt); + super.messageSent(frame); + } + else + { +// This is a response + List<AMQFrame> frames = handleResponse(evt); + for(AMQFrame frame: frames) + { + super.messageSent(frame); + } + } + } + + /** + * ------------------------------------------------ + * Methods to handle request response + * ----------------------------------------------- + */ + private AMQPMethodEvent messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Request frame received: " + requestBody); + } + ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(channelId); + if (responseManager == null) + throw new AMQException("Unable to find ResponseManager for channel " + channelId); + return responseManager.requestReceived(requestBody); + } + + private List<AMQPMethodEvent> messageResponseBodyReceived(int channelId, AMQResponseBody responseBody) throws Exception + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Response frame received: " + responseBody); + } + RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(channelId); + if (requestManager == null) + throw new AMQException("Unable to find RequestManager for channel " + channelId); + return requestManager.responseReceived(responseBody); + } + + private AMQFrame handleRequest(AMQPMethodEvent evt) + { + RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(evt.getChannelId()); + return requestManager.sendRequest(evt); + } + + private List<AMQFrame> handleResponse(AMQPMethodEvent evt) throws AMQPException + { + ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(evt.getChannelId()); + try + { + return responseManager.sendResponse(evt); + } + catch(Exception e) + { + throw new AMQPException("Error handling response",e); + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java new file mode 100644 index 0000000000..761ec1b050 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java @@ -0,0 +1,129 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.execution; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQRequestBody; +import org.apache.qpid.framing.AMQResponseBody; +import org.apache.qpid.framing.RequestResponseMappingException; +import org.apache.qpid.nclient.model.AMQPMethodEvent; + +public class RequestManager +{ + private static final Logger logger = Logger.getLogger(RequestManager.class); + + private int channel; + + /** + * Used for logging and debugging only - allows the context of this instance + * to be known. + */ + private boolean serverFlag; + private long connectionId; + + /** + * Request and response frames must have a requestID and responseID which + * indepenedently increment from 0 on a per-channel basis. These are the + * counters, and contain the value of the next (not yet used) frame. + */ + private long requestIdCount; + + /** + * These keep track of the last requestId and responseId to be received. + */ + private long lastProcessedResponseId; + + private ConcurrentHashMap<Long, Long> requestSentMap; + + public RequestManager(long connectionId, int channel, boolean serverFlag) + { + this.channel = channel; + this.serverFlag = serverFlag; + this.connectionId = connectionId; + requestIdCount = 1L; + lastProcessedResponseId = 0L; + requestSentMap = new ConcurrentHashMap<Long, Long>(); + } + + // *** Functions to originate a request *** + + public AMQFrame sendRequest(AMQPMethodEvent evt) + { + long requestId = getNextRequestId(); // Get new request ID + AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId, + lastProcessedResponseId, evt.getMethod()); + if (logger.isDebugEnabled()) + { + logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + + "] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + evt.getMethod()); + } + requestSentMap.put(requestId, evt.getCorrelationId()); + return requestFrame; + } + + public List<AMQPMethodEvent> responseReceived(AMQResponseBody responseBody) + throws Exception + { + long requestIdStart = responseBody.getRequestId(); + long requestIdStop = requestIdStart + responseBody.getBatchOffset(); + if (logger.isDebugEnabled()) + { + logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX RES: " + + responseBody + "; " + responseBody.getMethodPayload()); + } + + List<AMQPMethodEvent> events = new ArrayList<AMQPMethodEvent>(); + for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++) + { + if (requestSentMap.get(requestId) == null) + { + throw new RequestResponseMappingException(requestId, + "Failed to locate requestId " + requestId + " in requestSentMap."); + } + long localCorrelationId = requestSentMap.get(requestId); + AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel, responseBody.getMethodPayload(), + requestId,localCorrelationId); + events.add(methodEvent); + requestSentMap.remove(requestId); + } + lastProcessedResponseId = responseBody.getResponseId(); + return events; + } + + // *** Management functions *** + + public int requestsMapSize() + { + return requestSentMap.size(); + } + + // *** Private helper functions *** + + private long getNextRequestId() + { + return requestIdCount++; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java new file mode 100644 index 0000000000..97d9576a4e --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java @@ -0,0 +1,240 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.execution; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQRequestBody; +import org.apache.qpid.framing.AMQResponseBody; +import org.apache.qpid.framing.RequestResponseMappingException; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.model.AMQPMethodEvent; + +public class ResponseManager +{ + private static final Logger logger = Logger.getLogger(ResponseManager.class); + + private int channel; + + /** + * Used for logging and debugging only - allows the context of this instance + * to be known. + */ + private boolean serverFlag; + private long connectionId; + + private int maxAccumulatedResponses = 20; // Default + + /** + * Request and response frames must have a requestID and responseID which + * indepenedently increment from 0 on a per-channel basis. These are the + * counters, and contain the value of the next (not yet used) frame. + */ + private long responseIdCount; + + /** + * These keep track of the last requestId and responseId to be received. + */ + private long lastReceivedRequestId; + + /** + * Last requestID sent in a response (for batching) + */ + private long lastSentRequestId; + + private class ResponseStatus implements Comparable<ResponseStatus> + { + private long requestId; + private AMQMethodBody responseMethodBody; + + public ResponseStatus(long requestId) + { + this.requestId = requestId; + responseMethodBody = null; + } + + public int compareTo(ResponseStatus o) + { + return (int)(requestId - o.requestId); + } + + public String toString() + { + // Need to define this + return ""; + } + } + + private ConcurrentHashMap<Long, ResponseStatus> responseMap; + + public ResponseManager(long connectionId, int channel, boolean serverFlag) + { + this.channel = channel; + this.serverFlag = serverFlag; + this.connectionId = connectionId; + responseIdCount = 1L; + lastReceivedRequestId = 0L; + maxAccumulatedResponses = ClientConfiguration.get().getInt(QpidConstants.MAX_ACCUMILATED_RESPONSES); + responseMap = new ConcurrentHashMap<Long, ResponseStatus>(); + } + + // *** Functions to handle an incoming request *** + + public AMQPMethodEvent requestReceived(AMQRequestBody requestBody) throws Exception + { + long requestId = requestBody.getRequestId(); + if (logger.isDebugEnabled()) + { + logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX REQ: " + + requestBody + "; " + requestBody.getMethodPayload()); + } + long responseMark = requestBody.getResponseMark(); + lastReceivedRequestId = requestId; + responseMap.put(requestId, new ResponseStatus(requestId)); + + AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel, + requestBody.getMethodPayload(), requestId); + + return methodEvent; + } + + public List<AMQFrame> sendResponse(AMQPMethodEvent evt) + throws RequestResponseMappingException + { + long requestId = evt.getCorrelationId(); + AMQMethodBody responseMethodBody = evt.getMethod(); + + if (logger.isDebugEnabled()) + { + logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + + "] TX RES: Res[# " + requestId + "]; " + responseMethodBody); + } + + ResponseStatus responseStatus = responseMap.get(requestId); + if (responseStatus == null) + { + throw new RequestResponseMappingException(requestId, + "Failed to locate requestId " + requestId + " in responseMap." + responseMap); + } + if (responseStatus.responseMethodBody != null) + { + throw new RequestResponseMappingException(requestId, "RequestId " + + requestId + " already has a response in responseMap."); + } + responseStatus.responseMethodBody = responseMethodBody; + return doBatches(); + } + + // *** Management functions *** + + /** + * Sends batched responses - i.e. all those members of responseMap that have + * received a response. + */ + public synchronized List<AMQFrame> doBatches() + { + long startRequestId = 0; + int numAdditionalRequestIds = 0; + Class responseMethodBodyClass = null; + List<AMQFrame> frames = new ArrayList<AMQFrame>(); + Iterator<Long> lItr = responseMap.keySet().iterator(); + while (lItr.hasNext()) + { + long requestId = lItr.next(); + ResponseStatus responseStatus = responseMap.get(requestId); + if (responseStatus.responseMethodBody != null) + { + frames.add(sendResponseBatchFrame(requestId, 0, responseStatus.responseMethodBody)); + lItr.remove(); + } + } + + return frames; + } + + /** + * Total number of entries in the responseMap - including both those that + * are outstanding (i.e. no response has been received) and those that are + * batched (those for which responses have been received but have not yet + * been collected together and sent). + */ + public int responsesMapSize() + { + return responseMap.size(); + } + + /** + * As the responseMap may contain both outstanding responses (those with + * ResponseStatus.responseMethodBody still null) and responses waiting to + * be batched (those with ResponseStatus.responseMethodBody not null), we + * need to count only those in the map with responseMethodBody null. + */ + public int outstandingResponses() + { + int cnt = 0; + for (Long requestId : responseMap.keySet()) + { + if (responseMap.get(requestId).responseMethodBody == null) + cnt++; + } + return cnt; + } + + /** + * As the responseMap may contain both outstanding responses (those with + * ResponseStatus.responseMethodBody still null) and responses waiting to + * be batched (those with ResponseStatus.responseMethodBody not null), we + * need to count only those in the map with responseMethodBody not null. + */ + public int batchedResponses() + { + int cnt = 0; + for (Long requestId : responseMap.keySet()) + { + if (responseMap.get(requestId).responseMethodBody != null) + cnt++; + } + return cnt; + } + + // *** Private helper functions *** + + private long getNextResponseId() + { + return responseIdCount++; + } + + private AMQFrame sendResponseBatchFrame(long firstRequestId, int numAdditionalRequests, + AMQMethodBody responseMethodBody) + { + long responseId = getNextResponseId(); // Get new response ID + AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId, + firstRequestId, numAdditionalRequests, responseMethodBody); + return responseFrame; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java new file mode 100644 index 0000000000..e7b762b77a --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.nclient.message; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.qpid.client.message.MessageHeaders; + +public class AMQPApplicationMessage { + + private int bytesReceived = 0; + private int channelId; + private byte[] referenceId; + private List<byte[]> contents = new LinkedList<byte[]>(); + private long deliveryTag; + private boolean redeliveredFlag; + private MessageHeaders messageHeaders; + + public AMQPApplicationMessage(int channelId, byte[] referenceId) + { + this.channelId = channelId; + this.referenceId = referenceId; + } + + public AMQPApplicationMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, boolean redeliveredFlag) + { + this.channelId = channelId; + this.deliveryTag = deliveryTag; + this.messageHeaders = messageHeaders; + this.redeliveredFlag = redeliveredFlag; + } + + public AMQPApplicationMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, byte[] content, boolean redeliveredFlag) + { + this.channelId = channelId; + this.deliveryTag = deliveryTag; + this.messageHeaders = messageHeaders; + this.redeliveredFlag = redeliveredFlag; + addContent(content); + } + + public void addContent(byte[] content) + { + contents.add(content); + bytesReceived += content.length; + } + + public int getBytesReceived() + { + return bytesReceived; + } + + public int getChannelId() + { + return channelId; + } + + public byte[] getReferenceId() + { + return referenceId; + } + + public List<byte[]> getContents() + { + return contents; + } + + public long getDeliveryTag() + { + return deliveryTag; + } + + public boolean getRedeliveredFlag() + { + return redeliveredFlag; + } + + public MessageHeaders getMessageHeaders() + { + return messageHeaders; + } + + public String toString() + { + return "UnprocessedMessage: ch=" + channelId + "; bytesReceived=" + bytesReceived + "; deliveryTag=" + + deliveryTag + "; MsgHdrs=" + messageHeaders + "Num contents=" + contents.size() + "; First content=" + + new String(contents.get(0)); + } + + public void setDeliveryTag(long deliveryTag) + { + this.deliveryTag = deliveryTag; + } + + public void setMessageHeaders(MessageHeaders messageHeaders) + { + this.messageHeaders = messageHeaders; + } + + public void setRedeliveredFlag(boolean redeliveredFlag) + { + this.redeliveredFlag = redeliveredFlag; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java new file mode 100644 index 0000000000..53a2142718 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.message; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.AbstractPhase; +import org.apache.qpid.nclient.model.ModelPhase; + +public class MessagePhase extends AbstractPhase { + + private final BlockingQueue<AMQPApplicationMessage> _queue = new LinkedBlockingQueue<AMQPApplicationMessage>(); + private static final Logger _logger = Logger.getLogger(ModelPhase.class); + + public void messageReceived(Object msg) throws AMQPException + { + try + { + _queue.put((AMQPApplicationMessage)msg); + } + catch (InterruptedException e) + { + _logger.error("Error adding message to queue", e); + } + super.messageReceived(msg); + } + + public void messageSent(Object msg) throws AMQPException + { + super.messageSent(msg); + } + + public AMQPApplicationMessage getNextMessage() + { + return _queue.poll(); + } + + public AMQPApplicationMessage getNextMessage(long timeout, TimeUnit tu) throws InterruptedException + { + return _queue.poll(timeout, tu); + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java new file mode 100644 index 0000000000..93eecdc0cc --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java @@ -0,0 +1,17 @@ +package org.apache.qpid.nclient.message; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.message.MessageHeaders; + +public interface MessageStore { + + public void removeMessage(String identifier); + + public void storeContentBodyChunk(String identifier,byte[] contentBody) throws AMQException; + + public void storeMessageMetaData(String identifier, MessageHeaders messageHeaders) throws AMQException; + + public AMQPApplicationMessage getMessage(String identifier) throws AMQException; + + public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQException; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java new file mode 100644 index 0000000000..26cf2327d7 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java @@ -0,0 +1,40 @@ +package org.apache.qpid.nclient.message; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.message.MessageHeaders; + +public class TransientMessageStore implements MessageStore { + + private Map<String,AMQPApplicationMessage> messageMap = new ConcurrentHashMap<String,AMQPApplicationMessage>(); + + public AMQPApplicationMessage getMessage(String identifier) + throws AMQException + { + return messageMap.get(identifier); + } + + public void removeMessage(String identifier) + { + messageMap.remove(identifier); + } + + public void storeContentBodyChunk(String identifier, byte[] contentBody) + throws AMQException + { + + } + + public void storeMessageMetaData(String identifier, + MessageHeaders messageHeaders) throws AMQException + { + + } + + public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQException + { + + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java b/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java new file mode 100644 index 0000000000..c33c087da8 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java @@ -0,0 +1,64 @@ +package org.apache.qpid.nclient.model; + +import org.apache.qpid.framing.AMQMethodBody; + +/** + * This class is exactly the same as the AMQMethod event. + * Except I renamed requestId to corelationId, so I could use it both ways. + * + * I didn't want to modify anything in common so that there is no + * impact on the existing code. + * + */ +public class AMQPMethodEvent <M extends AMQMethodBody> { + + private final M _method; + private final int _channelId; + private final long _correlationId; + private long _localCorrletionId = 0; + + public AMQPMethodEvent(int channelId, M method, long correlationId,long localCorrletionId) + { + _channelId = channelId; + _method = method; + _correlationId = correlationId; + _localCorrletionId = localCorrletionId; + } + + public AMQPMethodEvent(int channelId, M method, long correlationId) + { + _channelId = channelId; + _method = method; + _correlationId = correlationId; + } + + public M getMethod() + { + return _method; + } + + public int getChannelId() + { + return _channelId; + } + + public long getCorrelationId() + { + return _correlationId; + } + + public long getLocalCorrelationId() + { + return _localCorrletionId; + } + + public String toString() + { + StringBuilder buf = new StringBuilder("Method event: \n"); + buf.append("Channel id: \n").append(_channelId); + buf.append("Method: \n").append(_method); + buf.append("Request Id: ").append(_correlationId); + buf.append("Local Correlation Id: ").append(_localCorrletionId); + return buf.toString(); + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java b/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java new file mode 100644 index 0000000000..52b9f6de91 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java @@ -0,0 +1,11 @@ +package org.apache.qpid.nclient.model; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.nclient.core.AMQPException; + +public interface AMQPMethodListener +{ + + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException; + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java new file mode 100644 index 0000000000..77003b4d21 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java @@ -0,0 +1,133 @@ +package org.apache.qpid.nclient.model; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.log4j.Logger; +import org.apache.qpid.nclient.amqp.state.AMQPStateManager; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.AbstractPhase; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.PhaseContext; +import org.apache.qpid.nclient.core.QpidConstants; + +/** + * This Phase handles Layer 3 functionality of the AMQP spec. + * This class acts as the interface between the API and the pipeline + */ +public class ModelPhase extends AbstractPhase { + + private static final Logger _logger = Logger.getLogger(ModelPhase.class); + + private Map <Class,List> _methodListners = new HashMap<Class,List>(); + + /** + * ------------------------------------------------ + * Phase - methods introduced by Phase + * ------------------------------------------------ + */ + public void init(PhaseContext ctx, Phase nextInFlowPhase, Phase nextOutFlowPhase) + { + super.init(ctx, nextInFlowPhase, nextOutFlowPhase); + try + { + loadMethodListeners(); + } + catch(Exception e) + { + _logger.fatal("Error loading method listeners", e); + } + } + + public void messageReceived(Object msg) throws AMQPException + { + notifyMethodListerners((AMQPMethodEvent)msg); + + // not doing super.methodReceived here, as this is the end of + // the pipeline + //super.messageReceived(msg); + } + + /** + * This method should only except and pass messages + * of Type @see AMQPMethodEvent + */ + public void messageSent(Object msg) throws AMQPException + { + super.messageSent(msg); + } + + /** + * ------------------------------------------------ + * Event Handling + * ------------------------------------------------ + */ + + public void notifyMethodListerners(AMQPMethodEvent event) throws AMQPException + { + if (_methodListners.containsKey(event.getMethod().getClass())) + { + List<AMQPMethodListener> listeners = _methodListners.get(event.getMethod().getClass()); + + if(listeners.size()>0) + { + throw new AMQPException("There are no registered listeners for this method"); + } + + for(AMQPMethodListener l : listeners) + { + try + { + l.methodReceived(event); + } + catch (Exception e) + { + _logger.error("Error handling method event " + event, e); + } + } + } + } + + /** + * ------------------------------------------------ + * Configuration + * ------------------------------------------------ + */ + + /** + * This method loads method listeners from the client.xml file + * For each method class there is a list of listeners + */ + private void loadMethodListeners() throws Exception + { + int count = ClientConfiguration.get().getMaxIndex(QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER); + System.out.println(count); + + for(int i=0 ;i<count;i++) + { + String methodListener = QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER + "(" + i + ")"; + String className = ClientConfiguration.get().getString(methodListener + "." + QpidConstants.CLASS); + Class listenerClass = Class.forName(className); + List<String> list = ClientConfiguration.get().getList(methodListener + "." + QpidConstants.METHOD_CLASS); + for(String s:list) + { + List listeners; + Class methodClass = Class.forName(s); + if (_methodListners.containsKey(methodClass)) + { + listeners = _methodListners.get(methodClass); + } + else + { + listeners = new ArrayList(); + _methodListners.put(methodClass,listeners); + } + listeners.add(listenerClass); + } + } + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java new file mode 100644 index 0000000000..fc5878f6ef --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.security; + +import javax.security.auth.callback.CallbackHandler; + +import org.apache.qpid.nclient.transport.ConnectionURL; + +public interface AMQPCallbackHandler extends CallbackHandler +{ + void initialise(ConnectionURL url); +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java new file mode 100644 index 0000000000..28ba2e355c --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.security; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.log4j.Logger; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.QpidConstants; + +public class CallbackHandlerRegistry +{ + private static final Logger _logger = Logger.getLogger(CallbackHandlerRegistry.class); + + private static CallbackHandlerRegistry _instance = new CallbackHandlerRegistry(); + + private Map<String,Class> _mechanismToHandlerClassMap = new HashMap<String,Class>(); + + private String _mechanisms; + + public static CallbackHandlerRegistry getInstance() + { + return _instance; + } + + public Class getCallbackHandlerClass(String mechanism) + { + return _mechanismToHandlerClassMap.get(mechanism); + } + + public String getMechanisms() + { + return _mechanisms; + } + + private CallbackHandlerRegistry() + { + // first we register any Sasl client factories + DynamicSaslRegistrar.registerSaslProviders(); + parseProperties(); + } + + private void parseProperties() + { + List<String> mechanisms = ClientConfiguration.get().getList(QpidConstants.AMQP_SECURITY_MECHANISMS); + + for (String mechanism : mechanisms) + { + String className = ClientConfiguration.get().getString(QpidConstants.AMQP_SECURITY_MECHANISM_HANDLER + "_" + mechanism); + Class clazz = null; + try + { + clazz = Class.forName(className); + if (!AMQPCallbackHandler.class.isAssignableFrom(clazz)) + { + _logger.warn("SASL provider " + clazz + " does not implement " + AMQPCallbackHandler.class + + ". Skipping"); + continue; + } + _mechanismToHandlerClassMap.put(mechanism, clazz); + if (_mechanisms == null) + { + _mechanisms = mechanism; + } + else + { + // one time cost + _mechanisms = _mechanisms + " " + mechanism; + } + } + catch (ClassNotFoundException ex) + { + _logger.warn("Unable to load class " + className + ". Skipping that SASL provider"); + continue; + } + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java new file mode 100644 index 0000000000..958c6c4782 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.security; + +import java.security.Security; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import javax.security.sasl.SaslClientFactory; + +import org.apache.log4j.Logger; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.QpidConstants; + +public class DynamicSaslRegistrar +{ + private static final Logger _logger = Logger.getLogger(DynamicSaslRegistrar.class); + + public static void registerSaslProviders() + { + Map<String, Class<? extends SaslClientFactory>> factories = parseProperties(); + if (factories.size() > 0) + { + Security.addProvider(new JCAProvider(factories)); + _logger.debug("Dynamic SASL provider added as a security provider"); + } + } + + private static Map<String, Class<? extends SaslClientFactory>> parseProperties() + { + List<String> mechanisms = ClientConfiguration.get().getList(QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES); + TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister = + new TreeMap<String, Class<? extends SaslClientFactory>>(); + for (String mechanism: mechanisms) + { + String className = ClientConfiguration.get().getString(QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY + "_" + mechanism); + try + { + Class<?> clazz = Class.forName(className); + if (!(SaslClientFactory.class.isAssignableFrom(clazz))) + { + _logger.error("Class " + clazz + " does not implement " + SaslClientFactory.class + " - skipping"); + continue; + } + factoriesToRegister.put(mechanism, (Class<? extends SaslClientFactory>) clazz); + } + catch (Exception ex) + { + _logger.error("Error instantiating SaslClientFactory calss " + className + " - skipping"); + } + } + return factoriesToRegister; + } + + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java new file mode 100644 index 0000000000..10ccb88821 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.security; + +import javax.security.sasl.SaslClientFactory; +import java.security.Provider; +import java.security.Security; +import java.util.Map; + +public class JCAProvider extends Provider +{ + public JCAProvider(Map<String, Class<? extends SaslClientFactory>> providerMap) + { + super("AMQSASLProvider", 1.0, "A JCA provider that registers all " + + "AMQ SASL providers that want to be registered"); + register(providerMap); + Security.addProvider(this); + } + + private void register(Map<String, Class<? extends SaslClientFactory>> providerMap) + { + for (Map.Entry<String, Class<? extends SaslClientFactory>> me : + providerMap.entrySet()) + { + put("SaslClientFactory." + me.getKey(), me.getValue().getName()); + } + } +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java new file mode 100644 index 0000000000..7297d07134 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.security; + +import java.io.IOException; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; + +import org.apache.qpid.nclient.transport.ConnectionURL; + +public class UsernamePasswordCallbackHandler implements AMQPCallbackHandler +{ + private ConnectionURL _url; + + public void initialise(ConnectionURL url) + { + _url = url; + } + + public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException + { + for (int i = 0; i < callbacks.length; i++) + { + Callback cb = callbacks[i]; + if (cb instanceof NameCallback) + { + ((NameCallback)cb).setName(_url.getUsername()); + } + else if (cb instanceof PasswordCallback) + { + ((PasswordCallback)cb).setPassword((_url.getPassword()).toCharArray()); + } + else + { + throw new UnsupportedCallbackException(cb); + } + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java new file mode 100644 index 0000000000..9e878fb839 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java @@ -0,0 +1,344 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.transport; + +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.url.URLSyntaxException; + +import java.util.HashMap; +import java.net.URISyntaxException; +import java.net.URI; + +public class AMQPBrokerDetails implements BrokerDetails +{ + private String _host; + + private int _port; + + private String _transport; + + private HashMap<String, String> _options; + + public AMQPBrokerDetails() + { + _options = new HashMap<String, String>(); + } + + public AMQPBrokerDetails(String url) throws URLSyntaxException + { + this(); + // URL should be of format tcp://host:port?option='value',option='value' + try + { + URI connection = new URI(url); + + String transport = connection.getScheme(); + + // Handles some defaults to minimise changes to existing broker URLS e.g. localhost + if (transport != null) + { + //todo this list of valid transports should be enumerated somewhere + if ((!(transport.equalsIgnoreCase("vm") || transport.equalsIgnoreCase("tcp")))) + { + if (transport.equalsIgnoreCase("localhost")) + { + connection = new URI(DEFAULT_TRANSPORT + "://" + url); + transport = connection.getScheme(); + } + else + { + if (url.charAt(transport.length()) == ':' && url.charAt(transport.length() + 1) != '/') + { + //Then most likely we have a host:port value + connection = new URI(DEFAULT_TRANSPORT + "://" + url); + transport = connection.getScheme(); + } + else + { + URLHelper.parseError(0, transport.length(), "Unknown transport", url); + } + } + } + } + else + { + //Default the transport + connection = new URI(DEFAULT_TRANSPORT + "://" + url); + transport = connection.getScheme(); + } + + if (transport == null) + { + URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" + " In broker URL:'" + url + + "' Format: " + URL_FORMAT_EXAMPLE, ""); + } + + setTransport(transport); + + String host = connection.getHost(); + + // Fix for Java 1.5 + if (host == null) + { + host = ""; + } + + setHost(host); + + int port = connection.getPort(); + + if (port == -1) + { + // Fix for when there is port data but it is not automatically parseable by getPort(). + String auth = connection.getAuthority(); + + if (auth != null && auth.contains(":")) + { + int start = auth.indexOf(":") + 1; + int end = start; + boolean looking = true; + boolean found = false; + //Walk the authority looking for a port value. + while (looking) + { + try + { + end++; + Integer.parseInt(auth.substring(start, end)); + + if (end >= auth.length()) + { + looking = false; + found = true; + } + } + catch (NumberFormatException nfe) + { + looking = false; + } + + } + if (found) + { + setPort(Integer.parseInt(auth.substring(start, end))); + } + else + { + URLHelper.parseError(connection.toString().indexOf(connection.getAuthority()) + end - 1, + "Illegal character in port number", connection.toString()); + } + + } + else + { + setPort(DEFAULT_PORT); + } + } + else + { + setPort(port); + } + + String queryString = connection.getQuery(); + + URLHelper.parseOptions(_options, queryString); + + //Fragment is #string (not used) + } + catch (URISyntaxException uris) + { + if (uris instanceof URLSyntaxException) + { + throw (URLSyntaxException) uris; + } + + URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); + } + } + + public AMQPBrokerDetails(String host, int port, boolean useSSL) + { + _host = host; + _port = port; + + if (useSSL) + { + setOption(OPTIONS_SSL, "true"); + } + } + + public String getHost() + { + return _host; + } + + public void setHost(String _host) + { + this._host = _host; + } + + public int getPort() + { + return _port; + } + + public void setPort(int _port) + { + this._port = _port; + } + + public String getTransport() + { + return _transport; + } + + public void setTransport(String _transport) + { + this._transport = _transport; + } + + public String getOption(String key) + { + return _options.get(key); + } + + public void setOption(String key, String value) + { + _options.put(key, value); + } + + public long getTimeout() + { + if (_options.containsKey(OPTIONS_CONNECT_TIMEOUT)) + { + try + { + return Long.parseLong(_options.get(OPTIONS_CONNECT_TIMEOUT)); + } + catch (NumberFormatException nfe) + { + //Do nothing as we will use the default below. + } + } + + return BrokerDetails.DEFAULT_CONNECT_TIMEOUT; + } + + public void setTimeout(long timeout) + { + setOption(OPTIONS_CONNECT_TIMEOUT, Long.toString(timeout)); + } + + public String toString() + { + StringBuffer sb = new StringBuffer(); + + sb.append(_transport); + sb.append("://"); + + if (!(_transport.equalsIgnoreCase("vm"))) + { + sb.append(_host); + } + + sb.append(':'); + sb.append(_port); + + sb.append(printOptionsURL()); + + return sb.toString(); + } + + public boolean equals(Object o) + { + if (!(o instanceof BrokerDetails)) + { + return false; + } + + BrokerDetails bd = (BrokerDetails) o; + + return _host.equalsIgnoreCase(bd.getHost()) && (_port == bd.getPort()) + && _transport.equalsIgnoreCase(bd.getTransport()) && (useSSL() == bd.useSSL()); + + //todo do we need to compare all the options as well? + } + + private String printOptionsURL() + { + StringBuffer optionsURL = new StringBuffer(); + + optionsURL.append('?'); + + if (!(_options.isEmpty())) + { + + for (String key : _options.keySet()) + { + optionsURL.append(key); + + optionsURL.append("='"); + + optionsURL.append(_options.get(key)); + + optionsURL.append("'"); + + optionsURL.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + } + + //removeKey the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options + optionsURL.deleteCharAt(optionsURL.length() - 1); + + return optionsURL.toString(); + } + + public boolean useSSL() + { + // To be friendly to users we should be case insensitive. + // or simply force users to conform to OPTIONS_SSL + // todo make case insensitive by trying ssl Ssl sSl ssL SSl SsL sSL SSL + + if (_options.containsKey(OPTIONS_SSL)) + { + return _options.get(OPTIONS_SSL).equalsIgnoreCase("true"); + } + + return USE_SSL_DEFAULT; + } + + public void useSSL(boolean ssl) + { + setOption(OPTIONS_SSL, Boolean.toString(ssl)); + } + + public static String checkTransport(String broker) + { + if ((!broker.contains("://"))) + { + return "tcp://" + broker; + } + else + { + return broker; + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java new file mode 100644 index 0000000000..d0259830c6 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java @@ -0,0 +1,412 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.transport; + +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.url.URLSyntaxException; + +import java.util.*; +import java.net.InetAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; + +public class AMQPConnectionURL implements ConnectionURL +{ + private String _url; + private String _failoverMethod; + private HashMap<String, String> _failoverOptions; + private HashMap<String, String> _options; + private List<BrokerDetails> _brokers; + private String _clientName; + private String _username; + private String _password; + private String _virtualHost; + + public AMQPConnectionURL(String fullURL) throws URLSyntaxException + { + _url = fullURL; + _options = new HashMap<String, String>(); + _brokers = new LinkedList<BrokerDetails>(); + _failoverOptions = new HashMap<String, String>(); + + try + { + URI connection = new URI(fullURL); + + if (connection.getScheme() == null || !(connection.getScheme().equalsIgnoreCase(AMQ_PROTOCOL))) + { + throw new URISyntaxException(fullURL, "Not an AMQP URL"); + } + + if (connection.getHost() == null || connection.getHost().equals("")) + { + String uid = getUniqueClientID(); + if (uid == null) + { + URLHelper.parseError(-1, "Client Name not specified", fullURL); + } + else + { + setClientName(uid); + } + + } + else + { + setClientName(connection.getHost()); + } + + String userInfo = connection.getUserInfo(); + + if (userInfo == null) + { + //Fix for Java 1.5 which doesn't parse UserInfo for non http URIs + userInfo = connection.getAuthority(); + + if (userInfo != null) + { + int atIndex = userInfo.indexOf('@'); + + if (atIndex != -1) + { + userInfo = userInfo.substring(0, atIndex); + } + else + { + userInfo = null; + } + } + + } + + if (userInfo == null) + { + URLHelper.parseError(AMQ_PROTOCOL.length() + 3, + "User information not found on url", fullURL); + } + else + { + parseUserInfo(userInfo); + } + String virtualHost = connection.getPath(); + + if (virtualHost != null && (!virtualHost.equals(""))) + { + setVirtualHost(virtualHost); + } + else + { + int authLength = connection.getAuthority().length(); + int start = AMQ_PROTOCOL.length() + 3; + int testIndex = start + authLength; + if (testIndex < fullURL.length() && fullURL.charAt(testIndex) == '?') + { + URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL); + } + else + { + URLHelper.parseError(-1, "Virtual host not specified", fullURL); + } + + } + + + URLHelper.parseOptions(_options, connection.getQuery()); + + processOptions(); + + //Fragment is #string (not used) + //System.out.println(connection.getFragment()); + + } + catch (URISyntaxException uris) + { + if (uris instanceof URLSyntaxException) + { + throw (URLSyntaxException) uris; + } + + int slash = fullURL.indexOf("\\"); + + if (slash == -1) + { + URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); + } + else + { + if (slash != 0 && fullURL.charAt(slash - 1) == ':') + { + URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL); + } + else + { + URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL); + } + } + + } + } + + private String getUniqueClientID() + { + try + { + InetAddress addr = InetAddress.getLocalHost(); + return addr.getHostName() + System.currentTimeMillis(); + } + catch (UnknownHostException e) + { + return null; + } + } + + private void parseUserInfo(String userinfo) throws URLSyntaxException + { + //user info = user:pass + + int colonIndex = userinfo.indexOf(':'); + + if (colonIndex == -1) + { + URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(), + "Null password in user information not allowed.", _url); + } + else + { + setUsername(userinfo.substring(0, colonIndex)); + setPassword(userinfo.substring(colonIndex + 1)); + } + + } + + private void processOptions() throws URLSyntaxException + { + if (_options.containsKey(OPTIONS_BROKERLIST)) + { + String brokerlist = _options.get(OPTIONS_BROKERLIST); + + //brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value' + StringTokenizer st = new StringTokenizer(brokerlist, "" + URLHelper.BROKER_SEPARATOR); + + while (st.hasMoreTokens()) + { + String broker = st.nextToken(); + + _brokers.add(new AMQPBrokerDetails(broker)); + } + + _options.remove(OPTIONS_BROKERLIST); + } + + if (_options.containsKey(OPTIONS_FAILOVER)) + { + String failover = _options.get(OPTIONS_FAILOVER); + + // failover='method?option='value',option='value'' + + int methodIndex = failover.indexOf('?'); + + if (methodIndex > -1) + { + _failoverMethod = failover.substring(0, methodIndex); + URLHelper.parseOptions(_failoverOptions, failover.substring(methodIndex + 1)); + } + else + { + _failoverMethod = failover; + } + + _options.remove(OPTIONS_FAILOVER); + } + } + + public String getURL() + { + return _url; + } + + public String getFailoverMethod() + { + return _failoverMethod; + } + + public String getFailoverOption(String key) + { + return _failoverOptions.get(key); + } + + public void setFailoverOption(String key, String value) + { + _failoverOptions.put(key, value); + } + + public int getBrokerCount() + { + return _brokers.size(); + } + + public BrokerDetails getBrokerDetails(int index) + { + if (index < _brokers.size()) + { + return _brokers.get(index); + } + else + { + return null; + } + } + + public void addBrokerDetails(BrokerDetails broker) + { + if (!(_brokers.contains(broker))) + { + _brokers.add(broker); + } + } + + public List<BrokerDetails> getAllBrokerDetails() + { + return _brokers; + } + + public String getClientName() + { + return _clientName; + } + + public void setClientName(String clientName) + { + _clientName = clientName; + } + + public String getUsername() + { + return _username; + } + + public void setUsername(String username) + { + _username = username; + } + + public String getPassword() + { + return _password; + } + + public void setPassword(String password) + { + _password = password; + } + + public String getVirtualHost() + { + return _virtualHost; + } + + public void setVirtualHost(String virtuaHost) + { + _virtualHost = virtuaHost; + } + + public String getOption(String key) + { + return _options.get(key); + } + + public void setOption(String key, String value) + { + _options.put(key, value); + } + + public String toString() + { + StringBuffer sb = new StringBuffer(); + + sb.append(AMQ_PROTOCOL); + sb.append("://"); + + if (_username != null) + { + sb.append(_username); + + if (_password != null) + { + sb.append(':'); + sb.append(_password); + } + + sb.append('@'); + } + + sb.append(_clientName); + + sb.append(_virtualHost); + + sb.append(optionsToString()); + + return sb.toString(); + } + + private String optionsToString() + { + StringBuffer sb = new StringBuffer(); + + sb.append("?" + OPTIONS_BROKERLIST + "='"); + + for (BrokerDetails service : _brokers) + { + sb.append(service.toString()); + sb.append(';'); + } + + sb.deleteCharAt(sb.length() - 1); + sb.append("'"); + + if (_failoverMethod != null) + { + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + sb.append(OPTIONS_FAILOVER + "='"); + sb.append(_failoverMethod); + sb.append(URLHelper.printOptions(_failoverOptions)); + sb.append("'"); + } + + return sb.toString(); + } + + + public static void main(String[] args) throws URLSyntaxException + { + + String url2 = "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'"; + //"amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''"; + + //ConnectionURL connectionurl2 = new AMQConnectionURL(url2); + + System.out.println(url2); + //System.out.println(connectionurl2); + + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java new file mode 100644 index 0000000000..fe20a1e8dd --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.transport; + +public interface BrokerDetails +{ + + /* + * Known URL Options + * @see ConnectionURL + */ + public static final String OPTIONS_RETRY = "retries"; + public static final String OPTIONS_SSL = ConnectionURL.OPTIONS_SSL; + public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout"; + public static final int DEFAULT_PORT = 5672; + + public static final String TCP = "tcp"; + public static final String VM = "vm"; + + public static final String DEFAULT_TRANSPORT = TCP; + + public static final String URL_FORMAT_EXAMPLE = + "<transport>://<hostname>[:<port Default=\"" + DEFAULT_PORT + "\">][?<option>='<value>'[,<option>='<value>']]"; + + public static final long DEFAULT_CONNECT_TIMEOUT = 30000L; + public static final boolean USE_SSL_DEFAULT = false; + + String getHost(); + + void setHost(String host); + + int getPort(); + + void setPort(int port); + + String getTransport(); + + void setTransport(String transport); + + boolean useSSL(); + + void useSSL(boolean ssl); + + String getOption(String key); + + void setOption(String key, String value); + + long getTimeout(); + + void setTimeout(long timeout); + + String toString(); + + boolean equals(Object o); +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java new file mode 100644 index 0000000000..79653dc442 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.transport; + +import java.util.List; + +/** + Connection URL format + For TCP: + amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\'&option=\'value\' + + For VMBroker: + vm://:3/virtualpath?option=\'value\''&failover='method?option=\'value\'&option='value''" + + Options are of course optional except for requiring a single broker in the broker list. + The option seperator is defined to be either '&' or ',' + */ +public interface ConnectionURL +{ + public static final String AMQ_PROTOCOL = "amqp"; + public static final String OPTIONS_BROKERLIST = "brokerlist"; + public static final String OPTIONS_FAILOVER = "failover"; + public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount"; + public static final String OPTIONS_SSL = "ssl"; + + String getURL(); + + String getFailoverMethod(); + + String getFailoverOption(String key); + + int getBrokerCount(); + + BrokerDetails getBrokerDetails(int index); + + void addBrokerDetails(BrokerDetails broker); + + List<BrokerDetails> getAllBrokerDetails(); + + String getClientName(); + + void setClientName(String clientName); + + String getUsername(); + + void setUsername(String username); + + String getPassword(); + + void setPassword(String password); + + String getVirtualHost(); + + void setVirtualHost(String virtualHost); + + String getOption(String key); + + void setOption(String key, String value); +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java new file mode 100644 index 0000000000..aae3677f8b --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java @@ -0,0 +1,82 @@ +package org.apache.qpid.nclient.transport; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.mina.transport.socket.nio.SocketConnectorConfig; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.DefaultPhaseContext; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.PhaseContext; +import org.apache.qpid.nclient.core.PhaseFactory; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.pool.ReadWriteThreadModel; + +public class TCPConnection implements TransportConnection +{ + private static final Logger _logger = Logger.getLogger(TCPConnection.class); + private BrokerDetails _brokerDetails; + private IoConnector _ioConnector; + private Phase _phase; + + protected TCPConnection(ConnectionURL url) + { + _brokerDetails = url.getBrokerDetails(0); + + ByteBuffer.setUseDirectBuffers(ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_DIRECT_BUFFERS)); + + // the MINA default is currently to use the pooled allocator although this may change in future + // once more testing of the performance of the simple allocator has been done + if (ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_POOLED_ALLOCATOR)) + { + // Not sure what the original code wanted use :) + } + else + { + _logger.info("Using SimpleByteBufferAllocator"); + ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + } + + final IoConnector ioConnector = new SocketConnector(); + SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); + + // if we do not use our own thread model we get the MINA default which is to use + // its own leader-follower model + if (ClientConfiguration.get().getBoolean(QpidConstants.USE_SHARED_READ_WRITE_POOL)) + { + cfg.setThreadModel(ReadWriteThreadModel.getInstance()); + } + + SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); + scfg.setTcpNoDelay(ClientConfiguration.get().getBoolean(QpidConstants.TCP_NO_DELAY)); + scfg.setSendBufferSize(ClientConfiguration.get().getInt(QpidConstants.SEND_BUFFER_SIZE_IN_KB)*1024); + scfg.setReceiveBufferSize(ClientConfiguration.get().getInt(QpidConstants.RECEIVE_BUFFER_SIZE_IN_KB)*1024); + } + + // Returns the phase pipe + public Phase connect() throws AMQPException + { + PhaseContext ctx = new DefaultPhaseContext(); + ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails); + ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector); + + _phase = PhaseFactory.createPhasePipe(ctx); + _phase.start(); + + return _phase; + } + + public void close() throws AMQPException + { + + } + + public Phase getPhasePipe() + { + return _phase; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java new file mode 100644 index 0000000000..9df353a2df --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.transport; + + +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; + +public interface TransportConnection +{ + public Phase connect()throws AMQPException; + + public void close()throws AMQPException; + + public Phase getPhasePipe(); +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java new file mode 100644 index 0000000000..5c97100bdc --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java @@ -0,0 +1,44 @@ +package org.apache.qpid.nclient.transport; + +import java.net.URISyntaxException; + +public class TransportConnectionFactory +{ + public enum ConnectionType + { + TCP,VM + } + + public static TransportConnection createTransportConnection(String url,ConnectionType type) throws URISyntaxException + { + return createTransportConnection(new AMQPConnectionURL(url),type); + + } + + public static TransportConnection createTransportConnection(ConnectionURL url,ConnectionType type) + { + switch (type) + { + case TCP : default: + { + return createTCPConnection(url); + } + + case VM : + { + return createVMConnection(url); + } + } + + } + + private static TransportConnection createTCPConnection(ConnectionURL url) + { + return new TCPConnection(url); + } + + private static TransportConnection createVMConnection(ConnectionURL url) + { + return null; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java new file mode 100644 index 0000000000..b1bc7b4c8c --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java @@ -0,0 +1,263 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.transport; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.WriteFuture; +import org.apache.mina.filter.SSLFilter; +import org.apache.mina.filter.codec.ProtocolCodecFilter; +import org.apache.mina.transport.vmpipe.VmPipeAddress; +import org.apache.mina.transport.vmpipe.VmPipeConnector; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersionList; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.AbstractPhase; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.ssl.BogusSSLContextFactory; + +/** + * The Transport Phase corresponds to the Layer 1 in AMQP It works at the Frame + * layer + * + */ +public class TransportPhase extends AbstractPhase implements IoHandler, ProtocolVersionList +{ + + private static final Logger _logger = Logger + .getLogger(TransportPhase.class); + + private IoSession _ioSession; + private BrokerDetails _brokerDetails; + + protected WriteFuture _lastWriteFuture; + + /** + * ------------------------------------------------ + * Phase - methods introduced by Phase + * ------------------------------------------------ + */ + + public void start()throws AMQPException + { + _brokerDetails = (BrokerDetails)_ctx.getProperty(QpidConstants.AMQP_BROKER_DETAILS); + IoConnector ioConnector = (IoConnector)_ctx.getProperty(QpidConstants.MINA_IO_CONNECTOR); + + final SocketAddress address; + if (ioConnector instanceof VmPipeConnector) + { + address = new VmPipeAddress(_brokerDetails.getPort()); + } + else + { + address = new InetSocketAddress(_brokerDetails.getHost(), _brokerDetails.getPort()); + _logger.info("Attempting connection to " + address); + + } + + ConnectFuture future = ioConnector.connect(address,this); + + // wait for connection to complete + if (future.join(_brokerDetails.getTimeout())) + { + // we call getSession which throws an IOException if there has been an error connecting + future.getSession(); + } + else + { + throw new AMQPException("Timeout waiting for connection."); + } + } + + public void messageReceived(Object frame) throws AMQPException + { + super.messageReceived(frame); + } + + public void messageSent(Object frame) throws AMQPException + { + + _ioSession.write(frame); + } + + /** + * ------------------------------------------------ + * IoHandler - methods introduced by IoHandler + * ------------------------------------------------ + */ + public void sessionIdle(IoSession session, IdleStatus status) + throws Exception + { + _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + + status); + if (IdleStatus.WRITER_IDLE.equals(status)) + { + // write heartbeat frame: + _logger.debug("Sent heartbeat"); + session.write(HeartbeatBody.FRAME); + // HeartbeatDiagnostics.sent(); + } else if (IdleStatus.READER_IDLE.equals(status)) + { + // failover: + // HeartbeatDiagnostics.timeout(); + _logger.warn("Timed out while waiting for heartbeat from peer."); + session.close(); + } + } + + public void messageReceived(IoSession session, Object message) + throws Exception + { + AMQFrame frame = (AMQFrame) message; + final AMQBody bodyFrame = frame.getBodyFrame(); + + if (bodyFrame instanceof HeartbeatBody) + { + _logger.debug("Received heartbeat"); + } else + { + messageReceived(bodyFrame); + } + // _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); + } + + public void messageSent(IoSession session, Object message) throws Exception + { + _logger.debug("Sent frame " + message); + } + + public void exceptionCaught(IoSession session, Throwable cause) + throws Exception + { + // Need to handle failover + sessionClosed(session); + } + + public void sessionClosed(IoSession session) throws Exception + { + // Need to handle failover + _logger.info("Protocol Session [" + this + "] closed"); + } + + public void sessionCreated(IoSession session) throws Exception + { + _logger.debug("Protocol session created for session " + + System.identityHashCode(session)); + + final ProtocolCodecFilter pcf = new ProtocolCodecFilter( + new AMQCodecFactory(false)); + + if (ClientConfiguration.get().getBoolean( + QpidConstants.USE_SHARED_READ_WRITE_POOL)) + { + session.getFilterChain().addBefore("AsynchronousWriteFilter", + "protocolFilter", pcf); + } else + { + session.getFilterChain().addLast("protocolFilter", pcf); + } + // we only add the SSL filter where we have an SSL connection + if (_brokerDetails.useSSL()) + { + // FIXME: Bogus context cannot be used in production. + SSLFilter sslFilter = new SSLFilter(BogusSSLContextFactory + .getInstance(false)); + sslFilter.setUseClientMode(true); + session.getFilterChain().addBefore("protocolFilter", "ssl", + sslFilter); + } + + try + { + + ReadWriteThreadModel threadModel = ReadWriteThreadModel + .getInstance(); + threadModel.getAsynchronousReadFilter().createNewJobForSession( + session); + threadModel.getAsynchronousWriteFilter().createNewJobForSession( + session); + } catch (RuntimeException e) + { + e.printStackTrace(); + } + + doAMQPConnectionNegotiation(); + } + + public void sessionOpened(IoSession session) throws Exception + { + _logger.debug("Protocol session opened for session " + + System.identityHashCode(session)); + } + + /** + * ---------------------------------------------------------- + * Protocol related methods + * ---------------------------------------------------------- + */ + private void doAMQPConnectionNegotiation() + { + int i = pv.length - 1; + writeFrame(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); + } + + /** + * ---------------------------------------------------------- + * Write Operations + * ---------------------------------------------------------- + */ + public void writeFrame(AMQDataBlock frame) + { + writeFrame(frame, false); + } + + public void writeFrame(AMQDataBlock frame, boolean wait) + { + WriteFuture f = _ioSession.write(frame); + if (wait) + { + // fixme -- time out? + f.join(); + } else + { + _lastWriteFuture = f; + } + } + + /** + * ----------------------------------------------------------- Failover + * section ----------------------------------------------------------- + */ +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java new file mode 100644 index 0000000000..c13bb873a7 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java @@ -0,0 +1,133 @@ +package org.apache.qpid.nclient.transport; + +import java.io.IOException; + +import org.apache.log4j.Logger; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.transport.vmpipe.VmPipeAcceptor; +import org.apache.mina.transport.vmpipe.VmPipeAddress; +import org.apache.mina.transport.vmpipe.VmPipeConnector; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.DefaultPhaseContext; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.PhaseContext; +import org.apache.qpid.nclient.core.PhaseFactory; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.pool.PoolingFilter; +import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.pool.ReferenceCountingExecutorService; + +public class VMConnection implements TransportConnection +{ + private static final Logger _logger = Logger.getLogger(VMConnection.class); + private BrokerDetails _brokerDetails; + private IoConnector _ioConnector; + private Phase _phase; + + protected VMConnection(ConnectionURL url) + { + _brokerDetails = url.getBrokerDetails(0); + _ioConnector = new VmPipeConnector(); + final IoServiceConfig cfg = _ioConnector.getDefaultConfig(); + ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance(); + PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executorService, + "AsynchronousReadFilter"); + cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead); + PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executorService, + "AsynchronousWriteFilter"); + cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite); + } + + public Phase connect() throws AMQPException + { + createVMBroker(); + + PhaseContext ctx = new DefaultPhaseContext(); + ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails); + ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector); + + _phase = PhaseFactory.createPhasePipe(ctx); + _phase.start(); + + return _phase; + + } + + private void createVMBroker()throws AMQPException + { + _logger.info("Creating InVM Qpid.AMQP listening on port " + _brokerDetails.getPort()); + + VmPipeAcceptor acceptor = new VmPipeAcceptor(); + IoServiceConfig config = acceptor.getDefaultConfig(); + config.setThreadModel(ReadWriteThreadModel.getInstance()); + + IoHandlerAdapter provider = null; + try + { + VmPipeAddress pipe = new VmPipeAddress(_brokerDetails.getPort()); + provider = createBrokerInstance(_brokerDetails.getPort()); + acceptor.bind(pipe, provider); + _logger.info("Created InVM Qpid.AMQP listening on port " + _brokerDetails.getPort()); + } + catch (IOException e) + { + _logger.error(e); + VmPipeAddress pipe = new VmPipeAddress(_brokerDetails.getPort()); + acceptor.unbind(pipe); + + throw new AMQPException("Error creating VM broker",e); + } + } + + private IoHandlerAdapter createBrokerInstance(int port) throws AMQPException + { + String protocolProviderClass = ClientConfiguration.get().getString(QpidConstants.QPID_VM_BROKER_CLASS); + _logger.info("Creating Qpid protocol provider: " + protocolProviderClass); + + // can't use introspection to get Provider as it is a server class. + // need to go straight to IoHandlerAdapter but that requries the queues and exchange from the ApplicationRegistry which we can't access. + + //get correct constructor and pass in instancec ID - "port" + IoHandlerAdapter provider; + try + { + Class[] cnstr = {Integer.class}; + Object[] params = {port}; + provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); + //Give the broker a second to create + _logger.info("Created VMBroker Instance:" + port); + } + catch (Exception e) + { + _logger.info("Unable to create InVM Qpid broker on port " + port + ". due to : " + e.getCause()); + _logger.error(e); + String because; + if (e.getCause() == null) + { + because = e.toString(); + } + else + { + because = e.getCause().toString(); + } + + + throw new AMQPException(port, because + " Stopped InVM Qpid.AMQP creation",e); + } + + return provider; + } + + public void close() throws AMQPException + { + + } + + public Phase getPhasePipe() + { + return _phase; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java b/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java new file mode 100644 index 0000000000..e161e30227 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java @@ -0,0 +1,14 @@ +package org.apache.qpid.nclient.util; + +import org.apache.qpid.nclient.core.AMQPException; + +public class AMQPValidator +{ + public static void throwExceptionOnNull(Object obj, String msg) throws AMQPException + { + if(obj == null) + { + throw new AMQPException(msg); + } + } +} |