diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-03-29 22:24:20 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-03-29 22:24:20 +0000 |
commit | 3c9e433f7068925533a69ca65e32af0379594757 (patch) | |
tree | 01283e54db06bc9705e8ec93dd668238bd966b90 | |
parent | 0f9044243547ded8521af0c8d0ff81d791d8048d (diff) | |
download | qpid-python-3c9e433f7068925533a69ca65e32af0379594757.tar.gz |
First cut of the AMQP java API
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@523854 13f79535-47bb-0310-9956-ffa450edef68
38 files changed, 2615 insertions, 904 deletions
diff --git a/java/newclient/.project b/java/newclient/.project index 0522e6b5dc..f1c93ad7a9 100644 --- a/java/newclient/.project +++ b/java/newclient/.project @@ -1,17 +1,14 @@ -<?xml version="1.0" encoding="UTF-8"?> <projectDescription> - <name>qpid-newclient</name> - <comment></comment> - <projects> - </projects> - <buildSpec> - <buildCommand> - <name>org.eclipse.jdt.core.javabuilder</name> - <arguments> - </arguments> - </buildCommand> - </buildSpec> - <natures> - <nature>org.eclipse.jdt.core.javanature</nature> - </natures> -</projectDescription> + <name>qpid-newclient</name> + <comment/> + <projects/> + <buildSpec> + <buildCommand> + <name>org.eclipse.jdt.core.javabuilder</name> + <arguments/> + </buildCommand> + </buildSpec> + <natures> + <nature>org.eclipse.jdt.core.javanature</nature> + </natures> +</projectDescription>
\ No newline at end of file diff --git a/java/newclient/src/main/java/client.log4j b/java/newclient/src/main/java/client.log4j index 525433e9a9..c2c7326479 100644 --- a/java/newclient/src/main/java/client.log4j +++ b/java/newclient/src/main/java/client.log4j @@ -16,10 +16,11 @@ # specific language governing permissions and limitations
# under the License.
#
-log4j.rootLogger=${root.logging.level}
+log4j.rootLogger=DEBUG
-log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+#log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+log4j.logger.org.apache.qpid=DEBUG, console
log4j.additivity.org.apache.qpid=false
log4j.appender.console=org.apache.log4j.ConsoleAppender
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java index 890b0dd6eb..f984e812c2 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java @@ -1,11 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.nclient.amqp; import java.security.SecureRandom; import java.util.concurrent.ConcurrentHashMap; import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.QpidConstants; -import org.apache.qpid.nclient.model.AMQPMethodEvent; public abstract class AMQPCallBackSupport { @@ -31,16 +52,16 @@ public abstract class AMQPCallBackSupport { if(noWait) { - // u only need to register if u are expecting a response - long localCorrelationId = getNextCorrelationId(); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId); - _cbMap.put(localCorrelationId, cb); - return msg; + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID); + return msg; } else { - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID); - return msg; + // u only need to register if u are expecting a response + long localCorrelationId = getNextCorrelationId(); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId); + _cbMap.put(localCorrelationId, cb); + return msg; } } @@ -52,12 +73,25 @@ public abstract class AMQPCallBackSupport return msg; } - protected void invokeCallBack(long localCorrelationId, AMQMethodBody methodBody) + protected void invokeCallBack(long localCorrelationId, AMQMethodBody methodBody)throws AMQPException { - if(_cbMap.contains(localCorrelationId)) + if(_cbMap.containsKey(localCorrelationId)) { AMQPCallBack cb = (AMQPCallBack)_cbMap.get(localCorrelationId); - cb.brokerResponded(methodBody); + if(cb == null) + { + throw new AMQPException("Unable to find the callback object responsible for handling " + methodBody); + } + else + { + cb.setIsComplete(true); + cb.brokerResponded(methodBody); + } + _cbMap.remove(localCorrelationId); + } + else + { + //ignore, as this event is for another class instance } } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java index d86f948e28..7ba9120415 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java @@ -36,249 +36,317 @@ import org.apache.qpid.framing.ChannelOkBody; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; import org.apache.qpid.framing.ChannelResumeBody; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; import org.apache.qpid.nclient.amqp.state.AMQPState; import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; import org.apache.qpid.nclient.config.ClientConfiguration; import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.Phase; import org.apache.qpid.nclient.core.QpidConstants; -import org.apache.qpid.nclient.model.AMQPMethodEvent; -import org.apache.qpid.nclient.model.AMQPMethodListener; import org.apache.qpid.nclient.util.AMQPValidator; /** - * This represents the Channel class defined in the AMQP protocol. - * This class is a finite state machine and is thread safe by design. - * Only valid state changes are allowed or else an IllegalStateTransitionException will be thrown. - * Only one thread can enter the methods that change state, at a given time. - * The AMQP protocol recommends one thread per channel by design. - * - * A JMS Session can wrap an instance of this class. + * This represents the Channel class defined in the AMQP protocol. This class is a finite state machine and is thread + * safe by design. Only valid state changes are allowed or else an IllegalStateTransitionException will be thrown. Only + * one thread can enter the methods that change state, at a given time. The AMQP protocol recommends one thread per + * channel by design. + * + * A JMS Session can wrap an instance of this class. */ public class AMQPChannel extends AMQPStateMachine implements AMQPMethodListener { -private static final Logger _logger = Logger.getLogger(AMQPChannel.class); - - //the channelId assigned for this channel - private int _channelId; - private Phase _phase; - private AMQPState _currentState; - private final AMQPState[] _validCloseStates = new AMQPState[]{AMQPState.CHANNEL_OPENED,AMQPState.CHANNEL_SUSPEND}; - private final AMQPState[] _validResumeStates = new AMQPState[]{AMQPState.CHANNEL_CLOSED,AMQPState.CHANNEL_NOT_OPENED}; - - // The wait period until a server sends a respond - private long _serverTimeOut = 1000; - private final Lock _lock = new ReentrantLock(); - private final Condition _channelNotOpend = _lock.newCondition(); - private final Condition _channelNotClosed = _lock.newCondition(); - private final Condition _channelFlowNotResponded = _lock.newCondition(); - private final Condition _channelNotResumed = _lock.newCondition(); - - private ChannelOpenOkBody _channelOpenOkBody; - private ChannelCloseOkBody _channelCloseOkBody; + private static final Logger _logger = Logger.getLogger(AMQPChannel.class); + + // the channelId assigned for this channel + private int _channelId; + + private Phase _phase; + + private AMQPState _currentState; + + private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND }; + + private final AMQPState[] _validResumeStates = new AMQPState[] { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED }; + + // The wait period until a server sends a respond + private long _serverTimeOut = 1000; + + private final Lock _lock = new ReentrantLock(); + + private final Condition _channelNotOpend = _lock.newCondition(); + + private final Condition _channelNotClosed = _lock.newCondition(); + + private final Condition _channelFlowNotResponded = _lock.newCondition(); + + private final Condition _channelNotResumed = _lock.newCondition(); + + private ChannelOpenOkBody _channelOpenOkBody; + + private ChannelCloseOkBody _channelCloseOkBody; + private ChannelFlowOkBody _channelFlowOkBody; + private ChannelOkBody _channelOkBody; - public AMQPChannel(int channelId) + private ChannelCloseBody _channelCloseBody; + + protected AMQPChannel(int channelId, Phase phase) { - _channelId = channelId; - _currentState = AMQPState.CHANNEL_NOT_OPENED; - _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); + _channelId = channelId; + _phase = phase; + _currentState = AMQPState.CHANNEL_NOT_OPENED; + _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); } - - /**------------------------------------------- + + /** + * ------------------------------------------- * API Methods - *-------------------------------------------- + * -------------------------------------------- */ - - /** - * Opens the channel - */ - public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException + + /** + * Opens the channel + */ + public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException + { + _lock.lock(); + try + { + _channelOpenOkBody = null; + checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _channelNotOpend.await(); + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time"); + _currentState = AMQPState.CHANNEL_OPENED; + return _channelOpenOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in channel.open", e); + } + finally + { + _lock.unlock(); + } + } + + /** + * Close the channel + */ + public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException + { + _lock.lock(); + try + { + _channelCloseOkBody = null; + checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _channelNotClosed.await(); + AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time"); + _currentState = AMQPState.CHANNEL_CLOSED; + return _channelCloseOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in channel.close", e); + } + finally + { + _lock.unlock(); + } + } + + /** + * Channel Flow + */ + public ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException + { + _lock.lock(); + try + { + _channelFlowOkBody = null; + if (channelFlowBody.active) + { + checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND, _currentState, AMQPState.CHANNEL_OPENED); + } + else + { + checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND); + } + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _channelFlowNotResponded.await(); + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time"); + handleChannelFlowState(_channelFlowOkBody.active); + return _channelFlowOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in channel.flow", e); + } + finally { - _lock.lock(); - try { - _channelOpenOkBody = null; - checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED,_currentState,AMQPState.CHANNEL_OPENED); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelOpenBody,QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - _channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS); - AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time"); - _currentState = AMQPState.CHANNEL_OPENED; - return _channelOpenOkBody; - } - catch(Exception e) - { - throw new AMQPException("XXX"); - } - finally - { - _lock.unlock(); - } + _lock.unlock(); } - - /** - * Close the channel - */ - public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException + } + + /** + * Close the channel + */ + public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException + { + _lock.lock(); + try { - _lock.lock(); - try { - _channelCloseOkBody = null; - checkIfValidStateTransition(_validCloseStates,_currentState,AMQPState.CHANNEL_CLOSED); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelCloseBody,QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - _channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); - AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time"); - _currentState = AMQPState.CHANNEL_CLOSED; - return _channelCloseOkBody; - } - catch(Exception e) - { - throw new AMQPException("XXX"); - } - finally - { - _lock.unlock(); - } + _channelOkBody = null; + checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _channelNotResumed.await(); + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_channelOkBody, + "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time"); + _currentState = AMQPState.CHANNEL_OPENED; + return _channelOkBody; } - - /** - * Channel Flow - */ - public ChannelFlowOkBody close(ChannelFlowBody channelFlowBody) throws AMQPException + catch (Exception e) { - _lock.lock(); - try { - _channelFlowOkBody = null; - if(channelFlowBody.active) - { - checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND,_currentState,AMQPState.CHANNEL_OPENED); - } - else - { - checkIfValidStateTransition(AMQPState.CHANNEL_OPENED,_currentState,AMQPState.CHANNEL_SUSPEND); - } - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelFlowBody,QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - _channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS); - AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time"); - handleChannelFlowState(_channelFlowOkBody.active); - return _channelFlowOkBody; - } - catch(Exception e) - { - throw new AMQPException("XXX"); - } - finally - { - _lock.unlock(); - } + throw new AMQPException("Error in channel.resume", e); } - - /** - * Close the channel - */ - public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException + finally { - _lock.lock(); - try { - _channelOkBody = null; - checkIfValidStateTransition(_validResumeStates,_currentState,AMQPState.CHANNEL_OPENED); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelResumeBody,QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - _channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS); - AMQPValidator.throwExceptionOnNull(_channelOkBody, "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time"); - _currentState = AMQPState.CHANNEL_OPENED; - return _channelOkBody; - } - catch(Exception e) - { - throw new AMQPException("XXX"); - } - finally - { - _lock.unlock(); - } + _lock.unlock(); } - - /**------------------------------------------- + } + + /** + * ------------------------------------------- * AMQPMethodListener methods - *-------------------------------------------- + * -------------------------------------------- */ - public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException - { - if (evt.getMethod() instanceof ChannelOpenOkBody) - { - _channelOpenOkBody = (ChannelOpenOkBody)evt.getMethod(); - _channelNotOpend.signal(); - return true; - } - else if (evt.getMethod() instanceof ChannelCloseOkBody) - { - _channelCloseOkBody = (ChannelCloseOkBody)evt.getMethod(); - _channelNotClosed.signal(); - return true; - } - else if (evt.getMethod() instanceof ChannelCloseBody) - { - handleChannelClose((ChannelCloseBody)evt.getMethod()); - return true; - } - else if (evt.getMethod() instanceof ChannelFlowOkBody) - { - _channelFlowOkBody = (ChannelFlowOkBody)evt.getMethod(); - _channelFlowNotResponded.signal(); - return true; - } - else if (evt.getMethod() instanceof ChannelFlowBody) - { - handleChannelFlow((ChannelFlowBody)evt.getMethod()); - return true; - } - else if (evt.getMethod() instanceof ChannelOkBody) - { - _channelOkBody = (ChannelOkBody)evt.getMethod(); - //In this case the only method expecting channel-ok is channel-resume - // haven't implemented ping and pong. - _channelNotResumed.signal(); - return true; - } - else - { - return false; - } + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + _lock.lock(); + try + { + if (evt.getMethod() instanceof ChannelOpenOkBody) + { + _channelOpenOkBody = (ChannelOpenOkBody) evt.getMethod(); + _channelNotOpend.signal(); + return true; + } + else if (evt.getMethod() instanceof ChannelCloseOkBody) + { + _channelCloseOkBody = (ChannelCloseOkBody) evt.getMethod(); + _channelNotClosed.signal(); + return true; + } + else if (evt.getMethod() instanceof ChannelCloseBody) + { + _channelCloseBody = (ChannelCloseBody)evt.getMethod(); + // release the correct lock as u may have some conditions waiting. + // while an error occured and the broker has sent a close. + releaseLocks(); + handleChannelClose(_channelCloseBody); + return true; + } + else if (evt.getMethod() instanceof ChannelFlowOkBody) + { + _channelFlowOkBody = (ChannelFlowOkBody) evt.getMethod(); + _channelFlowNotResponded.signal(); + return true; + } + else if (evt.getMethod() instanceof ChannelFlowBody) + { + handleChannelFlow((ChannelFlowBody) evt.getMethod()); + return true; + } + else if (evt.getMethod() instanceof ChannelOkBody) + { + _channelOkBody = (ChannelOkBody) evt.getMethod(); + // In this case the only method expecting channel-ok is channel-resume + // haven't implemented ping and pong. + _channelNotResumed.signal(); + return true; + } + else + { + return false; + } + } + finally + { + _lock.unlock(); + } } - - private void handleChannelClose(ChannelCloseBody channelCloseBody) + + private void handleChannelClose(ChannelCloseBody channelCloseBody) + { + _currentState = AMQPState.CHANNEL_CLOSED; + // handle channel related cleanup + } + + private void releaseLocks() + { + if(_currentState == AMQPState.CHANNEL_NOT_OPENED) + { + _channelNotOpend.signal(); + _channelNotResumed.signal(); // It could be a channel.resume call + } + else if(_currentState == AMQPState.CHANNEL_OPENED || _currentState == AMQPState.CHANNEL_SUSPEND) { - try - { - _lock.lock(); - _currentState = AMQPState.CHANNEL_CLOSED; - } - finally - { - _lock.unlock(); - } + _channelFlowNotResponded.signal(); } - - private void handleChannelFlow(ChannelFlowBody channelFlowBody) + else if(_currentState == AMQPState.CHANNEL_CLOSED) { - _lock.lock(); - try - { - handleChannelFlowState(channelFlowBody.active); - } - finally - { - _lock.unlock(); - } + _channelNotResumed.signal(); } - - private void handleChannelFlowState(boolean flow) + } + + private void checkIfConnectionClosed()throws AMQPException + { + if (_channelCloseBody != null) { - _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND; + String error = "Broker has closed channel due to : " + _channelCloseBody.getReplyText() + + " with reply code (" + _channelCloseBody.getReplyCode() + ") " + + "caused by class " + _channelCloseBody.getClassId() + + " and method " + _channelCloseBody.getMethod(); + + throw new AMQPException(error); } + } + + private void handleChannelFlow(ChannelFlowBody channelFlowBody) + { + _lock.lock(); + try + { + handleChannelFlowState(channelFlowBody.active); + } + finally + { + _lock.unlock(); + } + } + + private void handleChannelFlowState(boolean flow) + { + _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND; + } } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java new file mode 100644 index 0000000000..a3f9b2c24d --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java @@ -0,0 +1,255 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.framing.ChannelFlowOkBody; +import org.apache.qpid.framing.ChannelOkBody; +import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.framing.ConnectionOpenOkBody; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.framing.ConnectionStartBody; +import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.framing.ExchangeDeleteOkBody; +import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.framing.MessageCheckpointBody; +import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.framing.MessageGetBody; +import org.apache.qpid.framing.MessageOffsetBody; +import org.apache.qpid.framing.MessageOkBody; +import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.framing.MessageQosBody; +import org.apache.qpid.framing.MessageRecoverBody; +import org.apache.qpid.framing.MessageRejectBody; +import org.apache.qpid.framing.MessageResumeBody; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.framing.QueueBindOkBody; +import org.apache.qpid.framing.QueueDeclareOkBody; +import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.framing.QueuePurgeOkBody; +import org.apache.qpid.framing.QueueUnbindOkBody; +import org.apache.qpid.nclient.amqp.event.AMQPEventManager; +import org.apache.qpid.nclient.amqp.state.AMQPStateManager; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.DefaultPhaseContext; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.PhaseContext; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.transport.AMQPConnectionURL; +import org.apache.qpid.nclient.transport.ConnectionURL; +import org.apache.qpid.nclient.transport.TransportConnection; +import org.apache.qpid.nclient.transport.TransportConnectionFactory; +import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType; +import org.apache.qpid.url.URLSyntaxException; + +/** + * The Class Factory creates AMQP Class + * equivalents defined in the spec. + * + * There should one instance per connection. + * The factory class creates all the support + * classes and provides an instance of the + * AMQP class in ready-to-use state. + * + */ +public class AMQPClassFactory +{ + //Need an event manager per connection + private AMQPEventManager _eventManager = new QpidEventManager(); + + // Need a state manager per connection + private AMQPStateManager _stateManager = new QpidStateManager(); + + //Need a phase pipe per connection + private Phase _phase; + + //One instance per connection + private AMQPConnection _amqpConnection; + + public AMQPClassFactory() + { + + } + + public AMQPConnection createConnection(String urlStr,ConnectionType type)throws AMQPException, URLSyntaxException + { + AMQPConnectionURL url = new AMQPConnectionURL(urlStr); + return createConnectionClass(url,type); + } + + public AMQPConnection createConnectionClass(ConnectionURL url,ConnectionType type)throws AMQPException + { + if (_amqpConnection == null) + { + PhaseContext ctx = new DefaultPhaseContext(); + ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager); + + TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type,ctx); + _amqpConnection = new AMQPConnection(conn); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class,_amqpConnection); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class,_amqpConnection); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class,_amqpConnection); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class,_amqpConnection); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class,_amqpConnection); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class,_amqpConnection); + } + return _amqpConnection; + } + + public AMQPChannel createChannelClass(int channel)throws AMQPException + { + checkIfConnectionStarted(); + AMQPChannel amqpChannel = new AMQPChannel(channel,_phase); + _eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class,amqpChannel); + _eventManager.addMethodEventListener(channel, ChannelCloseBody.class,amqpChannel); + _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class,amqpChannel); + _eventManager.addMethodEventListener(channel, ChannelFlowBody.class,amqpChannel); + _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class,amqpChannel); + _eventManager.addMethodEventListener(channel, ChannelOkBody.class,amqpChannel); + return amqpChannel; + } + + public void destroyChannelClass(int channel,AMQPChannel amqpChannel)throws AMQPException + { + _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class,amqpChannel); + _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class,amqpChannel); + _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class,amqpChannel); + _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class,amqpChannel); + _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class,amqpChannel); + _eventManager.removeMethodEventListener(channel, ChannelOkBody.class,amqpChannel); + } + + public AMQPExchange createExchangeClass(int channel)throws AMQPException + { + checkIfConnectionStarted(); + AMQPExchange amqpExchange = new AMQPExchange(channel,_phase); + _eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class,amqpExchange); + _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class,amqpExchange); + return amqpExchange; + } + + public void destoryExchangeClass(int channel, AMQPExchange amqpExchange)throws AMQPException + { + _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class,amqpExchange); + _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class,amqpExchange); + } + + public AMQPQueue createQueueClass(int channel)throws AMQPException + { + checkIfConnectionStarted(); + AMQPQueue amqpQueue = new AMQPQueue(channel,_phase); + _eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class,amqpQueue); + _eventManager.addMethodEventListener(channel, QueueBindOkBody.class,amqpQueue); + _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class,amqpQueue); + _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class,amqpQueue); + _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class,amqpQueue); + return amqpQueue; + } + + public void destroyQueueClass(int channel,AMQPQueue amqpQueue)throws AMQPException + { + _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class,amqpQueue); + _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class,amqpQueue); + _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class,amqpQueue); + _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class,amqpQueue); + _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class,amqpQueue); + } + + public AMQPMessage createMessageClass(int channel,AMQPMessageCallBack messageCb)throws AMQPException + { + checkIfConnectionStarted(); + AMQPMessage amqpMessage = new AMQPMessage(channel,_phase,messageCb); + _eventManager.addMethodEventListener(channel, MessageAppendBody.class,amqpMessage); + _eventManager.addMethodEventListener(channel, MessageCancelBody.class,amqpMessage); + _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class,amqpMessage); + _eventManager.addMethodEventListener(channel, MessageCloseBody.class,amqpMessage); + _eventManager.addMethodEventListener(channel, MessageGetBody.class,amqpMessage); + _eventManager.addMethodEventListener(channel, MessageOffsetBody.class,amqpMessage); + _eventManager.addMethodEventListener(channel, MessageOkBody.class,amqpMessage); + _eventManager.addMethodEventListener(channel, MessageOpenBody.class,amqpMessage); + _eventManager.addMethodEventListener(channel, MessageRecoverBody.class,amqpMessage); + _eventManager.addMethodEventListener(channel, MessageRejectBody.class,amqpMessage); + _eventManager.addMethodEventListener(channel, MessageResumeBody.class,amqpMessage); + _eventManager.addMethodEventListener(channel, MessageQosBody.class,amqpMessage); + _eventManager.addMethodEventListener(channel, MessageTransferBody.class,amqpMessage); + + return amqpMessage; + } + + public void destoryMessageClass(int channel,AMQPMessage amqpMessage)throws AMQPException + { + _eventManager.removeMethodEventListener(channel, MessageAppendBody.class,amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageCancelBody.class,amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class,amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageCloseBody.class,amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageGetBody.class,amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class,amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageOkBody.class,amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageOpenBody.class,amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class,amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageRejectBody.class,amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageResumeBody.class,amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageQosBody.class,amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageTransferBody.class,amqpMessage); + } + + //This class should register as a state listener for AMQPConnection + private void checkIfConnectionStarted() throws AMQPException + { + if (_phase == null) + { + _phase = _amqpConnection.getPhasePipe(); + + if (_phase == null) + { + throw new AMQPException("Cannot create a channel until connection is ready"); + } + } + } + + /** + * Extention point + * Other interested parties can obtain a reference to the event manager + * and add listeners to get notified of events + * + */ + public AMQPEventManager getEventManager() + { + return _eventManager; + } + + /** + * Extention point + * Other interested parties can obtain a reference to the state manager + * and add listeners to get notified of state changes + * + */ + public AMQPStateManager getStateManager() + { + return null; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java index 9c9e913cd3..aea25a403b 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java @@ -37,14 +37,14 @@ import org.apache.qpid.framing.ConnectionStartBody; import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.ConnectionTuneBody; import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; import org.apache.qpid.nclient.amqp.state.AMQPState; import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; import org.apache.qpid.nclient.config.ClientConfiguration; import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.Phase; import org.apache.qpid.nclient.core.QpidConstants; -import org.apache.qpid.nclient.model.AMQPMethodEvent; -import org.apache.qpid.nclient.model.AMQPMethodListener; import org.apache.qpid.nclient.transport.TransportConnection; import org.apache.qpid.nclient.util.AMQPValidator; @@ -65,9 +65,8 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen private AMQPState _currentState; - private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CONNECTION_NOT_STARTED, - AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, - AMQPState.CONNECTION_OPEN, }; + private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE, + AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, AMQPState.CONNECTION_OPEN, }; // The wait period until a server sends a respond private long _serverTimeOut = 1000; @@ -93,8 +92,10 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen private ConnectionOpenOkBody _connectionOpenOkBody; private ConnectionCloseOkBody _connectionCloseOkBody; + + private ConnectionCloseBody _connectionCloseBody; - public AMQPConnection(TransportConnection connection) + protected AMQPConnection(TransportConnection connection) { _connection = connection; _currentState = AMQPState.CONNECTION_UNDEFINED; @@ -102,12 +103,14 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen } /** - * ------------------------------------------- API Methods -------------------------------------------- - */ + * ------------------------------------------- + * API Methods + * -------------------------------------------- + */ /** - * Opens the TCP connection and let the formalities begin. - */ + * Opens the TCP connection and let the formalities begin. + */ public ConnectionStartBody openTCPConnection() throws AMQPException { _lock.lock(); @@ -119,15 +122,17 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen _phase = _connection.connect(); // waiting for ConnectionStartBody or error in connection - _connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS); - AMQPValidator.throwExceptionOnNull(_connectionStartBody, - "The broker didn't send the ConnectionStartBody in time"); + //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _connectionNotStarted.await(); + + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time"); _currentState = AMQPState.CONNECTION_NOT_STARTED; return _connectionStartBody; } - catch (Exception e) + catch (InterruptedException e) { - throw new AMQPException("XXX"); + throw new AMQPException("Error opening connection to broker", e); } finally { @@ -135,25 +140,43 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen } } - public ConnectionSecureBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException + /** + * The current java broker implementation can send a connection tune body + * as a response to the startOk. Not sure if that is the correct behaviour. + */ + public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException { _lock.lock(); try { _connectionSecureBody = null; - checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, - AMQPState.CONNECTION_NOT_SECURE); + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE); AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId); _phase.messageSent(msg); - _connectionNotSecure.await(_serverTimeOut, TimeUnit.MILLISECONDS); - AMQPValidator.throwExceptionOnNull(_connectionSecureBody, - "The broker didn't send the ConnectionSecureBody in time"); - _currentState = AMQPState.CONNECTION_NOT_SECURE; - return _connectionSecureBody; + //_connectionNotSecure.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _connectionNotSecure.await(); + //AMQPValidator.throwExceptionOnNull(_connectionSecureBody, "The broker didn't send the ConnectionSecureBody in time"); + //_currentState = AMQPState.CONNECTION_NOT_SECURE; + + checkIfConnectionClosed(); + if (_connectionTuneBody != null) + { + _currentState = AMQPState.CONNECTION_NOT_TUNED; + return _connectionTuneBody; + } + else if (_connectionSecureBody != null) + { // oops the server sent another challenge + _currentState = AMQPState.CONNECTION_NOT_SECURE; + return _connectionSecureBody; + } + else + { + throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); + } } - catch (Exception e) + catch (InterruptedException e) { - throw new AMQPException("XXX"); + throw new AMQPException("Error in connection.startOk", e); } finally { @@ -162,9 +185,9 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen } /** - * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could - * issue a new challenge - */ + * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could + * issue a new challenge + */ public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException { _lock.lock(); @@ -173,11 +196,15 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen _connectionTuneBody = null; _connectionSecureBody = null; checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED); + _connectionSecureBody = null; // The server could send a fresh challenge - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, - _correlationId); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId); _phase.messageSent(msg); - _connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS); + + //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _connectionNotTuned.await(); + checkIfConnectionClosed(); + if (_connectionTuneBody != null) { _currentState = AMQPState.CONNECTION_NOT_TUNED; @@ -193,9 +220,9 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); } } - catch (Exception e) + catch (InterruptedException e) { - throw new AMQPException("XXX"); + throw new AMQPException("Error in connection.secureOk", e); } finally { @@ -214,10 +241,6 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen _phase.messageSent(msg); _currentState = AMQPState.CONNECTION_NOT_OPENED; } - catch (Exception e) - { - throw new AMQPException("XXX"); - } finally { _lock.unlock(); @@ -229,20 +252,26 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen _lock.lock(); try { + // If the broker sends a connection close due to an error with the + // Connection tune ok, then this call will verify that + checkIfConnectionClosed(); + _connectionOpenOkBody = null; checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN); - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, - QpidConstants.EMPTY_CORRELATION_ID); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID); _phase.messageSent(msg); - _connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS); - AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, - "The broker didn't send the ConnectionOpenOkBody in time"); + + //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _connectionNotOpened.await(); + + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time"); _currentState = AMQPState.CONNECTION_OPEN; return _connectionOpenOkBody; } - catch (Exception e) + catch (InterruptedException e) { - throw new AMQPException("XXX"); + throw new AMQPException("Error in connection.open", e); } finally { @@ -257,18 +286,16 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen { _connectionCloseOkBody = null; checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED); - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, - QpidConstants.EMPTY_CORRELATION_ID); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID); _phase.messageSent(msg); _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); - AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, - "The broker didn't send the ConnectionCloseOkBody in time"); + AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time"); _currentState = AMQPState.CONNECTION_CLOSED; return _connectionCloseOkBody; } - catch (Exception e) + catch (InterruptedException e) { - throw new AMQPException("XXX"); + throw new AMQPException("Error in connection.close", e); } finally { @@ -277,72 +304,117 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen } /** - * ------------------------------------------- AMQMethodListener methods - * -------------------------------------------- - */ + * ------------------------------------------- AMQMethodListener methods + * -------------------------------------------- + */ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException { - _correlationId = evt.getCorrelationId(); + _lock.lock(); + try + { + _correlationId = evt.getCorrelationId(); - if (evt.getMethod() instanceof ConnectionStartBody) + if (evt.getMethod() instanceof ConnectionStartBody) + { + _connectionStartBody = (ConnectionStartBody) evt.getMethod(); + _connectionNotStarted.signalAll(); + return true; + } + else if (evt.getMethod() instanceof ConnectionSecureBody) + { + _connectionSecureBody = (ConnectionSecureBody) evt.getMethod(); + _connectionNotSecure.signal(); + _connectionNotTuned.signal(); // in case the server has sent another chanllenge + return true; + } + else if (evt.getMethod() instanceof ConnectionTuneBody) + { + _connectionTuneBody = (ConnectionTuneBody) evt.getMethod(); + _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk + _connectionNotTuned.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionOpenOkBody) + { + _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod(); + _connectionNotOpened.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionCloseOkBody) + { + _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod(); + _connectionNotClosed.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionCloseBody) + { + _connectionCloseBody = (ConnectionCloseBody)evt.getMethod(); + // release the correct lock as u may have some conditions waiting. + // while an error occured and the broker has sent a close. + releaseLocks(); + handleClose(); + return true; + } + else + { + return false; + } + } + finally { - _connectionStartBody = (ConnectionStartBody) evt.getMethod(); - _connectionNotStarted.signal(); - return true; + _lock.unlock(); } - else if (evt.getMethod() instanceof ConnectionSecureBody) + } + + private void handleClose() throws AMQPException + { + try + { + _currentState = AMQPState.CONNECTION_CLOSING; + // do the required cleanup and send a ConnectionCloseOkBody + } + catch (Exception e) { - _connectionSecureBody = (ConnectionSecureBody) evt.getMethod(); - _connectionNotSecure.signal(); - _connectionNotTuned.signal(); // in case the server has sent another chanllenge - return true; + throw new AMQPException("Error handling connection.close from broker", e); } - else if (evt.getMethod() instanceof ConnectionTuneBody) + } + + private void checkIfConnectionClosed()throws AMQPException + { + if (_connectionCloseBody != null) { - _connectionTuneBody = (ConnectionTuneBody) evt.getMethod(); - _connectionNotTuned.signal(); - return true; + String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() + + " with reply code (" + _connectionCloseBody.getReplyCode() + ") " + + "caused by class " + _connectionCloseBody.getClassId() + + " and method " + _connectionCloseBody.getMethod(); + + throw new AMQPException(error); } - else if (evt.getMethod() instanceof ConnectionOpenOkBody) + } + + private void releaseLocks() + { + if(_currentState == AMQPState.CONNECTION_NOT_OPENED) { - _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod(); _connectionNotOpened.signal(); - return true; } - else if (evt.getMethod() instanceof ConnectionCloseOkBody) + else if(_currentState == AMQPState.CONNECTION_UNDEFINED) { - _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod(); - _connectionNotClosed.signal(); - return true; + _connectionNotStarted.signal(); } - else if (evt.getMethod() instanceof ConnectionCloseBody) + else if(_currentState == AMQPState.CONNECTION_NOT_STARTED) { - handleClose(); - return true; + _connectionNotSecure.signal(); } - else + else if(_currentState == AMQPState.CONNECTION_NOT_SECURE) { - return false; + _connectionNotTuned.signal(); } } - public void handleClose() throws AMQPException + public Phase getPhasePipe() { - _lock.lock(); - try - { - checkIfValidStateTransition(AMQPState.CONNECTION_OPEN, _currentState, AMQPState.CONNECTION_CLOSING); - _currentState = AMQPState.CONNECTION_CLOSING; - // do the required cleanup and send a ConnectionCloseOkBody - } - catch (Exception e) - { - throw new AMQPException("XXX"); - } - finally - { - _lock.unlock(); - } + return _phase; } }
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java index 5315f7f318..35db9c6a75 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java @@ -26,10 +26,10 @@ import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.ExchangeDeclareOkBody; import org.apache.qpid.framing.ExchangeDeleteBody; import org.apache.qpid.framing.ExchangeDeleteOkBody; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.Phase; -import org.apache.qpid.nclient.model.AMQPMethodEvent; -import org.apache.qpid.nclient.model.AMQPMethodListener; /** * @@ -43,7 +43,7 @@ public class AMQPExchange extends AMQPCallBackSupport implements AMQPMethodListe { private Phase _phase; - public AMQPExchange(int channelId,Phase phase) + protected AMQPExchange(int channelId,Phase phase) { super(channelId); _phase = phase; diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java index e3ad9d6306..1b86108411 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java @@ -25,6 +25,7 @@ import org.apache.qpid.framing.MessageAppendBody; import org.apache.qpid.framing.MessageCancelBody; import org.apache.qpid.framing.MessageCheckpointBody; import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.framing.MessageConsumeBody; import org.apache.qpid.framing.MessageEmptyBody; import org.apache.qpid.framing.MessageGetBody; import org.apache.qpid.framing.MessageOffsetBody; @@ -35,10 +36,10 @@ import org.apache.qpid.framing.MessageRecoverBody; import org.apache.qpid.framing.MessageRejectBody; import org.apache.qpid.framing.MessageResumeBody; import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.Phase; -import org.apache.qpid.nclient.model.AMQPMethodEvent; -import org.apache.qpid.nclient.model.AMQPMethodListener; /** * This class represents the AMQP Message class. @@ -59,7 +60,7 @@ public class AMQPMessage extends AMQPCallBackSupport implements AMQPMethodListen private Phase _phase; private AMQPMessageCallBack _messageCb; - public AMQPMessage(int channelId,Phase phase,AMQPMessageCallBack messageCb) + protected AMQPMessage(int channelId,Phase phase,AMQPMessageCallBack messageCb) { super(channelId); _phase = phase; @@ -78,9 +79,9 @@ public class AMQPMessage extends AMQPCallBackSupport implements AMQPMethodListen _phase.messageSent(msg); } - public void consume(MessageCancelBody messageCancelBody,AMQPCallBack cb) throws AMQPException + public void consume(MessageConsumeBody messageConsumeBody,AMQPCallBack cb) throws AMQPException { - AMQPMethodEvent msg = handleAsynchronousCall(messageCancelBody,cb); + AMQPMethodEvent msg = handleAsynchronousCall(messageConsumeBody,cb); _phase.messageSent(msg); } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java index 183ed9dba8..c10d6975c6 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java @@ -21,10 +21,8 @@ package org.apache.qpid.nclient.amqp; import org.apache.qpid.framing.MessageAppendBody; -import org.apache.qpid.framing.MessageCancelBody; import org.apache.qpid.framing.MessageCheckpointBody; import org.apache.qpid.framing.MessageCloseBody; -import org.apache.qpid.framing.MessageGetBody; import org.apache.qpid.framing.MessageOpenBody; import org.apache.qpid.framing.MessageRecoverBody; import org.apache.qpid.framing.MessageResumeBody; diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java index a5fe6de298..cfe87cb3eb 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java @@ -31,10 +31,10 @@ import org.apache.qpid.framing.QueuePurgeBody; import org.apache.qpid.framing.QueuePurgeOkBody; import org.apache.qpid.framing.QueueUnbindBody; import org.apache.qpid.framing.QueueUnbindOkBody; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.Phase; -import org.apache.qpid.nclient.model.AMQPMethodEvent; -import org.apache.qpid.nclient.model.AMQPMethodListener; /** * @@ -48,7 +48,7 @@ public class AMQPQueue extends AMQPCallBackSupport implements AMQPMethodListener { private Phase _phase; - public AMQPQueue(int channelId,Phase phase) + protected AMQPQueue(int channelId,Phase phase) { super(channelId); _phase = phase; diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/EventManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/EventManager.java deleted file mode 100644 index 38421cfca3..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/EventManager.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.log4j.Logger; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.model.AMQPMethodEvent; -import org.apache.qpid.nclient.model.AMQPMethodListener; - -/** - * This class registeres with the ModelPhase as a AMQMethodListener, - * to receive method events and then it distributes methods to other listerners - * using a filtering criteria. The criteria is channel id and method body class. - * The method listeners are added and removed dynamically - * - * <p/> - */ -public class EventManager implements AMQPMethodListener -{ - private static final Logger _logger = Logger.getLogger(EventManager.class); - - private Map <Integer,Map> _channelMap = new ConcurrentHashMap<Integer,Map>(); - - /** - * ------------------------------------------------ - * methods introduced by AMQPMethodEventManager - * ------------------------------------------------ - */ - public void addMethodEventListener(int channelId,Class clazz,AMQPMethodListener l) - { - Map<Class,List> _methodListenerMap; - if (_channelMap.containsKey(channelId)) - { - _methodListenerMap = _channelMap.get(channelId); - - } - else - { - _methodListenerMap = new ConcurrentHashMap<Class,List>(); - _channelMap.put(channelId, _methodListenerMap); - } - - List<AMQPMethodListener> _listeners; - if (_methodListenerMap.containsKey(clazz)) - { - _listeners = _methodListenerMap.get(clazz); - - } - else - { - _listeners = new ArrayList<AMQPMethodListener>(); - _methodListenerMap.put(clazz, _listeners); - } - - _listeners.add(l); - - } - - public void removeMethodEventListener(int channelId,Class clazz,AMQPMethodListener l) - { - if (_channelMap.containsKey(channelId)) - { - Map<Class,List> _methodListenerMap = _channelMap.get(channelId); - - if (_methodListenerMap.containsKey(clazz)) - { - List<AMQPMethodListener> _listeners = _methodListenerMap.get(clazz); - _listeners.remove(l); - } - - } - } - - - /** - * ------------------------------------------------ - * methods introduced by AMQMethodListener - * ------------------------------------------------ - */ - /* (non-Javadoc) - * @see org.apache.qpid.nclient.model.AMQStateManager#methodReceived(org.apache.qpid.protocol.AMQMethodEvent) - */ - public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException - { - if (_channelMap.containsKey(evt.getChannelId())) - { - Map<Class,List> _methodListenerMap = _channelMap.get(evt.getChannelId()); - - if (_methodListenerMap.containsKey(evt.getMethod().getClass())) - { - - List<AMQPMethodListener> _listeners = _methodListenerMap.get(evt.getMethod().getClass()); - for (AMQPMethodListener l:_listeners) - { - l.methodReceived(evt); - } - - return (_listeners.size()>0); - } - - } - - return false; - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java new file mode 100644 index 0000000000..9655da6912 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java @@ -0,0 +1,123 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.nclient.amqp.event.AMQPEventManager; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; +import org.apache.qpid.nclient.core.AMQPException; + +/** + * This class registeres with the ModelPhase as a AMQMethodListener, + * to receive method events and then it distributes methods to other listerners + * using a filtering criteria. The criteria is channel id and method body class. + * The method listeners are added and removed dynamically + * + * <p/> + */ +public class QpidEventManager implements AMQPEventManager +{ + private static final Logger _logger = Logger.getLogger(QpidEventManager.class); + + private Map<Integer, Map> _channelMap = new ConcurrentHashMap<Integer, Map>(); + + /** + * ------------------------------------------------ + * methods introduced by AMQEventManager + * ------------------------------------------------ + */ + public void addMethodEventListener(int channelId, Class clazz, AMQPMethodListener l) + { + Map<Class, List> _methodListenerMap; + if (_channelMap.containsKey(channelId)) + { + _methodListenerMap = _channelMap.get(channelId); + + } + else + { + _methodListenerMap = new ConcurrentHashMap<Class, List>(); + _channelMap.put(channelId, _methodListenerMap); + } + + List<AMQPMethodListener> _listeners; + if (_methodListenerMap.containsKey(clazz)) + { + _listeners = _methodListenerMap.get(clazz); + } + else + { + _listeners = new ArrayList<AMQPMethodListener>(); + _methodListenerMap.put(clazz, _listeners); + } + + _listeners.add(l); + + } + + public void removeMethodEventListener(int channelId, Class clazz, AMQPMethodListener l) + { + if (_channelMap.containsKey(channelId)) + { + Map<Class, List> _methodListenerMap = _channelMap.get(channelId); + + if (_methodListenerMap.containsKey(clazz)) + { + List<AMQPMethodListener> _listeners = _methodListenerMap.get(clazz); + _listeners.remove(l); + } + + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.model.AMQStateManager#methodReceived(org.apache.qpid.protocol.AMQMethodEvent) + */ + public <B extends AMQMethodBody> boolean notifyEvent(AMQPMethodEvent<B> evt) throws AMQPException + { + if (_channelMap.containsKey(evt.getChannelId())) + { + Map<Class, List> _methodListenerMap = _channelMap.get(evt.getChannelId()); + + if (_methodListenerMap.containsKey(evt.getMethod().getClass())) + { + + List<AMQPMethodListener> _listeners = _methodListenerMap.get(evt.getMethod().getClass()); + for (AMQPMethodListener l : _listeners) + { + l.methodReceived(evt); + } + + return (_listeners.size() > 0); + } + + } + + return false; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java new file mode 100644 index 0000000000..e49524ee30 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.AMQException; +import org.apache.qpid.nclient.amqp.state.AMQPStateListener; +import org.apache.qpid.nclient.amqp.state.AMQPStateManager; + +public class QpidStateManager implements AMQPStateManager +{ + + public void addListener(AMQPStateListener l) throws AMQException + { + // TODO Auto-generated method stub + + } + + public void removeListener(AMQPStateListener l) throws AMQException + { + // TODO Auto-generated method stub + + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java new file mode 100644 index 0000000000..f682659f9e --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java @@ -0,0 +1,11 @@ +package org.apache.qpid.nclient.amqp.event; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.nclient.core.AMQPException; + +public interface AMQPEventManager +{ + public void addMethodEventListener(int channelId, Class clazz, AMQPMethodListener l); + public void removeMethodEventListener(int channelId, Class clazz, AMQPMethodListener l); + public <B extends AMQMethodBody> boolean notifyEvent(AMQPMethodEvent<B> evt) throws AMQPException; +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java new file mode 100644 index 0000000000..c6641890e0 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java @@ -0,0 +1,78 @@ +package org.apache.qpid.nclient.amqp.event; + +import org.apache.qpid.framing.AMQMethodBody; + +/** + * This class is exactly the same as the AMQMethod event. + * Except I renamed requestId to corelationId, so I could use it both ways. + * + * I didn't want to modify anything in common so that there is no + * impact on the existing code. + * + */ +public class AMQPMethodEvent<M extends AMQMethodBody> +{ + + private final M _method; + + private final int _channelId; + + /** + * This is the rquest id from the broker when it sent me a request + * when I respond I remember this id and copy this to the outgoing + * response. + */ + private final long _correlationId; + + /** + * I could use _correlationId, bcos when I send a request + * this field is blank and is only used internally. But I + * used a seperate field to make it more clear. + */ + private long _localCorrletionId = 0; + + public AMQPMethodEvent(int channelId, M method, long correlationId, long localCorrletionId) + { + _channelId = channelId; + _method = method; + _correlationId = correlationId; + _localCorrletionId = localCorrletionId; + } + + public AMQPMethodEvent(int channelId, M method, long correlationId) + { + _channelId = channelId; + _method = method; + _correlationId = correlationId; + } + + public M getMethod() + { + return _method; + } + + public int getChannelId() + { + return _channelId; + } + + public long getCorrelationId() + { + return _correlationId; + } + + public long getLocalCorrelationId() + { + return _localCorrletionId; + } + + public String toString() + { + StringBuilder buf = new StringBuilder("Method event: \n"); + buf.append("Channel id: \n").append(_channelId); + buf.append("Method: \n").append(_method); + buf.append("Request Id: ").append(_correlationId); + buf.append("Local Correlation Id: ").append(_localCorrletionId); + return buf.toString(); + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java index 52b9f6de91..e77a38121c 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java @@ -1,4 +1,4 @@ -package org.apache.qpid.nclient.model; +package org.apache.qpid.nclient.amqp.event; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.nclient.core.AMQPException; diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java new file mode 100644 index 0000000000..528b47beb4 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java @@ -0,0 +1,57 @@ +package org.apache.qpid.nclient.amqp.sample; + +import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.framing.MessageCheckpointBody; +import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.framing.MessageRecoverBody; +import org.apache.qpid.framing.MessageResumeBody; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.nclient.amqp.AMQPMessageCallBack; +import org.apache.qpid.nclient.core.AMQPException; + +public class MessageHelper implements AMQPMessageCallBack +{ + + public void append(MessageAppendBody messageAppendBody, long correlationId) throws AMQPException + { + // TODO Auto-generated method stub + + } + + public void checkpoint(MessageCheckpointBody messageCheckpointBody, long correlationId) throws AMQPException + { + // TODO Auto-generated method stub + + } + + public void close(MessageCloseBody messageCloseBody, long correlationId) throws AMQPException + { + // TODO Auto-generated method stub + + } + + public void open(MessageOpenBody messageOpenBody, long correlationId) throws AMQPException + { + // TODO Auto-generated method stub + + } + + public void recover(MessageRecoverBody messageRecoverBody, long correlationId) throws AMQPException + { + // TODO Auto-generated method stub + + } + + public void resume(MessageResumeBody messageResumeBody, long correlationId) throws AMQPException + { + // TODO Auto-generated method stub + + } + + public void transfer(MessageTransferBody messageTransferBody, long correlationId) throws AMQPException + { + System.out.println("The Broker has sent a message" + messageTransferBody.toString()); + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java index 69d2564112..166d565f81 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java @@ -1,25 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.nclient.amqp.sample; import java.util.StringTokenizer; +import java.util.UUID; import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; +import org.apache.qpid.AMQException; import org.apache.qpid.common.ClientProperties; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.framing.ChannelFlowOkBody; +import org.apache.qpid.framing.ChannelOpenBody; +import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.ConnectionOpenBody; +import org.apache.qpid.framing.ConnectionOpenOkBody; import org.apache.qpid.framing.ConnectionSecureBody; import org.apache.qpid.framing.ConnectionSecureOkBody; import org.apache.qpid.framing.ConnectionStartBody; import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.ConnectionTuneBody; import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.framing.Content; +import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; +import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.framing.MessageConsumeBody; +import org.apache.qpid.framing.MessageGetBody; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.framing.QueueBindBody; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeclareOkBody; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.framing.QueuePurgeBody; +import org.apache.qpid.framing.QueuePurgeOkBody; +import org.apache.qpid.nclient.amqp.AMQPCallBack; +import org.apache.qpid.nclient.amqp.AMQPChannel; +import org.apache.qpid.nclient.amqp.AMQPClassFactory; import org.apache.qpid.nclient.amqp.AMQPConnection; +import org.apache.qpid.nclient.amqp.AMQPExchange; +import org.apache.qpid.nclient.amqp.AMQPMessage; +import org.apache.qpid.nclient.amqp.AMQPQueue; import org.apache.qpid.nclient.transport.AMQPConnectionURL; import org.apache.qpid.nclient.transport.ConnectionURL; -import org.apache.qpid.nclient.transport.TransportConnection; -import org.apache.qpid.nclient.transport.TransportConnectionFactory; import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType; /** @@ -27,26 +74,34 @@ import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionTy * Notes this is just a simple demo. * * I have used Helper classes to keep the code cleaner. + * Will break this into unit tests later on */ + +@SuppressWarnings("unused") public class TestClient { - private byte major; - private byte minor; + private byte _major; + private byte _minor; private ConnectionURL _url; + private static int _channel = 2; + // Need a Class factory per connection + private AMQPClassFactory _classFactory = new AMQPClassFactory(); + private int _ticket; public AMQPConnection openConnection() throws Exception { - _url = new AMQPConnectionURL(""); - TransportConnection conn = TransportConnectionFactory.createTransportConnection(_url, ConnectionType.VM); - return new AMQPConnection(conn); + //_url = new AMQPConnectionURL("amqp://guest:guest@test/localhost?brokerlist='vm://:3'"); + + _url = new AMQPConnectionURL("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'"); + return _classFactory.createConnectionClass(_url,ConnectionType.TCP); } - public void handleProtocolNegotiation(AMQPConnection con) throws Exception + public void handleConnectionNegotiation(AMQPConnection con) throws Exception { // ConnectionStartBody ConnectionStartBody connectionStartBody = con.openTCPConnection(); - major = connectionStartBody.getMajor(); - minor = connectionStartBody.getMajor(); + _major = connectionStartBody.getMajor(); + _minor = connectionStartBody.getMinor(); FieldTable clientProperties = FieldTableFactory.newFieldTable(); clientProperties.put(new AMQShortString(ClientProperties.instance.toString()),"Test"); // setting only the client id @@ -61,33 +116,335 @@ public class TestClient null, SecurityHelper.createCallbackHandler(mechanism,_url)); ConnectionStartOkBody connectionStartOkBody = - ConnectionStartOkBody.createMethodBody(major, minor, clientProperties, + ConnectionStartOkBody.createMethodBody(_major, _minor, clientProperties, new AMQShortString(tokenizer.nextToken()), new AMQShortString(mechanism), (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null)); // ConnectionSecureBody - ConnectionSecureBody connectionSecureBody = con.startOk(connectionStartOkBody); + AMQMethodBody body = con.startOk(connectionStartOkBody); + ConnectionTuneBody connectionTuneBody; - ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody( - major,minor,sc.evaluateChallenge(connectionSecureBody.getChallenge())); + if (body instanceof ConnectionSecureBody) + { + ConnectionSecureBody connectionSecureBody = (ConnectionSecureBody)body; + ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody( + _major,_minor,sc.evaluateChallenge(connectionSecureBody.getChallenge())); + //Assuming the server is not going to send another challenge + connectionTuneBody = (ConnectionTuneBody)con.secureOk(connectionSecureOkBody); + + } + else + { + connectionTuneBody = (ConnectionTuneBody)body; + } - // Assuming the server is not going to send another challenge - ConnectionTuneBody connectionTuneBody = (ConnectionTuneBody)con.secureOk(connectionSecureOkBody); // Using broker supplied values ConnectionTuneOkBody connectionTuneOkBody = - ConnectionTuneOkBody.createMethodBody(major,minor, + ConnectionTuneOkBody.createMethodBody(_major,_minor, connectionTuneBody.getChannelMax(), connectionTuneBody.getFrameMax(), connectionTuneBody.getHeartbeat()); con.tuneOk(connectionTuneOkBody); + + ConnectionOpenBody connectionOpenBody = + ConnectionOpenBody.createMethodBody(_major,_minor,null, true,new AMQShortString(_url.getVirtualHost())); + + ConnectionOpenOkBody connectionOpenOkBody = con.open(connectionOpenBody); + } + + public void handleChannelNegotiation() throws Exception + { + AMQPChannel channel = _classFactory.createChannelClass(_channel); + + ChannelOpenBody channelOpenBody = ChannelOpenBody.createMethodBody(_major, _minor, new AMQShortString("myChannel1")); + ChannelOpenOkBody channelOpenOkBody = channel.open(channelOpenBody); + + //lets have some fun + ChannelFlowBody channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, false); + + ChannelFlowOkBody channelFlowOkBody = channel.flow(channelFlowBody); + System.out.println("Channel is " + (channelFlowOkBody.getActive()? "active" : "suspend")); + + channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, true); + channelFlowOkBody = channel.flow(channelFlowBody); + System.out.println("Channel is " + (channelFlowOkBody.getActive()? "active" : "suspend")); + } + + public void createExchange() throws Exception + { + AMQPExchange exchange = _classFactory.createExchangeClass(_channel); + + ExchangeDeclareBody exchangeDeclareBody = + ExchangeDeclareBody.createMethodBody(_major, _minor, + null, // arguments + false,//auto delete + false,// durable + new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME), + true, //internal + false,// nowait + false,// passive + _ticket, + new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)); + + AMQPCallBack cb = createCallBackWithMessage("Broker has created the exchange"); + exchange.declare(exchangeDeclareBody, cb); + // Blocking for response + while (!cb.isComplete()){} + } + + + public void createAndBindQueue()throws Exception + { + AMQPQueue queue = _classFactory.createQueueClass(_channel); + + QueueDeclareBody queueDeclareBody = + QueueDeclareBody.createMethodBody(_major, _minor, + null, //arguments + false,//auto delete + false,// durable + false, //exclusive, + false, //nowait, + false, //passive, + new AMQShortString("MyTestQueue"), + 0); + + AMQPCallBack cb = new AMQPCallBack(){ + + @Override + public void brokerResponded(AMQMethodBody body) + { + QueueDeclareOkBody queueDeclareOkBody = (QueueDeclareOkBody)body; + System.out.println("[Broker has created the queue, " + + "message count " + queueDeclareOkBody.getMessageCount() + + "consumer count " + queueDeclareOkBody.getConsumerCount() + "]\n"); + } + + @Override + public void brokerRespondedWithError(AMQException e) + { + } + + }; + + queue.declare(queueDeclareBody, cb); + //Blocking for response + while (!cb.isComplete()){} + + QueueBindBody queueBindBody = + QueueBindBody.createMethodBody(_major, _minor, + null, //arguments + new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),//exchange + false, //nowait + new AMQShortString("MyTestQueue"), //queue + new AMQShortString("RH"), //routingKey + 0 //ticket + ); + + cb = createCallBackWithMessage("Broker has bound the queue"); + queue.bind(queueBindBody, cb); + //Blocking for response + while (!cb.isComplete()){} + } + + public void purgeQueue()throws Exception + { + AMQPQueue queue = _classFactory.createQueueClass(_channel); + + QueuePurgeBody queuePurgeBody = + QueuePurgeBody.createMethodBody(_major, _minor, + false, //nowait + new AMQShortString("MyTestQueue"), //queue + 0 //ticket + ); + + AMQPCallBack cb = new AMQPCallBack(){ + + @Override + public void brokerResponded(AMQMethodBody body) + { + QueuePurgeOkBody queuePurgeOkBody = (QueuePurgeOkBody)body; + System.out.println("[Broker has purged the queue, message count " + queuePurgeOkBody.getMessageCount() + "]\n"); + } + + @Override + public void brokerRespondedWithError(AMQException e) + { + } + + }; + + queue.purge(queuePurgeBody, cb); + //Blocking for response + while (!cb.isComplete()){} + } + + public void deleteQueue()throws Exception + { + AMQPQueue queue = _classFactory.createQueueClass(_channel); + + QueueDeleteBody queueDeleteBody = + QueueDeleteBody.createMethodBody(_major, _minor, + false, //ifEmpty + false, //ifUnused + false, //nowait + new AMQShortString("MyTestQueue"), //queue + 0 //ticket + ); + + AMQPCallBack cb = new AMQPCallBack(){ + + @Override + public void brokerResponded(AMQMethodBody body) + { + QueueDeleteOkBody queueDeleteOkBody = (QueueDeleteOkBody)body; + System.out.println("[Broker has deleted the queue, message count " + queueDeleteOkBody.getMessageCount() + "]\n"); + } + @Override + public void brokerRespondedWithError(AMQException e) + { + } + + }; + + queue.delete(queueDeleteBody, cb); + //Blocking for response + while (!cb.isComplete()){} + + } + + public void publishAndSubscribe() throws Exception + { + AMQPMessage message = _classFactory.createMessageClass(_channel,new MessageHelper()); + MessageConsumeBody messageConsumeBody = MessageConsumeBody.createMethodBody(_major, _minor, + new AMQShortString("myClient"),// destination + false, //exclusive + null, //filter + false, //noAck, + false, //noLocal, + new AMQShortString("MyTestQueue"), //queue + 0 //ticket + ); + + AMQPCallBack cb = createCallBackWithMessage("Broker has accepted the consume"); + message.consume(messageConsumeBody, cb); + //Blocking for response + while (!cb.isComplete()){} + + // Sending 5 messages serially + for (int i=0; i<5; i++) + { + cb = createCallBackWithMessage("Broker has accepted msg " + i); + message.transfer(createMessages("Test" + i),cb); + while (!cb.isComplete()){} + } + + MessageCancelBody messageCancelBody = MessageCancelBody.createMethodBody(_major, _minor, new AMQShortString("myClient")); + + AMQPCallBack cb2 = createCallBackWithMessage("Broker has accepted the consume cancel"); + message.cancel(messageCancelBody, cb2); + + } + + private MessageTransferBody createMessages(String content) throws Exception + { + FieldTable headers = FieldTableFactory.newFieldTable(); + headers.setAsciiString(new AMQShortString("Test"), System.currentTimeMillis() + ""); + + MessageTransferBody messageTransferBody = + MessageTransferBody.createMethodBody(_major, _minor, + new AMQShortString("testApp"), //appId + headers, //applicationHeaders + new Content(Content.TypeEnum.INLINE_T,content.getBytes()), //body + new AMQShortString(""), //contentEncoding, + new AMQShortString("text/plain"), //contentType + new AMQShortString("testApp"), //correlationId + (short)1, //deliveryMode non persistant + new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination + new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange + 0l, //expiration + false, //immediate + false, //mandatory + new AMQShortString(UUID.randomUUID().toString()), //messageId + (short)0, //priority + false, //redelivered + new AMQShortString("RH"), //replyTo + new AMQShortString("RH"), //routingKey, + "abc".getBytes(), //securityToken + 0, //ticket + System.currentTimeMillis(), //timestamp + new AMQShortString(""), //transactionId + 0l, //ttl, + new AMQShortString("Hello") //userId + ); + + return messageTransferBody; + + } + + public void publishAndGet() throws Exception + { + AMQPMessage message = _classFactory.createMessageClass(_channel,new MessageHelper()); + AMQPCallBack cb = createCallBackWithMessage("Broker has accepted msg 5"); + + MessageGetBody messageGetBody = + MessageGetBody.createMethodBody(_major, _minor, + new AMQShortString("myClient"), + false, //noAck + new AMQShortString("MyTestQueue"), //queue + 0 //ticket + ); + + //AMQPMessage message = _classFactory.createMessage(_channel,new MessageHelper()); + message.transfer(createMessages("Test"),cb); + while(!cb.isComplete()){} + + cb = createCallBackWithMessage("Broker has accepted get"); + message.get(messageGetBody, cb); + } + + // Creates a gneric call back and prints the given message + private AMQPCallBack createCallBackWithMessage(final String msg) + { + AMQPCallBack cb = new AMQPCallBack(){ + + @Override + public void brokerResponded(AMQMethodBody body) + { + System.out.println(msg); + } + + @Override + public void brokerRespondedWithError(AMQException e) + { + } + + }; + + return cb; + } + public static void main(String[] args) { TestClient test = new TestClient(); - AMQPConnection con = test.openConnection(); - + try + { + AMQPConnection con = test.openConnection(); + test.handleConnectionNegotiation(con); + test.handleChannelNegotiation(); + test.createExchange(); + test.createAndBindQueue(); + test.publishAndSubscribe(); + test.purgeQueue(); + test.publishAndGet(); + test.deleteQueue(); + } + catch (Exception e) + { + e.printStackTrace(); + } } } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java b/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java index 1c3bb788a0..7bc77e02c0 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java @@ -4,6 +4,9 @@ import java.io.FileInputStream; import java.io.InputStream; import java.util.Collection; import java.util.List; +import java.util.TreeMap; + +import javax.security.sasl.SaslClientFactory; import org.apache.commons.configuration.CombinedConfiguration; import org.apache.commons.configuration.ConfigurationException; @@ -11,6 +14,7 @@ import org.apache.commons.configuration.SystemConfiguration; import org.apache.commons.configuration.XMLConfiguration; import org.apache.log4j.Logger; import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.security.AMQPCallbackHandler; /** * Loads a properties file from classpath. @@ -64,21 +68,33 @@ public class ClientConfiguration extends CombinedConfiguration { public static void main(String[] args) { - System.out.println(ClientConfiguration.get().getString(QpidConstants.USE_SHARED_READ_WRITE_POOL)); - - //System.out.println(ClientConfiguration.get().getString("methodListeners.methodListener(1).[@class]")); - int count = ClientConfiguration.get().getMaxIndex(QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER); - System.out.println(count); - - for(int i=0 ;i<count;i++) - { - String methodListener = QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER + "(" + i + ")"; - System.out.println("\n\n"+ClientConfiguration.get().getString(methodListener + QpidConstants.CLASS)); - List<String> list = ClientConfiguration.get().getList(methodListener + "." + QpidConstants.METHOD_CLASS); - for(String s:list) - { - System.out.println(s); - } - } + String key = QpidConstants.AMQP_SECURITY + "." + + QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES + "." + + QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY; + + TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister = + new TreeMap<String, Class<? extends SaslClientFactory>>(); + + int index = ClientConfiguration.get().getMaxIndex(key); + + for (int i=0; i<index+1;i++) + { + String mechanism = ClientConfiguration.get().getString(key + "(" + i + ")[@type]"); + String className = ClientConfiguration.get().getString(key + "(" + i + ")" ); + try + { + Class<?> clazz = Class.forName(className); + if (!(SaslClientFactory.class.isAssignableFrom(clazz))) + { + _logger.error("Class " + clazz + " does not implement " + SaslClientFactory.class + " - skipping"); + continue; + } + factoriesToRegister.put(mechanism, (Class<? extends SaslClientFactory>) clazz); + } + catch (Exception ex) + { + _logger.error("Error instantiating SaslClientFactory calss " + className + " - skipping"); + } + } } } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml b/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml index 587271acd1..93e6f38074 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml @@ -1,86 +1,36 @@ <?xml version="1.0" encoding="ISO-8859-1" ?> <qpidClientConfig> -<security> - <saslClientFactoryTypes> - <saslClientFactory type="AMQPLAIN">org.apache.qpid.client.security.amqplain.AmqPlainSaslClientFactory</saslClientFactory> - </saslClientFactoryTypes> - <securityMechanisms> - <securityMechanismHandler type="PLAIN">org.apache.qpid.client.security.UsernamePasswordCallbackHandler</securityMechanismHandler> - <securityMechanismHandler type="CRAM_MD5">org.apache.qpid.client.security.UsernamePasswordCallbackHandler</securityMechanismHandler> - </securityMechanisms> -</security> - -<!-- Transport Layer properties --> -<useSharedReadWritePool>true</useSharedReadWritePool> -<enableDirectBuffers>true</enableDirectBuffers> -<enablePooledAllocator>false</enablePooledAllocator> -<tcpNoDelay>true</tcpNoDelay> -<sendBufferSizeInKb>32</sendBufferSizeInKb> -<reciveBufferSizeInKb>32</reciveBufferSizeInKb> -<qpidVMBrokerClass>org.apache.qpid.server.protocol.AMQPFastProtocolHandler</qpidVMBrokerClass> - -<!-- Execution Layer properties --> -<maxAccumilatedResponses>20</maxAccumilatedResponses> - -<!-- Model Phase properties --> -<serverTimeoutInMilliSeconds>1000</serverTimeoutInMilliSeconds> -<maxAccumilatedResponses>20</maxAccumilatedResponses> -<stateManager></stateManager> -<stateListerners> - <stateType class="org.apache.qpid.nclient.model.state.AMQPStateType.CONNECTION_STATE"> - <stateListerner></stateListerner> - </stateType> - <stateType class="org.apache.qpid.nclient.model.state.AMQPStateType.CHANNEL_STATE"> - <stateListerner></stateListerner> - </stateType> -</stateListerners> - -<methodListeners> - <methodListener class="org.apache.qpid.nclient.amqp.AMQPConnection"> - <methodClass>org.apache.qpid.framing.ConnectionStartBody</methodClass> - <methodClass>org.apache.qpid.framing.ConnectionSecureBody</methodClass> - <methodClass>org.apache.qpid.framing.ConnectionTuneBody</methodClass> - <methodClass>org.apache.qpid.framing.ConnectionOpenOkBody</methodClass> - <methodClass>org.apache.qpid.framing.ConnectionCloseBody</methodClass> - <methodClass>org.apache.qpid.framing.ConnectionCloseOkBody</methodClass> - - <methodClass>org.apache.qpid.framing.ChannelOpenOkBody</methodClass> - <methodClass>org.apache.qpid.framing.ChannelCloseBody</methodClass> - <methodClass>org.apache.qpid.framing.ChannelCloseOkBody</methodClass> - <methodClass>org.apache.qpid.framing.ChannelFlowBody</methodClass> - <methodClass>org.apache.qpid.framing.ChannelFlowOkBody</methodClass> - <methodClass>org.apache.qpid.framing.ChannelOkBody</methodClass> - - <methodClass>org.apache.qpid.framing.ExchangeDeclareOkBody</methodClass> - <methodClass>org.apache.qpid.framing.ExchangeDeleteOkBody</methodClass> - - <methodClass>org.apache.qpid.framing.QueueDeclareOkBody</methodClass> - <methodClass>org.apache.qpid.framing.QueueBindOkBody</methodClass> - <methodClass>org.apache.qpid.framing.QueueUnbindOkBody</methodClass> - <methodClass>org.apache.qpid.framing.QueuePurgeOkBody</methodClass> - <methodClass>org.apache.qpid.framing.QueueDeleteOkBody</methodClass> - - <methodClass>org.apache.qpid.framing.MessageAppendBody</methodClass> - <methodClass>org.apache.qpid.framing.MessageCancelBody</methodClass> - <methodClass>org.apache.qpid.framing.MessageCheckpointBody</methodClass> - <methodClass>org.apache.qpid.framing.MessageCloseBody</methodClass> - <methodClass>org.apache.qpid.framing.MessageGetBody</methodClass> - <methodClass>org.apache.qpid.framing.MessageOffsetBody</methodClass> - <methodClass>org.apache.qpid.framing.MessageOkBody</methodClass> - <methodClass>org.apache.qpid.framing.MessageOpenBody</methodClass> - <methodClass>org.apache.qpid.framing.MessageQosBody</methodClass> - <methodClass>org.apache.qpid.framing.MessageRecoverBody</methodClass> - <methodClass>org.apache.qpid.framing.MessageRejectBody</methodClass> - <methodClass>org.apache.qpid.framing.MessageResumeBody</methodClass> - <methodClass>org.apache.qpid.framing.MessageTransferBody</methodClass> - </methodListener> -</methodListeners> - -<phasePipe> - <phase index="0">org.apache.qpid.nclient.transport.TransportPhase<phase> - <phase index="1">org.apache.qpid.nclient.execution.ExecutionPhase<phase> - <phase index="2">org.apache.qpid.nclient.model.ModelPhase<phase> -</phasePipe> + <security> + <saslClientFactoryTypes> + <saslClientFactory type="AMQPLAIN">org.apache.qpid.nclient.security.amqplain.AmqPlainSaslClientFactory</saslClientFactory> + </saslClientFactoryTypes> + <securityMechanisms> + <securityMechanismHandler type="PLAIN">org.apache.qpid.nclient.security.UsernamePasswordCallbackHandler</securityMechanismHandler> + <securityMechanismHandler type="CRAM_MD5">org.apache.qpid.nclient.security.UsernamePasswordCallbackHandler</securityMechanismHandler> + </securityMechanisms> + </security> + + <!-- Transport Layer properties --> + <useSharedReadWritePool>false</useSharedReadWritePool> + <enableDirectBuffers>true</enableDirectBuffers> + <enablePooledAllocator>false</enablePooledAllocator> + <tcpNoDelay>true</tcpNoDelay> + <sendBufferSizeInKb>32</sendBufferSizeInKb> + <reciveBufferSizeInKb>32</reciveBufferSizeInKb> + <qpidVMBrokerClass>org.apache.qpid.server.protocol.AMQPFastProtocolHandler</qpidVMBrokerClass> + + <!-- Execution Layer properties --> + <maxAccumilatedResponses>20</maxAccumilatedResponses> + + <!-- Model Phase properties --> + <serverTimeoutInMilliSeconds>60000</serverTimeoutInMilliSeconds> + <maxAccumilatedResponses>20</maxAccumilatedResponses> + + <phasePipe> + <phase index="0">org.apache.qpid.nclient.transport.TransportPhase</phase> + <phase index="1">org.apache.qpid.nclient.execution.ExecutionPhase</phase> + <phase index="2">org.apache.qpid.nclient.model.ModelPhase</phase> + </phasePipe> </qpidClientConfig>
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java index 21602fec8e..9542aab344 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java @@ -24,19 +24,22 @@ public class PhaseFactory */ public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException { + String key = QpidConstants.PHASE_PIPE + "." + QpidConstants.PHASE; Map<Integer,Phase> phaseMap = new HashMap<Integer,Phase>(); - List<String> list = ClientConfiguration.get().getList(QpidConstants.PHASE_PIPE + "." + QpidConstants.PHASE); + List<String> list = ClientConfiguration.get().getList(key); + int index = 0; for(String s:list) { try { - Phase temp = (Phase)Class.forName(ClientConfiguration.get().getString(s)).newInstance(); - phaseMap.put(ClientConfiguration.get().getInt(s + "." + QpidConstants.INDEX),temp) ; + Phase temp = (Phase)Class.forName(s).newInstance(); + phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + QpidConstants.INDEX),temp) ; } catch(Exception e) { throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s),e); } + index++; } Phase current = null; @@ -46,7 +49,7 @@ public class PhaseFactory for (int i=0; i<phaseMap.size();i++) { current = phaseMap.get(i); - if (1+1 < phaseMap.size()) + if (i+1 < phaseMap.size()) { next = phaseMap.get(i+1); } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java index 1a949b7270..9998fda6bb 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java @@ -10,13 +10,7 @@ public interface QpidConstants { // Phase Context properties public final static String AMQP_BROKER_DETAILS = "AMQP_BROKER_DETAILS"; public final static String MINA_IO_CONNECTOR = "MINA_IO_CONNECTOR"; - //public final static String AMQP_MAJOR_VERSION = "AMQP_MAJOR_VERSION"; - //public final static String AMQP_MINOR_VERSION = "AMQP_MINOR_VERSION"; - //public final static String AMQP_SASL_CLIENT = "AMQP_SASL_CLIENT"; - //public final static String AMQP_CLIENT_ID = "AMQP_CLIENT_ID"; - //public final static String AMQP_CONNECTION_TUNE_PARAMETERS = "AMQP_CONNECTION_TUNE_PARAMETERS"; - //public final static String AMQP_VIRTUAL_HOST = "AMQP_VIRTUAL_HOST"; - //public final static String AMQP_MESSAGE_STORE = "AMQP_MESSAGE_STORE"; + public final static String EVENT_MANAGER = "EVENT_MANAGER"; /**--------------------------------------------------------------- * Configuration file properties @@ -24,17 +18,7 @@ public interface QpidConstants { */ // Model Layer properties - public final static String STATE_MANAGER = "stateManager"; - public final static String METHOD_LISTENERS = "methodListeners"; - public final static String METHOD_LISTENER = "methodListener"; - public final static String CLASS = "[@class]"; - public final static String METHOD_CLASS = "methodClass"; - public final static String STATE_LISTENERS = "stateListeners"; - public final static String STATE_LISTENER = "stateListener"; - public final static String STATE_TYPE = "stateType"; - - public final static String AMQP_MESSAGE_STORE_CLASS = "AMQP_MESSAGE_STORE_CLASS"; public final static String SERVER_TIMEOUT_IN_MILLISECONDS = "serverTimeoutInMilliSeconds"; // MINA properties @@ -50,6 +34,7 @@ public interface QpidConstants { public final static String AMQP_SECURITY_SASL_CLIENT_FACTORY = "saslClientFactory"; public final static String TYPE = "[@type]"; + public final static String AMQP_SECURITY = "security"; public final static String AMQP_SECURITY_MECHANISMS = "securityMechanisms"; public final static String AMQP_SECURITY_MECHANISM_HANDLER = "securityMechanismHandler"; diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java index 8db955afbc..1305500439 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java @@ -10,142 +10,179 @@ import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQRequestBody; import org.apache.qpid.framing.AMQResponseBody; -import org.apache.qpid.framing.RequestResponseMappingException; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.AbstractPhase; import org.apache.qpid.nclient.core.QpidConstants; -import org.apache.qpid.nclient.model.AMQPMethodEvent; -import org.apache.qpid.protocol.AMQMethodEvent; /** * Corressponds to the Layer 2 in AMQP. * This phase handles the correlation of amqp messages * This class implements the 0.9 spec (request/response) */ -public class ExecutionPhase extends AbstractPhase{ +public class ExecutionPhase extends AbstractPhase +{ + + protected static final Logger _logger = Logger.getLogger(ExecutionPhase.class); - protected static final Logger _logger = Logger.getLogger(ExecutionPhase.class); protected ConcurrentMap _channelId2RequestMgrMap = new ConcurrentHashMap(); + protected ConcurrentMap _channelId2ResponseMgrMap = new ConcurrentHashMap(); - - /** - * -------------------------------------------------- - * Phase related methods - * -------------------------------------------------- - */ - + /** + * -------------------------------------------------- + * Phase related methods + * -------------------------------------------------- + */ + // should add these in the init method //_channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false)); //_channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false)); - - public void messageReceived(Object msg) throws AMQPException - { - AMQFrame frame = (AMQFrame) msg; - final AMQBody bodyFrame = frame.getBodyFrame(); - - if (bodyFrame instanceof AMQRequestBody) - { - AMQPMethodEvent event; - try - { - event = messageRequestBodyReceived(frame.getChannel(), (AMQRequestBody)bodyFrame); - super.messageReceived(event); - } - catch (Exception e) - { - _logger.error("Error handling request",e); - } - - } - else if (bodyFrame instanceof AMQResponseBody) - { - List<AMQPMethodEvent> events; - try - { - events = messageResponseBodyReceived(frame.getChannel(), (AMQResponseBody)bodyFrame); - for (AMQPMethodEvent event: events) - { - super.messageReceived(event); - } - } - catch (Exception e) - { - _logger.error("Error handling response",e); - } - } - } + public void messageReceived(Object msg) throws AMQPException + { + AMQFrame frame = (AMQFrame) msg; + final AMQBody bodyFrame = frame.getBodyFrame(); - /** - * Need to figure out if the message is a request or a response - * that needs to be sent and then delegate it to the Request or response manager - * to prepare it. - */ - public void messageSent(Object msg) throws AMQPException + if (bodyFrame instanceof AMQRequestBody) { - AMQPMethodEvent evt = (AMQPMethodEvent)msg; - if(evt.getCorrelationId() == QpidConstants.EMPTY_CORRELATION_ID) + AMQPMethodEvent event; + try + { + event = messageRequestBodyReceived(frame.getChannel(), (AMQRequestBody) bodyFrame); + super.messageReceived(event); + } + catch (Exception e) + { + _logger.error("Error handling request", e); + } + + } + else if (bodyFrame instanceof AMQResponseBody) + { + List<AMQPMethodEvent> events; + try + { + events = messageResponseBodyReceived(frame.getChannel(), (AMQResponseBody) bodyFrame); + for (AMQPMethodEvent event : events) { - // This is a request - AMQFrame frame = handleRequest(evt); - super.messageSent(frame); + super.messageReceived(event); } - else - { -// This is a response - List<AMQFrame> frames = handleResponse(evt); - for(AMQFrame frame: frames) - { - super.messageSent(frame); - } - } + } + catch (Exception e) + { + _logger.error("Error handling response", e); + } + } + } + + /** + * Need to figure out if the message is a request or a response + * that needs to be sent and then delegate it to the Request or response manager + * to prepare it. + */ + public void messageSent(Object msg) throws AMQPException + { + AMQPMethodEvent evt = (AMQPMethodEvent) msg; + if (evt.getCorrelationId() == QpidConstants.EMPTY_CORRELATION_ID) + { + // This is a request + AMQFrame frame = handleRequest(evt); + super.messageSent(frame); } + else + { + // This is a response + List<AMQFrame> frames = handleResponse(evt); + for (AMQFrame frame : frames) + { + super.messageSent(frame); + } + } + } - /** - * ------------------------------------------------ - * Methods to handle request response - * ----------------------------------------------- - */ + /** + * ------------------------------------------------ + * Methods to handle request response + * ----------------------------------------------- + */ private AMQPMethodEvent messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception { - if (_logger.isDebugEnabled()) - { - _logger.debug("Request frame received: " + requestBody); - } - ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(channelId); - if (responseManager == null) - throw new AMQException("Unable to find ResponseManager for channel " + channelId); - return responseManager.requestReceived(requestBody); + if (_logger.isDebugEnabled()) + { + _logger.debug("Request frame received: " + requestBody); + } + + ResponseManager responseManager; + if(_channelId2ResponseMgrMap.containsKey(channelId)) + { + responseManager = (ResponseManager) _channelId2ResponseMgrMap.get(channelId); + } + else + { + responseManager = new ResponseManager(0,channelId,false); + _channelId2ResponseMgrMap.put(channelId, responseManager); + } + return responseManager.requestReceived(requestBody); } - - private List<AMQPMethodEvent> messageResponseBodyReceived(int channelId, AMQResponseBody responseBody) throws Exception + + private List<AMQPMethodEvent> messageResponseBodyReceived(int channelId, AMQResponseBody responseBody) + throws Exception { - if (_logger.isDebugEnabled()) - { - _logger.debug("Response frame received: " + responseBody); - } - RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(channelId); - if (requestManager == null) - throw new AMQException("Unable to find RequestManager for channel " + channelId); - return requestManager.responseReceived(responseBody); + if (_logger.isDebugEnabled()) + { + _logger.debug("Response frame received: " + responseBody); + } + + RequestManager requestManager; + if (_channelId2RequestMgrMap.containsKey(channelId)) + { + requestManager = (RequestManager) _channelId2RequestMgrMap.get(channelId); + } + else + { + requestManager = new RequestManager(0,channelId,false); + _channelId2RequestMgrMap.put(channelId, requestManager); + } + + return requestManager.responseReceived(responseBody); } - + private AMQFrame handleRequest(AMQPMethodEvent evt) { - RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(evt.getChannelId()); - return requestManager.sendRequest(evt); + int channelId = evt.getChannelId(); + RequestManager requestManager; + if (_channelId2RequestMgrMap.containsKey(channelId)) + { + requestManager = (RequestManager) _channelId2RequestMgrMap.get(channelId); + } + else + { + requestManager = new RequestManager(0,channelId,false); + _channelId2RequestMgrMap.put(channelId, requestManager); + } + return requestManager.sendRequest(evt); } - + private List<AMQFrame> handleResponse(AMQPMethodEvent evt) throws AMQPException { - ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(evt.getChannelId()); - try - { - return responseManager.sendResponse(evt); - } - catch(Exception e) - { - throw new AMQPException("Error handling response",e); - } + int channelId = evt.getChannelId(); + ResponseManager responseManager; + if(_channelId2ResponseMgrMap.containsKey(channelId)) + { + responseManager = (ResponseManager) _channelId2ResponseMgrMap.get(channelId); + } + else + { + responseManager = new ResponseManager(0,channelId,false); + _channelId2ResponseMgrMap.put(channelId, responseManager); + } + try + { + return responseManager.sendResponse(evt); + } + catch (Exception e) + { + throw new AMQPException("Error handling response", e); + } } } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java index 761ec1b050..0084f27717 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java @@ -29,7 +29,7 @@ import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQRequestBody; import org.apache.qpid.framing.AMQResponseBody; import org.apache.qpid.framing.RequestResponseMappingException; -import org.apache.qpid.nclient.model.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; public class RequestManager { @@ -56,7 +56,7 @@ public class RequestManager */ private long lastProcessedResponseId; - private ConcurrentHashMap<Long, Long> requestSentMap; + private ConcurrentHashMap<Long, CorrelationID> requestSentMap; public RequestManager(long connectionId, int channel, boolean serverFlag) { @@ -65,7 +65,7 @@ public class RequestManager this.connectionId = connectionId; requestIdCount = 1L; lastProcessedResponseId = 0L; - requestSentMap = new ConcurrentHashMap<Long, Long>(); + requestSentMap = new ConcurrentHashMap<Long, CorrelationID>(); } // *** Functions to originate a request *** @@ -80,7 +80,7 @@ public class RequestManager logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + evt.getMethod()); } - requestSentMap.put(requestId, evt.getCorrelationId()); + requestSentMap.put(requestId, new CorrelationID(evt.getCorrelationId(), evt.getLocalCorrelationId())); return requestFrame; } @@ -103,9 +103,9 @@ public class RequestManager throw new RequestResponseMappingException(requestId, "Failed to locate requestId " + requestId + " in requestSentMap."); } - long localCorrelationId = requestSentMap.get(requestId); + CorrelationID correlationID = requestSentMap.get(requestId); AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel, responseBody.getMethodPayload(), - requestId,localCorrelationId); + correlationID.getSystemCorrelationID(),correlationID.getLocalCorrelationID()); events.add(methodEvent); requestSentMap.remove(requestId); } @@ -126,4 +126,40 @@ public class RequestManager { return requestIdCount++; } + + private class CorrelationID + { + // Use for the request/response stuff + private long _systemCorrelationID; + // used internally to track callbacks + private long _localCorrelationID; + + CorrelationID(long systemCorrelationID,long localCorrelationID) + { + _localCorrelationID = localCorrelationID; + _systemCorrelationID = systemCorrelationID; + } + + public long getLocalCorrelationID() + { + return _localCorrelationID; + } + + public void setLocalCorrelationID(long correlationID) + { + _localCorrelationID = correlationID; + } + + public long getSystemCorrelationID() + { + return _systemCorrelationID; + } + + public void setSystemCorrelationID(long correlationID) + { + _systemCorrelationID = correlationID; + } + + + } } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java index 97d9576a4e..c5a75d242f 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java @@ -31,9 +31,9 @@ import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQRequestBody; import org.apache.qpid.framing.AMQResponseBody; import org.apache.qpid.framing.RequestResponseMappingException; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; import org.apache.qpid.nclient.config.ClientConfiguration; import org.apache.qpid.nclient.core.QpidConstants; -import org.apache.qpid.nclient.model.AMQPMethodEvent; public class ResponseManager { diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java index e7b762b77a..d79525c5b2 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java @@ -24,8 +24,6 @@ package org.apache.qpid.nclient.message; import java.util.LinkedList; import java.util.List; -import org.apache.qpid.client.message.MessageHeaders; - public class AMQPApplicationMessage { private int bytesReceived = 0; diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java new file mode 100644 index 0000000000..562aa7b06e --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java @@ -0,0 +1,679 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +package org.apache.qpid.nclient.message; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQPInvalidClassException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import java.util.Enumeration; + +public class MessageHeaders +{ + private static final Logger _logger = Logger.getLogger(MessageHeaders.class); + + private AMQShortString _contentType; + + private AMQShortString _encoding; + + private AMQShortString _destination; + + private AMQShortString _exchange; + + private FieldTable _jmsHeaders; + + private short _deliveryMode; + + private short _priority; + + private AMQShortString _correlationId; + + private AMQShortString _replyTo; + + private long _expiration; + + private AMQShortString _messageId; + + private long _timestamp; + + private AMQShortString _type; + + private AMQShortString _userId; + + private AMQShortString _appId; + + private AMQShortString _transactionId; + + private AMQShortString _routingKey; + + private int _size; + + public int getSize() + { + return _size; + } + + public void setSize(int size) + { + this._size = size; + } + + public MessageHeaders() + { + } + + public AMQShortString getContentType() + { + return _contentType; + } + + public void setContentType(AMQShortString contentType) + { + _contentType = contentType; + } + + public AMQShortString getEncoding() + { + return _encoding; + } + + public void setEncoding(AMQShortString encoding) + { + _encoding = encoding; + } + + public FieldTable getJMSHeaders() + { + if (_jmsHeaders == null) + { + setJMSHeaders(FieldTableFactory.newFieldTable()); + } + + return _jmsHeaders; + } + + public void setJMSHeaders(FieldTable headers) + { + _jmsHeaders = headers; + } + + + public short getDeliveryMode() + { + return _deliveryMode; + } + + public void setDeliveryMode(short deliveryMode) + { + _deliveryMode = deliveryMode; + } + + public short getPriority() + { + return _priority; + } + + public void setPriority(short priority) + { + _priority = priority; + } + + public AMQShortString getCorrelationId() + { + return _correlationId; + } + + public void setCorrelationId(AMQShortString correlationId) + { + _correlationId = correlationId; + } + + public AMQShortString getReplyTo() + { + return _replyTo; + } + + public void setReplyTo(AMQShortString replyTo) + { + _replyTo = replyTo; + } + + public long getExpiration() + { + return _expiration; + } + + public void setExpiration(long expiration) + { + _expiration = expiration; + } + + + public AMQShortString getMessageId() + { + return _messageId; + } + + public void setMessageId(AMQShortString messageId) + { + _messageId = messageId; + } + + public long getTimestamp() + { + return _timestamp; + } + + public void setTimestamp(long timestamp) + { + _timestamp = timestamp; + } + + public AMQShortString getType() + { + return _type; + } + + public void setType(AMQShortString type) + { + _type = type; + } + + public AMQShortString getUserId() + { + return _userId; + } + + public void setUserId(AMQShortString userId) + { + _userId = userId; + } + + public AMQShortString getAppId() + { + return _appId; + } + + public void setAppId(AMQShortString appId) + { + _appId = appId; + } + + // MapMessage Interface + + public boolean getBoolean(AMQShortString string) throws JMSException + { + Boolean b = getJMSHeaders().getBoolean(string); + + if (b == null) + { + if (getJMSHeaders().containsKey(string)) + { + Object str = getJMSHeaders().getObject(string); + + if (str == null || !(str instanceof AMQShortString)) + { + throw new MessageFormatException("getBoolean can't use " + string + " item."); + } + else + { + return Boolean.valueOf(((AMQShortString)str).asString()); + } + } + else + { + b = Boolean.valueOf(null); + } + } + + return b; + } + + public char getCharacter(AMQShortString string) throws JMSException + { + Character c = getJMSHeaders().getCharacter(string); + + if (c == null) + { + if (getJMSHeaders().isNullStringValue(string.asString())) + { + throw new NullPointerException("Cannot convert null char"); + } + else + { + throw new MessageFormatException("getChar can't use " + string + " item."); + } + } + else + { + return (char) c; + } + } + + public byte[] getBytes(AMQShortString string) throws JMSException + { + byte[] bs = getJMSHeaders().getBytes(string); + + if (bs == null) + { + throw new MessageFormatException("getBytes can't use " + string + " item."); + } + else + { + return bs; + } + } + + public byte getByte(AMQShortString string) throws JMSException + { + Byte b = getJMSHeaders().getByte(string); + if (b == null) + { + if (getJMSHeaders().containsKey(string)) + { + Object str = getJMSHeaders().getObject(string); + + if (str == null || !(str instanceof AMQShortString)) + { + throw new MessageFormatException("getByte can't use " + string + " item."); + } + else + { + return Byte.valueOf(((AMQShortString)str).asString()); + } + } + else + { + b = Byte.valueOf(null); + } + } + + return b; + } + + public short getShort(AMQShortString string) throws JMSException + { + Short s = getJMSHeaders().getShort(string); + + if (s == null) + { + s = Short.valueOf(getByte(string)); + } + + return s; + } + + public int getInteger(AMQShortString string) throws JMSException + { + Integer i = getJMSHeaders().getInteger(string); + + if (i == null) + { + i = Integer.valueOf(getShort(string)); + } + + return i; + } + + public long getLong(AMQShortString string) throws JMSException + { + Long l = getJMSHeaders().getLong(string); + + if (l == null) + { + l = Long.valueOf(getInteger(string)); + } + + return l; + } + + public float getFloat(AMQShortString string) throws JMSException + { + Float f = getJMSHeaders().getFloat(string); + + if (f == null) + { + if (getJMSHeaders().containsKey(string)) + { + Object str = getJMSHeaders().getObject(string); + + if (str == null || !(str instanceof AMQShortString)) + { + throw new MessageFormatException("getFloat can't use " + string + " item."); + } + else + { + return Float.valueOf(((AMQShortString)str).asString()); + } + } + else + { + f = Float.valueOf(null); + } + + } + + return f; + } + + public double getDouble(AMQShortString string) throws JMSException + { + Double d = getJMSHeaders().getDouble(string); + + if (d == null) + { + d = Double.valueOf(getFloat(string)); + } + + return d; + } + + public AMQShortString getString(AMQShortString string) throws JMSException + { + AMQShortString s = new AMQShortString(getJMSHeaders().getString(string.asString())); + + if (s == null) + { + if (getJMSHeaders().containsKey(string)) + { + Object o = getJMSHeaders().getObject(string); + if (o instanceof byte[]) + { + throw new MessageFormatException("getObject couldn't find " + string + " item."); + } + else + { + if (o == null) + { + return null; + } + else + { + s = (AMQShortString) o; + } + } + } + } + + return s; + } + + public Object getObject(AMQShortString string) throws JMSException + { + return getJMSHeaders().getObject(string); + } + + public void setBoolean(AMQShortString string, boolean b) throws JMSException + { + checkPropertyName(string); + getJMSHeaders().setBoolean(string, b); + } + + public void setChar(AMQShortString string, char c) throws JMSException + { + checkPropertyName(string); + getJMSHeaders().setChar(string, c); + } + + public Object setBytes(AMQShortString string, byte[] bytes) + { + return getJMSHeaders().setBytes(string, bytes); + } + + public Object setBytes(AMQShortString string, byte[] bytes, int start, int length) + { + return getJMSHeaders().setBytes(string, bytes, start, length); + } + + public void setByte(AMQShortString string, byte b) throws JMSException + { + checkPropertyName(string); + getJMSHeaders().setByte(string, b); + } + + public void setShort(AMQShortString string, short i) throws JMSException + { + checkPropertyName(string); + getJMSHeaders().setShort(string, i); + } + + public void setInteger(AMQShortString string, int i) throws JMSException + { + checkPropertyName(string); + getJMSHeaders().setInteger(string, i); + } + + public void setLong(AMQShortString string, long l) throws JMSException + { + checkPropertyName(string); + getJMSHeaders().setLong(string, l); + } + + public void setFloat(AMQShortString string, float v) throws JMSException + { + checkPropertyName(string); + getJMSHeaders().setFloat(string, v); + } + + public void setDouble(AMQShortString string, double v) throws JMSException + { + checkPropertyName(string); + getJMSHeaders().setDouble(string, v); + } + + public void setString(AMQShortString string, AMQShortString string1) throws JMSException + { + checkPropertyName(string); + getJMSHeaders().setString(string.asString(), string1.asString()); + } + + public void setObject(AMQShortString string, Object object) throws JMSException + { + checkPropertyName(string); + try + { + getJMSHeaders().setObject(string, object); + } + catch (AMQPInvalidClassException aice) + { + throw new MessageFormatException("Only primatives are allowed object is:" + object.getClass()); + } + } + + public boolean itemExists(AMQShortString string) throws JMSException + { + return getJMSHeaders().containsKey(string); + } + + public Enumeration getPropertyNames() + { + return getJMSHeaders().getPropertyNames(); + } + + public void clear() + { + getJMSHeaders().clear(); + } + + public boolean propertyExists(AMQShortString propertyName) + { + return getJMSHeaders().propertyExists(propertyName); + } + + public Object put(Object key, Object value) + { + return getJMSHeaders().setObject(key.toString(), value); + } + + public Object remove(AMQShortString propertyName) + { + return getJMSHeaders().remove(propertyName); + } + + public boolean isEmpty() + { + return getJMSHeaders().isEmpty(); + } + + public void writeToBuffer(ByteBuffer data) + { + getJMSHeaders().writeToBuffer(data); + } + + public Enumeration getMapNames() + { + return getPropertyNames(); + } + + protected static void checkPropertyName(CharSequence propertyName) + { + if (propertyName == null) + { + throw new IllegalArgumentException("Property name must not be null"); + } + else if (propertyName.length() == 0) + { + throw new IllegalArgumentException("Property name must not be the empty string"); + } + + checkIdentiferFormat(propertyName); + } + + protected static void checkIdentiferFormat(CharSequence propertyName) + { +// JMS requirements 3.5.1 Property Names +// Identifiers: +// - An identifier is an unlimited-length character sequence that must begin +// with a Java identifier start character; all following characters must be Java +// identifier part characters. An identifier start character is any character for +// which the method Character.isJavaIdentifierStart returns true. This includes +// '_' and '$'. An identifier part character is any character for which the +// method Character.isJavaIdentifierPart returns true. +// - Identifiers cannot be the names NULL, TRUE, or FALSE. +// – Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or +// ESCAPE. +// – Identifiers are either header field references or property references. The +// type of a property value in a message selector corresponds to the type +// used to set the property. If a property that does not exist in a message is +// referenced, its value is NULL. The semantics of evaluating NULL values +// in a selector are described in Section 3.8.1.2, “Null Values.” +// – The conversions that apply to the get methods for properties do not +// apply when a property is used in a message selector expression. For +// example, suppose you set a property as a string value, as in the +// following: +// myMessage.setStringProperty("NumberOfOrders", "2"); +// The following expression in a message selector would evaluate to false, +// because a string cannot be used in an arithmetic expression: +// "NumberOfOrders > 1" +// – Identifiers are case sensitive. +// – Message header field references are restricted to JMSDeliveryMode, +// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and +// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be +// null and if so are treated as a NULL value. + + if (Boolean.getBoolean("strict-jms")) + { + // JMS start character + if (!(Character.isJavaIdentifierStart(propertyName.charAt(0)))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character"); + } + + // JMS part character + int length = propertyName.length(); + for (int c = 1; c < length; c++) + { + if (!(Character.isJavaIdentifierPart(propertyName.charAt(c)))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character"); + } + } + + + + + // JMS invalid names + if ((propertyName.equals("NULL") + || propertyName.equals("TRUE") + || propertyName.equals("FALSE") + || propertyName.equals("NOT") + || propertyName.equals("AND") + || propertyName.equals("OR") + || propertyName.equals("BETWEEN") + || propertyName.equals("LIKE") + || propertyName.equals("IN") + || propertyName.equals("IS") + || propertyName.equals("ESCAPE"))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS"); + } + } + + } + + public AMQShortString getTransactionId() + { + return _transactionId; + } + + public void setTransactionId(AMQShortString id) + { + _transactionId = id; + } + + public AMQShortString getDestination() + { + return _destination; + } + + public void setDestination(AMQShortString destination) + { + this._destination = destination; + } + + public AMQShortString getExchange() + { + return _exchange; + } + + public void setExchange(AMQShortString exchange) + { + this._exchange = exchange; + } + + public AMQShortString getRoutingKey() + { + return _routingKey; + } + + public void setRoutingKey(AMQShortString routingKey) + { + this._routingKey = routingKey; + } +} + + diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java index 93eecdc0cc..efd7264f96 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java @@ -1,7 +1,6 @@ package org.apache.qpid.nclient.message; import org.apache.qpid.AMQException; -import org.apache.qpid.client.message.MessageHeaders; public interface MessageStore { diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java index 26cf2327d7..eb5a9c1778 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java @@ -4,7 +4,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.qpid.AMQException; -import org.apache.qpid.client.message.MessageHeaders; public class TransientMessageStore implements MessageStore { diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java b/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java deleted file mode 100644 index c33c087da8..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java +++ /dev/null @@ -1,64 +0,0 @@ -package org.apache.qpid.nclient.model; - -import org.apache.qpid.framing.AMQMethodBody; - -/** - * This class is exactly the same as the AMQMethod event. - * Except I renamed requestId to corelationId, so I could use it both ways. - * - * I didn't want to modify anything in common so that there is no - * impact on the existing code. - * - */ -public class AMQPMethodEvent <M extends AMQMethodBody> { - - private final M _method; - private final int _channelId; - private final long _correlationId; - private long _localCorrletionId = 0; - - public AMQPMethodEvent(int channelId, M method, long correlationId,long localCorrletionId) - { - _channelId = channelId; - _method = method; - _correlationId = correlationId; - _localCorrletionId = localCorrletionId; - } - - public AMQPMethodEvent(int channelId, M method, long correlationId) - { - _channelId = channelId; - _method = method; - _correlationId = correlationId; - } - - public M getMethod() - { - return _method; - } - - public int getChannelId() - { - return _channelId; - } - - public long getCorrelationId() - { - return _correlationId; - } - - public long getLocalCorrelationId() - { - return _localCorrletionId; - } - - public String toString() - { - StringBuilder buf = new StringBuilder("Method event: \n"); - buf.append("Channel id: \n").append(_channelId); - buf.append("Method: \n").append(_method); - buf.append("Request Id: ").append(_correlationId); - buf.append("Local Correlation Id: ").append(_localCorrletionId); - return buf.toString(); - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java index 77003b4d21..d845059ee7 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java @@ -1,13 +1,12 @@ package org.apache.qpid.nclient.model; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.log4j.Logger; -import org.apache.qpid.nclient.amqp.state.AMQPStateManager; -import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.amqp.event.AMQPEventManager; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.AbstractPhase; import org.apache.qpid.nclient.core.Phase; @@ -31,15 +30,7 @@ public class ModelPhase extends AbstractPhase { */ public void init(PhaseContext ctx, Phase nextInFlowPhase, Phase nextOutFlowPhase) { - super.init(ctx, nextInFlowPhase, nextOutFlowPhase); - try - { - loadMethodListeners(); - } - catch(Exception e) - { - _logger.fatal("Error loading method listeners", e); - } + super.init(ctx, nextInFlowPhase, nextOutFlowPhase); } public void messageReceived(Object msg) throws AMQPException @@ -68,66 +59,13 @@ public class ModelPhase extends AbstractPhase { public void notifyMethodListerners(AMQPMethodEvent event) throws AMQPException { - if (_methodListners.containsKey(event.getMethod().getClass())) - { - List<AMQPMethodListener> listeners = _methodListners.get(event.getMethod().getClass()); - - if(listeners.size()>0) - { - throw new AMQPException("There are no registered listeners for this method"); - } - - for(AMQPMethodListener l : listeners) - { - try - { - l.methodReceived(event); - } - catch (Exception e) - { - _logger.error("Error handling method event " + event, e); - } - } - } + AMQPEventManager eventManager = (AMQPEventManager)_ctx.getProperty(QpidConstants.EVENT_MANAGER); + eventManager.notifyEvent(event); } /** * ------------------------------------------------ * Configuration * ------------------------------------------------ - */ - - /** - * This method loads method listeners from the client.xml file - * For each method class there is a list of listeners - */ - private void loadMethodListeners() throws Exception - { - int count = ClientConfiguration.get().getMaxIndex(QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER); - System.out.println(count); - - for(int i=0 ;i<count;i++) - { - String methodListener = QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER + "(" + i + ")"; - String className = ClientConfiguration.get().getString(methodListener + "." + QpidConstants.CLASS); - Class listenerClass = Class.forName(className); - List<String> list = ClientConfiguration.get().getList(methodListener + "." + QpidConstants.METHOD_CLASS); - for(String s:list) - { - List listeners; - Class methodClass = Class.forName(s); - if (_methodListners.containsKey(methodClass)) - { - listeners = _methodListners.get(methodClass); - } - else - { - listeners = new ArrayList(); - _methodListners.put(methodClass,listeners); - } - listeners.add(listenerClass); - } - } - } - + */ } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java index 28ba2e355c..428cd6753d 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java @@ -62,11 +62,16 @@ public class CallbackHandlerRegistry private void parseProperties() { - List<String> mechanisms = ClientConfiguration.get().getList(QpidConstants.AMQP_SECURITY_MECHANISMS); - - for (String mechanism : mechanisms) + String key = QpidConstants.AMQP_SECURITY + "." + + QpidConstants.AMQP_SECURITY_MECHANISMS + "." + + QpidConstants.AMQP_SECURITY_MECHANISM_HANDLER; + + int index = ClientConfiguration.get().getMaxIndex(key); + + for (int i=0; i<index+1;i++) { - String className = ClientConfiguration.get().getString(QpidConstants.AMQP_SECURITY_MECHANISM_HANDLER + "_" + mechanism); + String mechanism = ClientConfiguration.get().getString(key + "(" + i + ")[@type]"); + String className = ClientConfiguration.get().getString(key + "(" + i + ")" ); Class clazz = null; try { diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java new file mode 100644 index 0000000000..1097346c1d --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.security.amqplain; + +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; + +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.Callback; + +/** + * Implements the "AMQPlain" authentication protocol that uses FieldTables to send username and pwd. + * + */ +public class AmqPlainSaslClient implements SaslClient +{ + /** + * The name of this mechanism + */ + public static final String MECHANISM = "AMQPLAIN"; + + private CallbackHandler _cbh; + + public AmqPlainSaslClient(CallbackHandler cbh) + { + _cbh = cbh; + } + + public String getMechanismName() + { + return "AMQPLAIN"; + } + + public boolean hasInitialResponse() + { + return true; + } + + public byte[] evaluateChallenge(byte[] challenge) throws SaslException + { + // we do not care about the prompt or the default name + NameCallback nameCallback = new NameCallback("prompt", "defaultName"); + PasswordCallback pwdCallback = new PasswordCallback("prompt", false); + Callback[] callbacks = new Callback[]{nameCallback, pwdCallback}; + try + { + _cbh.handle(callbacks); + } + catch (Exception e) + { + throw new SaslException("Error handling SASL callbacks: " + e, e); + } + FieldTable table = FieldTableFactory.newFieldTable(); + table.setString("LOGIN", nameCallback.getName()); + table.setString("PASSWORD", new String(pwdCallback.getPassword())); + return table.getDataAsBytes(); + } + + public boolean isComplete() + { + return true; + } + + public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException + { + throw new SaslException("Not supported"); + } + + public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException + { + throw new SaslException("Not supported"); + } + + public Object getNegotiatedProperty(String propName) + { + return null; + } + + public void dispose() throws SaslException + { + _cbh = null; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java new file mode 100644 index 0000000000..f98c1e3a58 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.security.amqplain; + +import javax.security.sasl.SaslClientFactory; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.sasl.Sasl; +import javax.security.auth.callback.CallbackHandler; +import java.util.Map; + +public class AmqPlainSaslClientFactory implements SaslClientFactory +{ + public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName, Map props, CallbackHandler cbh) throws SaslException + { + for (int i = 0; i < mechanisms.length; i++) + { + if (mechanisms[i].equals(AmqPlainSaslClient.MECHANISM)) + { + if (cbh == null) + { + throw new SaslException("CallbackHandler must not be null"); + } + return new AmqPlainSaslClient(cbh); + } + } + return null; + } + + public String[] getMechanismNames(Map props) + { + if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || + props.containsKey(Sasl.POLICY_NODICTIONARY) || + props.containsKey(Sasl.POLICY_NOACTIVE)) + { + // returned array must be non null according to interface documentation + return new String[0]; + } + else + { + return new String[]{AmqPlainSaslClient.MECHANISM}; + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java index aae3677f8b..734aa68a9d 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java @@ -22,10 +22,12 @@ public class TCPConnection implements TransportConnection private BrokerDetails _brokerDetails; private IoConnector _ioConnector; private Phase _phase; + private PhaseContext _ctx; - protected TCPConnection(ConnectionURL url) + protected TCPConnection(ConnectionURL url, PhaseContext ctx) { _brokerDetails = url.getBrokerDetails(0); + _ctx = ctx; ByteBuffer.setUseDirectBuffers(ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_DIRECT_BUFFERS)); @@ -41,8 +43,8 @@ public class TCPConnection implements TransportConnection ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); } - final IoConnector ioConnector = new SocketConnector(); - SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); + _ioConnector = new SocketConnector(); + SocketConnectorConfig cfg = (SocketConnectorConfig) _ioConnector.getDefaultConfig(); // if we do not use our own thread model we get the MINA default which is to use // its own leader-follower model @@ -59,12 +61,11 @@ public class TCPConnection implements TransportConnection // Returns the phase pipe public Phase connect() throws AMQPException - { - PhaseContext ctx = new DefaultPhaseContext(); - ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails); - ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector); + { + _ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails); + _ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector); - _phase = PhaseFactory.createPhasePipe(ctx); + _phase = PhaseFactory.createPhasePipe(_ctx); _phase.start(); return _phase; diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java index 5c97100bdc..ce2145c08b 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java @@ -2,6 +2,8 @@ package org.apache.qpid.nclient.transport; import java.net.URISyntaxException; +import org.apache.qpid.nclient.core.PhaseContext; + public class TransportConnectionFactory { public enum ConnectionType @@ -9,36 +11,26 @@ public class TransportConnectionFactory TCP,VM } - public static TransportConnection createTransportConnection(String url,ConnectionType type) throws URISyntaxException + public static TransportConnection createTransportConnection(String url,ConnectionType type, PhaseContext ctx) throws URISyntaxException { - return createTransportConnection(new AMQPConnectionURL(url),type); + return createTransportConnection(new AMQPConnectionURL(url),type,ctx); } - public static TransportConnection createTransportConnection(ConnectionURL url,ConnectionType type) + public static TransportConnection createTransportConnection(ConnectionURL url,ConnectionType type, PhaseContext ctx) { switch (type) { case TCP : default: { - return createTCPConnection(url); + return new TCPConnection(url,ctx); } case VM : { - return createVMConnection(url); + return new VMConnection(url,ctx); } } } - - private static TransportConnection createTCPConnection(ConnectionURL url) - { - return new TCPConnection(url); - } - - private static TransportConnection createVMConnection(ConnectionURL url) - { - return null; - } } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java index b1bc7b4c8c..911e855d4f 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java @@ -108,7 +108,6 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol public void messageSent(Object frame) throws AMQPException { - _ioSession.write(frame); } @@ -120,7 +119,7 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol public void sessionIdle(IoSession session, IdleStatus status) throws Exception { - _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + _logger.debug("Protocol Session for [ " + this + " : " + session + "] idle: " + status); if (IdleStatus.WRITER_IDLE.equals(status)) { @@ -148,9 +147,8 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol _logger.debug("Received heartbeat"); } else { - messageReceived(bodyFrame); - } - // _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); + messageReceived(frame); + } } public void messageSent(IoSession session, Object message) throws Exception @@ -162,18 +160,19 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol throws Exception { // Need to handle failover - sessionClosed(session); + _logger.info("Exception caught for [ " + this + " : Session " + System.identityHashCode(session) + "]",cause); + //sessionClosed(session); } public void sessionClosed(IoSession session) throws Exception { // Need to handle failover - _logger.info("Protocol Session [" + this + "] closed"); + _logger.info("Protocol Session for [ " + this + " : " + System.identityHashCode(session) + "] closed"); } public void sessionCreated(IoSession session) throws Exception { - _logger.debug("Protocol session created for session " + _logger.info("Protocol session created for " + this + " session : " + System.identityHashCode(session)); final ProtocolCodecFilter pcf = new ProtocolCodecFilter( @@ -184,7 +183,8 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol { session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf); - } else + } + else { session.getFilterChain().addLast("protocolFilter", pcf); } @@ -213,12 +213,13 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol e.printStackTrace(); } + _ioSession = session; doAMQPConnectionNegotiation(); } public void sessionOpened(IoSession session) throws Exception { - _logger.debug("Protocol session opened for session " + _logger.info("Protocol session opened for " + this + " : session " + System.identityHashCode(session)); } @@ -230,6 +231,7 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol private void doAMQPConnectionNegotiation() { int i = pv.length - 1; + _logger.debug("Engaging in connection negotiation"); writeFrame(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); } @@ -257,7 +259,8 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol } /** - * ----------------------------------------------------------- Failover - * section ----------------------------------------------------------- + * ----------------------------------------------------------- + * Failover section + * ----------------------------------------------------------- */ } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java index c13bb873a7..ba38848149 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java @@ -26,10 +26,13 @@ public class VMConnection implements TransportConnection private BrokerDetails _brokerDetails; private IoConnector _ioConnector; private Phase _phase; + private PhaseContext _ctx; - protected VMConnection(ConnectionURL url) + protected VMConnection(ConnectionURL url,PhaseContext ctx) { _brokerDetails = url.getBrokerDetails(0); + _ctx = ctx; + _ioConnector = new VmPipeConnector(); final IoServiceConfig cfg = _ioConnector.getDefaultConfig(); ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance(); @@ -45,11 +48,10 @@ public class VMConnection implements TransportConnection { createVMBroker(); - PhaseContext ctx = new DefaultPhaseContext(); - ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails); - ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector); + _ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails); + _ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector); - _phase = PhaseFactory.createPhasePipe(ctx); + _phase = PhaseFactory.createPhasePipe(_ctx); _phase.start(); return _phase; |