From 48f73ce3731366fd1b014faa3bbaa2820f41e8bb Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Mon, 28 May 2007 20:26:55 +0000 Subject: This is the initial checkin for the Qpid java client which is built on top of the AMQP protocol level client/framework git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/client_restructure@542314 13f79535-47bb-0310-9956-ffa450edef68 --- java/client/pom.xml | 5 - .../apache/qpid/nclient/amqp/AMQPClassFactory.java | 12 +- .../nclient/amqp/qpid/QpidAMQPClassFactory.java | 64 ++--- .../qpid/nclient/amqp/qpid/QpidStateManager.java | 5 +- .../qpid/nclient/amqp/sample/SecurityHelper.java | 74 ------ .../qpid/nclient/amqp/sample/TestClient.java | 7 +- .../qpid/nclient/amqp/state/AMQPStateManager.java | 5 +- .../apache/qpid/nclient/api/QpidConnection.java | 33 +++ .../org/apache/qpid/nclient/api/QpidException.java | 35 +++ .../qpid/nclient/api/QpidExchangeHelper.java | 43 ++++ .../qpid/nclient/api/QpidMessageConsumer.java | 34 +++ .../apache/qpid/nclient/api/QpidMessageHelper.java | 47 ++++ .../qpid/nclient/api/QpidMessageProducer.java | 28 +++ .../apache/qpid/nclient/api/QpidQueueHelper.java | 47 ++++ .../org/apache/qpid/nclient/api/QpidSession.java | 53 +++++ .../qpid/nclient/api/QpidTransactionHelper.java | 42 ++++ .../qpid/nclient/impl/AMQPCallbackHelper.java | 50 ++++ .../apache/qpid/nclient/impl/AbstractResource.java | 51 ++++ .../apache/qpid/nclient/impl/HelperTemplate.java | 39 ++++ .../qpid/nclient/impl/QpidConnectionImpl.java | 242 +++++++++++++++++++ .../qpid/nclient/impl/QpidExchangeHelperImpl.java | 101 ++++++++ .../qpid/nclient/impl/QpidMessageConsumerImpl.java | 200 ++++++++++++++++ .../qpid/nclient/impl/QpidMessageProducerImpl.java | 103 ++++++++ .../qpid/nclient/impl/QpidQueueHelperImpl.java | 178 ++++++++++++++ .../apache/qpid/nclient/impl/QpidSessionImpl.java | 259 +++++++++++++++++++++ .../apache/qpid/nclient/impl/SecurityHelper.java | 74 ++++++ .../nclient/message/AMQPApplicationMessage.java | 12 + .../qpid/nclient/message/MessageHeaders.java | 130 +++++++---- .../apache/qpid/nclient/message/MessageStore.java | 12 +- .../nclient/message/TransientMessageStore.java | 35 ++- 30 files changed, 1830 insertions(+), 190 deletions(-) delete mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConnection.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidException.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidExchangeHelper.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageConsumer.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageHelper.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageProducer.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidQueueHelper.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidSession.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidTransactionHelper.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/impl/AMQPCallbackHelper.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/impl/HelperTemplate.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidExchangeHelperImpl.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidQueueHelperImpl.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/impl/SecurityHelper.java diff --git a/java/client/pom.xml b/java/client/pom.xml index 6d6bb7cae2..854428fb39 100644 --- a/java/client/pom.xml +++ b/java/client/pom.xml @@ -52,11 +52,6 @@ geronimo-jms_1.1_spec - - org.apache.geronimo.specs - geronimo-jta_1.1_spec - - commons-collections commons-collections diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java index 4976daa4fa..a310941d18 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java @@ -21,10 +21,6 @@ package org.apache.qpid.nclient.amqp; import org.apache.qpid.nclient.amqp.event.AMQPEventManager; -import org.apache.qpid.nclient.amqp.qpid.QpidAMQPChannel; -import org.apache.qpid.nclient.amqp.qpid.QpidAMQPExchange; -import org.apache.qpid.nclient.amqp.qpid.QpidAMQPMessage; -import org.apache.qpid.nclient.amqp.qpid.QpidAMQPQueue; import org.apache.qpid.nclient.amqp.state.AMQPStateManager; import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.transport.ConnectionURL; @@ -40,19 +36,19 @@ public interface AMQPClassFactory public abstract AMQPChannel createChannelClass(int channel) throws AMQPException; - public abstract void destroyChannelClass(int channel, QpidAMQPChannel amqpChannel) throws AMQPException; + public abstract void destroyChannelClass(int channel, AMQPChannel amqpChannel) throws AMQPException; public abstract AMQPExchange createExchangeClass(int channel) throws AMQPException; - public abstract void destoryExchangeClass(int channel, QpidAMQPExchange amqpExchange) throws AMQPException; + public abstract void destoryExchangeClass(int channel, AMQPExchange amqpExchange) throws AMQPException; public abstract AMQPQueue createQueueClass(int channel) throws AMQPException; - public abstract void destroyQueueClass(int channel, QpidAMQPQueue amqpQueue) throws AMQPException; + public abstract void destroyQueueClass(int channel, AMQPQueue amqpQueue) throws AMQPException; public abstract AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException; - public abstract void destoryMessageClass(int channel, QpidAMQPMessage amqpMessage) throws AMQPException; + public abstract void destoryMessageClass(int channel, AMQPMessage amqpMessage) throws AMQPException; /** * Extention point diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java index 809aa57dab..a38469def5 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java @@ -152,14 +152,15 @@ public class QpidAMQPClassFactory implements AMQPClassFactory /* (non-Javadoc) * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destroyChannelClass(int, org.apache.qpid.nclient.amqp.AMQPChannel) */ - public void destroyChannelClass(int channel, QpidAMQPChannel amqpChannel) throws AMQPException + public void destroyChannelClass(int channel, AMQPChannel amqpChannel) throws AMQPException { - _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class, amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class, amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelOkBody.class, amqpChannel); + QpidAMQPChannel qpidAMQPChannel = (QpidAMQPChannel)amqpChannel; + _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class, qpidAMQPChannel); + _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class, qpidAMQPChannel); + _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class, qpidAMQPChannel); + _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class, qpidAMQPChannel); + _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class, qpidAMQPChannel); + _eventManager.removeMethodEventListener(channel, ChannelOkBody.class, qpidAMQPChannel); } /* (non-Javadoc) @@ -177,10 +178,11 @@ public class QpidAMQPClassFactory implements AMQPClassFactory /* (non-Javadoc) * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destoryExchangeClass(int, org.apache.qpid.nclient.amqp.AMQPExchange) */ - public void destoryExchangeClass(int channel, QpidAMQPExchange amqpExchange) throws AMQPException + public void destoryExchangeClass(int channel, AMQPExchange amqpExchange) throws AMQPException { - _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange); - _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange); + QpidAMQPExchange qpidAMQPExchange = (QpidAMQPExchange)amqpExchange; + _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class, qpidAMQPExchange); + _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class, qpidAMQPExchange); } /* (non-Javadoc) @@ -201,13 +203,14 @@ public class QpidAMQPClassFactory implements AMQPClassFactory /* (non-Javadoc) * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destroyQueueClass(int, org.apache.qpid.nclient.amqp.AMQPQueue) */ - public void destroyQueueClass(int channel, QpidAMQPQueue amqpQueue) throws AMQPException + public void destroyQueueClass(int channel, AMQPQueue amqpQueue) throws AMQPException { - _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue); - _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class, amqpQueue); - _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue); - _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue); - _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue); + QpidAMQPQueue qpidAMQPQueue = (QpidAMQPQueue)amqpQueue; + _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class, qpidAMQPQueue); + _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class, qpidAMQPQueue); + _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class, qpidAMQPQueue); + _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class, qpidAMQPQueue); + _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class, qpidAMQPQueue); } /* (non-Javadoc) @@ -237,21 +240,22 @@ public class QpidAMQPClassFactory implements AMQPClassFactory /* (non-Javadoc) * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destoryMessageClass(int, org.apache.qpid.nclient.amqp.AMQPMessage) */ - public void destoryMessageClass(int channel, QpidAMQPMessage amqpMessage) throws AMQPException + public void destoryMessageClass(int channel, AMQPMessage amqpMessage) throws AMQPException { - _eventManager.removeMethodEventListener(channel, MessageAppendBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageCancelBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageCloseBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageGetBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageOkBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageOpenBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageRejectBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageResumeBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageQosBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageTransferBody.class, amqpMessage); + QpidAMQPMessage qpidAMQPMessage = (QpidAMQPMessage) amqpMessage; + _eventManager.removeMethodEventListener(channel, MessageAppendBody.class, qpidAMQPMessage); + _eventManager.removeMethodEventListener(channel, MessageCancelBody.class, qpidAMQPMessage); + _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class, qpidAMQPMessage); + _eventManager.removeMethodEventListener(channel, MessageCloseBody.class, qpidAMQPMessage); + _eventManager.removeMethodEventListener(channel, MessageGetBody.class, qpidAMQPMessage); + _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class, qpidAMQPMessage); + _eventManager.removeMethodEventListener(channel, MessageOkBody.class, qpidAMQPMessage); + _eventManager.removeMethodEventListener(channel, MessageOpenBody.class, qpidAMQPMessage); + _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class, qpidAMQPMessage); + _eventManager.removeMethodEventListener(channel, MessageRejectBody.class, qpidAMQPMessage); + _eventManager.removeMethodEventListener(channel, MessageResumeBody.class, qpidAMQPMessage); + _eventManager.removeMethodEventListener(channel, MessageQosBody.class, qpidAMQPMessage); + _eventManager.removeMethodEventListener(channel, MessageTransferBody.class, qpidAMQPMessage); } //This class should register as a state listener for AMQPConnection diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java index 3fe1ee4cfd..196b441308 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; import org.apache.qpid.nclient.amqp.state.AMQPStateListener; import org.apache.qpid.nclient.amqp.state.AMQPStateManager; @@ -40,7 +39,7 @@ public class QpidStateManager implements AMQPStateManager private Map> _listernerMap = new ConcurrentHashMap>(); - public void addListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException + public void addListener(AMQPStateType stateType, AMQPStateListener l) throws AMQPException { List list; if(_listernerMap.containsKey(stateType)) @@ -55,7 +54,7 @@ public class QpidStateManager implements AMQPStateManager list.add(l); } - public void removeListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException + public void removeListener(AMQPStateType stateType, AMQPStateListener l) throws AMQPException { if(_listernerMap.containsKey(stateType)) { diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java deleted file mode 100644 index 908f0adee0..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp.sample; - -import java.io.UnsupportedEncodingException; -import java.util.HashSet; -import java.util.StringTokenizer; - -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.security.AMQPCallbackHandler; -import org.apache.qpid.nclient.security.CallbackHandlerRegistry; -import org.apache.qpid.nclient.transport.ConnectionURL; - -public class SecurityHelper -{ - public static String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException - { - final String mechanisms = new String(availableMechanisms, "utf8"); - StringTokenizer tokenizer = new StringTokenizer(mechanisms, " "); - HashSet mechanismSet = new HashSet(); - while (tokenizer.hasMoreTokens()) - { - mechanismSet.add(tokenizer.nextToken()); - } - - String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms(); - StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " "); - while (prefTokenizer.hasMoreTokens()) - { - String mech = prefTokenizer.nextToken(); - if (mechanismSet.contains(mech)) - { - return mech; - } - } - return null; - } - - public static AMQPCallbackHandler createCallbackHandler(String mechanism, ConnectionURL url) - throws AMQPException - { - Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism); - try - { - Object instance = mechanismClass.newInstance(); - AMQPCallbackHandler cbh = (AMQPCallbackHandler) instance; - cbh.initialise(url); - return cbh; - } - catch (Exception e) - { - throw new AMQPException("Unable to create callback handler: " + e, e); - } - } - -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java index 3dce1cde1e..267225afec 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java @@ -45,6 +45,7 @@ import org.apache.qpid.framing.ConnectionTuneBody; import org.apache.qpid.framing.ConnectionTuneOkBody; import org.apache.qpid.framing.Content; import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeleteBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.MessageCancelBody; @@ -58,6 +59,7 @@ import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.framing.QueueDeleteOkBody; import org.apache.qpid.framing.QueuePurgeBody; import org.apache.qpid.framing.QueuePurgeOkBody; +import org.apache.qpid.framing.QueueUnbindBody; import org.apache.qpid.nclient.amqp.AMQPCallBack; import org.apache.qpid.nclient.amqp.AMQPChannel; import org.apache.qpid.nclient.amqp.AMQPClassFactory; @@ -67,6 +69,7 @@ import org.apache.qpid.nclient.amqp.AMQPMessage; import org.apache.qpid.nclient.amqp.AMQPQueue; import org.apache.qpid.nclient.amqp.AbstractAMQPClassFactory; import org.apache.qpid.nclient.amqp.state.AMQPStateType; +import org.apache.qpid.nclient.impl.SecurityHelper; import org.apache.qpid.nclient.transport.AMQPConnectionURL; import org.apache.qpid.nclient.transport.ConnectionURL; import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType; @@ -193,7 +196,7 @@ public class TestClient // Blocking for response while (!cb.isComplete()) { - } + } } public void createAndBindQueue() throws Exception @@ -245,7 +248,7 @@ public class TestClient //Blocking for response while (!cb.isComplete()) { - } + } } public void purgeQueue() throws Exception diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java index 9bc60b658e..01404565b9 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java @@ -1,13 +1,12 @@ package org.apache.qpid.nclient.amqp.state; -import org.apache.qpid.AMQException; import org.apache.qpid.nclient.core.AMQPException; public interface AMQPStateManager { - public void addListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException; + public void addListener(AMQPStateType stateType, AMQPStateListener l)throws AMQPException; - public void removeListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException; + public void removeListener(AMQPStateType stateType, AMQPStateListener l)throws AMQPException; public void notifyStateChanged(AMQPStateChangedEvent event) throws AMQPException; } \ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConnection.java new file mode 100644 index 0000000000..744b0aa89f --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConnection.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.api; + +public interface QpidConnection +{ + public static final int SESSION_EXPIRY_MAX_TIME = Integer.MAX_VALUE; + public static final int SESSION_EXPIRY_TIED_TO_CHANNEL = 0; + + public void connect(String url) throws QpidException; + + public QpidSession createSession(int expiryInSeconds) throws QpidException; + + public void close()throws QpidException; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidException.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidException.java new file mode 100644 index 0000000000..f418380452 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidException.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.api; + + +public class QpidException extends Exception +{ + public QpidException(String message) + { + super(message); + } + + public QpidException(String msg, Throwable t) + { + super(msg, t); + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidExchangeHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidExchangeHelper.java new file mode 100644 index 0000000000..1636f87262 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidExchangeHelper.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.api; + +/** + * used as a helper class to support the Session class + * This reduces the clutter and makes the session class + * more readable and manageable. + * + * An implementation of this interface should take a Session + * interface as an argument in the constructor and scope all + * operations for that session. + */ + +public interface QpidExchangeHelper +{ + public void declareExchange(boolean autoDelete, boolean durable, String exchangeName, boolean internal, boolean nowait, boolean passive, + String exchangeClass) throws QpidException; + + public void deleteExchange(String exchangeName, boolean ifUnused, boolean nowait) throws QpidException; + + public void open() throws QpidException; + + public void close() throws QpidException; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageConsumer.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageConsumer.java new file mode 100644 index 0000000000..41adc23727 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageConsumer.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.api; + +import java.util.concurrent.TimeUnit; + +import org.apache.qpid.nclient.message.AMQPApplicationMessage; + +public interface QpidMessageConsumer +{ + public AMQPApplicationMessage get() throws QpidException; + + public AMQPApplicationMessage receive()throws QpidException; + + public AMQPApplicationMessage receive(long timeout, TimeUnit tu)throws QpidException; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageHelper.java new file mode 100644 index 0000000000..ec50d16f9e --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageHelper.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.api; + +/** + * used as a helper class to support the Session class + * This reduces the clutter and makes the session class + * more readable and manageable. + * + * An implementation of this interface should take a Session + * interface as an argument in the constructor and scope all + * operations for that session. + */ +public interface QpidMessageHelper +{ + public void declareQueue(boolean autoDelete, boolean durable, boolean exclusive,boolean nowait,boolean passive,String queueName) throws QpidException; + + public void bindQueue(String exchangeName,boolean nowait,String queueName,String routingKey)throws QpidException; + + public void unbindQueue(String exchangeName,String queueName,String routingKey)throws QpidException; + + public void purgeQueue(boolean nowait,String queueName)throws QpidException; + + public void deleteQueue(boolean ifEmpty, boolean ifUnused, boolean nowait,String queueName)throws QpidException; + + public void open() throws QpidException; + + public void close() throws QpidException; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageProducer.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageProducer.java new file mode 100644 index 0000000000..684b53da42 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageProducer.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.api; + +import org.apache.qpid.nclient.message.AMQPApplicationMessage; + +public interface QpidMessageProducer +{ + public void send(boolean disableMessageId,boolean inline,AMQPApplicationMessage msg)throws QpidException; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidQueueHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidQueueHelper.java new file mode 100644 index 0000000000..de4b79fc78 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidQueueHelper.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.api; + +/** + * used as a helper class to support the Session class + * This reduces the clutter and makes the session class + * more readable and manageable. + * + * An implementation of this interface should take a Session + * interface as an argument in the constructor and scope all + * operations for that session. + */ +public interface QpidQueueHelper +{ + public void declareQueue(boolean autoDelete, boolean durable, boolean exclusive,boolean nowait,boolean passive,String queueName) throws QpidException; + + public void bindQueue(String exchangeName,boolean nowait,String queueName,String routingKey)throws QpidException; + + public void unbindQueue(String exchangeName,String queueName,String routingKey)throws QpidException; + + public void purgeQueue(boolean nowait,String queueName)throws QpidException; + + public void deleteQueue(boolean ifEmpty, boolean ifUnused, boolean nowait,String queueName)throws QpidException; + + public void open() throws QpidException; + + public void close() throws QpidException; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidSession.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidSession.java new file mode 100644 index 0000000000..7386cc4092 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidSession.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.api; + +/** + * The Helper classes are added in order to avoid the session class + * from having too many methods and an implementation becoming too + * complicated. Having a lengthy class can impact readability and + * manageability. + */ +public interface QpidSession +{ + + public void open() throws QpidException; + + public void close() throws QpidException; + + public void resume() throws QpidException; + + public void suspend() throws QpidException; + + public void failover() throws QpidException; + + public QpidMessageProducer createProducer() throws QpidException; + + public QpidMessageConsumer createConsumer() throws QpidException; + + public QpidMessageHelper getMessageHelper() throws QpidException; + + public QpidExchangeHelper getExchangeHelper() throws QpidException; + + public QpidQueueHelper getQueueHelper() throws QpidException; + + public QpidTransactionHelper getTransactionHelper()throws QpidException; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidTransactionHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidTransactionHelper.java new file mode 100644 index 0000000000..d5c8d80ca7 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidTransactionHelper.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.api; + +/** + * used as a helper class to support the Session class + * This reduces the clutter and makes the session class + * more readable and manageable. + * + * An implementation of this interface should take a Session + * interface as an argument in the constructor and scope all + * operations for that session. + */ + +public interface QpidTransactionHelper +{ + public void commit()throws QpidException; + + public void rollback()throws QpidException; + + public void recover()throws QpidException; + + public void resume()throws QpidException; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AMQPCallbackHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AMQPCallbackHelper.java new file mode 100644 index 0000000000..54ea9ac01b --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AMQPCallbackHelper.java @@ -0,0 +1,50 @@ +package org.apache.qpid.nclient.impl; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.nclient.amqp.AMQPCallBack; + +public class AMQPCallbackHelper extends AMQPCallBack +{ + private static final Logger _logger = Logger.getLogger(AMQPCallbackHelper.class); + + private AMQMethodBody _body; + private Exception _e; + private boolean _isError; + + @Override + public void brokerResponded(AMQMethodBody body) + { + _body = body; + _logger.debug("[Broker has responded " + body); + _isError = false; + this.setIsComplete(true); + } + + @Override + public void brokerRespondedWithError(AMQException e) + { + _e = e; + _logger.debug("[Broker has responded with an error" + e.getMessage(),e); + _isError = true; + this.setIsComplete(true); + } + + public AMQMethodBody getMethodBody() + { + return _body; + } + + public Exception getException() + { + return _e; + } + + public boolean isError() + { + return _isError; + } + + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java new file mode 100644 index 0000000000..d202bab843 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java @@ -0,0 +1,51 @@ +package org.apache.qpid.nclient.impl; + +import org.apache.qpid.nclient.api.QpidException; +import org.apache.qpid.nclient.core.AMQPException; + +/** + * This abstracts the error handling for open + * and close methods for a resource. This class + * eliminates the duplication of error handling + * code + */ +public abstract class AbstractResource +{ + private String _resourceName; + + public AbstractResource(String resourceName) + { + _resourceName = resourceName; + } + + public void open() throws QpidException + { + try + { + openResource(); + + } + catch(AMQPException e) + { + throw new QpidException("Error creating " + _resourceName + " due to " + e.getMessage(),e); + } + } + + public void close() throws QpidException + { + try + { + closeResource(); + + } + catch(Exception e) + { + throw new QpidException("Error destroying " + _resourceName + " due to " + e.getMessage(),e); + } + + } + + protected abstract void openResource() throws AMQPException; + + protected abstract void closeResource() throws AMQPException; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/HelperTemplate.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/HelperTemplate.java new file mode 100644 index 0000000000..ecbef90298 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/HelperTemplate.java @@ -0,0 +1,39 @@ +package org.apache.qpid.nclient.impl; + +import org.apache.qpid.nclient.api.QpidException; +import org.apache.qpid.nclient.core.AMQPException; + +/** + * This class creates helper methods to avoid + * duplication of routine logic and error handling + * that can be reused. + */ +public abstract class HelperTemplate +{ + public abstract void amqpMethodCall() throws AMQPException; + + public void invokeAMQPMethodCall(String msg) throws QpidException + { + try + { + amqpMethodCall(); + } + catch(Exception e) + { + throw new QpidException(msg + e.getMessage(),e); + } + } + + public void evaulateResponse(AMQPCallbackHelper cb) throws QpidException + { + // Blocking for response + while (!cb.isComplete()) + { + } + + if (cb.isError()) + { + throw new QpidException("The broker responded with an error",cb.getException()); + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java new file mode 100644 index 0000000000..054eedfcee --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java @@ -0,0 +1,242 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.impl; + +import java.util.Map; +import java.util.StringTokenizer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; + +import org.apache.log4j.Logger; +import org.apache.qpid.common.ClientProperties; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ConnectionOpenBody; +import org.apache.qpid.framing.ConnectionOpenOkBody; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.framing.ConnectionSecureOkBody; +import org.apache.qpid.framing.ConnectionStartBody; +import org.apache.qpid.framing.ConnectionStartOkBody; +import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; +import org.apache.qpid.nclient.amqp.AMQPChannel; +import org.apache.qpid.nclient.amqp.AMQPClassFactory; +import org.apache.qpid.nclient.amqp.AMQPConnection; +import org.apache.qpid.nclient.amqp.AbstractAMQPClassFactory; +import org.apache.qpid.nclient.amqp.qpid.QpidAMQPChannel; +import org.apache.qpid.nclient.amqp.state.AMQPState; +import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; +import org.apache.qpid.nclient.amqp.state.AMQPStateListener; +import org.apache.qpid.nclient.amqp.state.AMQPStateType; +import org.apache.qpid.nclient.api.QpidConnection; +import org.apache.qpid.nclient.api.QpidException; +import org.apache.qpid.nclient.api.QpidSession; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.transport.ConnectionURL; +import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType; + +/** + * + * Once the Session class is implemented the channel logic will be + * replaced by the session methods. + * + */ +public class QpidConnectionImpl implements QpidConnection, AMQPStateListener +{ + private static final Logger _logger = Logger.getLogger(QpidConnectionImpl.class); + + private byte _major; + + private byte _minor; + + private ConnectionURL _url; + + // Need a Class factory per connection + private AMQPClassFactory _classFactory; + + private int _ticket; + + private AMQPConnection _amqpConnection; + + private AtomicInteger _channelNo = new AtomicInteger(); + + private Map _sessionMap = new ConcurrentHashMap(); + + private Lock _lock = new ReentrantLock(); + + /** --------------------------------------------- + * Methods from o.a.qpid.client.Connection + * ---------------------------------------------- + */ + + public void close() + { + // handle failover + } + + public void connect(String url) throws QpidException + { + try + { + _classFactory = AbstractAMQPClassFactory.getFactoryInstance(); + } + catch(Exception e) + { + throw new QpidException("Unable to create the class factory",e); + } + + try + { + //_url = new AMQPConnectionURL("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'"); + _amqpConnection = _classFactory.createConnectionClass(url, ConnectionType.TCP); + } + catch(Exception e) + { + throw new QpidException("Unable to create a connection to the broker using url " + url + " due to " + e.getMessage(),e); + } + + try + { + handleConnectionNegotiation(); + } + catch(Exception e) + { + throw new QpidException("Connection negotiation failed due to " + e.getMessage(),e); + } + } + + public QpidSession createSession(int expiryInSeconds) throws QpidException + { + AMQPChannel channel = null; + _lock.lock(); + try + { + int channelNo = _channelNo.addAndGet(1); + channel = _classFactory.createChannelClass(channelNo); + QpidSession session = new QpidSessionImpl(_classFactory,channel,channelNo, _major,_minor); + _sessionMap.put(channelNo, session); + return session; + } + catch(AMQPException e) + { + throw new QpidException("Unable to create channel class",e); + } + finally + { + _lock.unlock(); + } + } + + /** --------------------------------------------- + * Methods from AMQPStateListener + * ---------------------------------------------- + */ + public void stateChanged(AMQPStateChangedEvent event) throws AMQPException + { + String s = event.getStateType() + " changed state from " + + event.getOldState() + " to " + event.getNewState(); + + _logger.debug(s); + + if(event.getNewState() == AMQPState.CONNECTION_CLOSED) + { + //We need to notify the sessions that they need to + //kick in the fail over logic + for (Integer sessionId : _sessionMap.keySet()) + { + QpidSession session = _sessionMap.get(sessionId); + try + { + session.failover(); + } + catch(Exception e) + { + _logger.error("Error executing failover logic for session : " + sessionId, e); + } + } + } + + } + + /** --------------------------------------------- + * Helper methods + * ---------------------------------------------- + */ + + public void handleConnectionNegotiation() throws Exception + { + _classFactory.getStateManager().addListener(AMQPStateType.CONNECTION_STATE, this); + + //ConnectionStartBody + ConnectionStartBody connectionStartBody = _amqpConnection.openTCPConnection(); + _major = connectionStartBody.getMajor(); + _minor = connectionStartBody.getMinor(); + + FieldTable clientProperties = FieldTableFactory.newFieldTable(); + clientProperties.put(new AMQShortString(ClientProperties.instance.toString()), "Test"); // setting only the client id + + final String locales = new String(connectionStartBody.getLocales(), "utf8"); + final StringTokenizer tokenizer = new StringTokenizer(locales, " "); + + final String mechanism = SecurityHelper.chooseMechanism(connectionStartBody.getMechanisms()); + + SaslClient sc = Sasl.createSaslClient(new String[] + { mechanism }, null, "AMQP", "localhost", null, SecurityHelper.createCallbackHandler(mechanism, _url)); + + ConnectionStartOkBody connectionStartOkBody = ConnectionStartOkBody.createMethodBody(_major, _minor, clientProperties, new AMQShortString( + tokenizer.nextToken()), new AMQShortString(mechanism), (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null)); + // ConnectionSecureBody + AMQMethodBody body = _amqpConnection.startOk(connectionStartOkBody); + ConnectionTuneBody connectionTuneBody; + + if (body instanceof ConnectionSecureBody) + { + ConnectionSecureBody connectionSecureBody = (ConnectionSecureBody) body; + ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(_major, _minor, sc + .evaluateChallenge(connectionSecureBody.getChallenge())); + //Assuming the server is not going to send another challenge + connectionTuneBody = (ConnectionTuneBody) _amqpConnection.secureOk(connectionSecureOkBody); + + } + else + { + connectionTuneBody = (ConnectionTuneBody) body; + } + + // Using broker supplied values + ConnectionTuneOkBody connectionTuneOkBody = ConnectionTuneOkBody.createMethodBody(_major, _minor, connectionTuneBody.getChannelMax(), + connectionTuneBody.getFrameMax(), connectionTuneBody.getHeartbeat()); + _amqpConnection.tuneOk(connectionTuneOkBody); + + ConnectionOpenBody connectionOpenBody = ConnectionOpenBody.createMethodBody(_major, _minor, null, true, new AMQShortString(_url + .getVirtualHost())); + + ConnectionOpenOkBody connectionOpenOkBody = _amqpConnection.open(connectionOpenBody); + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidExchangeHelperImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidExchangeHelperImpl.java new file mode 100644 index 0000000000..2643fe17e6 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidExchangeHelperImpl.java @@ -0,0 +1,101 @@ +package org.apache.qpid.nclient.impl; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeleteBody; +import org.apache.qpid.nclient.amqp.AMQPExchange; +import org.apache.qpid.nclient.api.QpidExchangeHelper; +import org.apache.qpid.nclient.api.QpidException; +import org.apache.qpid.nclient.core.AMQPException; + +/** + * The Qpid helper classes are written with JMS in mind. + * Therefore most calls a blocking calls. However the low + * level framework allows non blocking Async calls and should + * use the low level API if u need asynchrony + */ +public class QpidExchangeHelperImpl extends AbstractResource implements QpidExchangeHelper +{ + private QpidSessionImpl _session; + private AMQPExchange _exchange; + + protected QpidExchangeHelperImpl(QpidSessionImpl session) + { + super("Exchange Class"); + _session = session; + } + + /** + * ----------------------------------------------------- + * Methods introduced by QpidExchangeHelper + * ----------------------------------------------------- + */ + public void declareExchange(boolean autoDelete, boolean durable, String exchangeName,boolean internal, boolean nowait, boolean passive,String exchangeClass) throws QpidException + { + final ExchangeDeclareBody exchangeDeclareBody = ExchangeDeclareBody.createMethodBody( + _session.getMajor(), + _session.getMinor(), + null, // arguments + autoDelete, + durable, + new AMQShortString(exchangeName), + internal, + nowait, + passive, + _session.getAccessTicket(), + new AMQShortString(exchangeClass)); + + final AMQPCallbackHelper cb = new AMQPCallbackHelper(); + HelperTemplate template = new HelperTemplate(){ + + public void amqpMethodCall() throws AMQPException + { + _exchange.declare(exchangeDeclareBody, cb); + } + }; + + template.invokeAMQPMethodCall("Declare exchange failed due to"); + + template.evaulateResponse(cb); + } + + public void deleteExchange(String exchangeName, boolean ifUnused, boolean nowait) throws QpidException + { + final ExchangeDeleteBody exchangeDeclareBody = ExchangeDeleteBody.createMethodBody( + _session.getMajor(), + _session.getMinor(), + new AMQShortString(exchangeName), + ifUnused, + nowait, + _session.getAccessTicket()); + + final AMQPCallbackHelper cb = new AMQPCallbackHelper(); + HelperTemplate template = new HelperTemplate(){ + + public void amqpMethodCall() throws AMQPException + { + _exchange.delete(exchangeDeclareBody, cb); + } + }; + + template.invokeAMQPMethodCall("Delete exchange failed due to"); + + template.evaulateResponse(cb); + } + + /** + * ----------------------------------------------------- + * Methods introduced by AbstractResource + * ----------------------------------------------------- + */ + protected void openResource() throws AMQPException + { + _exchange = _session.getClassFactory().createExchangeClass(_session.getChannel()); + } + + protected void closeResource() throws AMQPException + { + _session.getClassFactory().destoryExchangeClass(_session.getChannel(), _exchange); + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java new file mode 100644 index 0000000000..22000a8506 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java @@ -0,0 +1,200 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.impl; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.qpid.framing.Content; +import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.framing.MessageCheckpointBody; +import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.framing.MessageRecoverBody; +import org.apache.qpid.framing.MessageResumeBody; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.nclient.amqp.AMQPMessage; +import org.apache.qpid.nclient.amqp.AMQPMessageCallBack; +import org.apache.qpid.nclient.api.QpidException; +import org.apache.qpid.nclient.api.QpidMessageConsumer; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.message.AMQPApplicationMessage; +import org.apache.qpid.nclient.message.MessageHeaders; +import org.apache.qpid.nclient.message.MessageStore; +import org.apache.qpid.nclient.message.TransientMessageStore; + +public class QpidMessageConsumerImpl extends AbstractResource implements QpidMessageConsumer, AMQPMessageCallBack +{ + private MessageStore _msgStore = new TransientMessageStore(); + private final BlockingQueue _queue = new LinkedBlockingQueue(); + private QpidSessionImpl _session; + private AMQPMessage _amqpMessage; + + protected QpidMessageConsumerImpl(QpidSessionImpl session) + { + super("Message Class"); + _session = session; + } + + /** + * ----------------------------------------------- + * Methods from QpidMessageConsumer class + * ----------------------------------------------- + */ + + public AMQPApplicationMessage get() throws QpidException + { + // I want this to do a message.get + return null; + } + + public AMQPApplicationMessage receive()throws QpidException + { + return _queue.poll(); + } + + public AMQPApplicationMessage receive(long timeout, TimeUnit tu)throws QpidException + { + try + { + return _queue.poll(timeout, tu); + } + catch(Exception e) + { + throw new QpidException("Error retrieving message from queue",e); + } + } + + /** + * ----------------------------------------------- + * Abstract methods from AbstractResource class + * ----------------------------------------------- + */ + protected void openResource() throws AMQPException + { + _amqpMessage = _session.getClassFactory().createMessageClass(_session.getChannel(),null); + } + + protected void closeResource() throws AMQPException + { + _session.getClassFactory().destoryMessageClass(_session.getChannel(), _amqpMessage); + } + + /** + * ----------------------------------------------- + * Methods from AMQPMessageCallback class + * ----------------------------------------------- + */ + public void append(MessageAppendBody messageAppendBody, long correlationId) throws AMQPException + { + String reference = new String(messageAppendBody.getReference()); + AMQPApplicationMessage msg = _msgStore.getMessage(reference); + msg.addContent(messageAppendBody.getBytes()); + } + + public void checkpoint(MessageCheckpointBody messageCheckpointBody, long correlationId) throws AMQPException + { + // TODO Auto-generated method stub + } + + public void close(MessageCloseBody messageCloseBody, long correlationId) throws AMQPException + { + String reference = new String(messageCloseBody.getReference()); + AMQPApplicationMessage msg = _msgStore.getMessage(reference); + enQueue(msg); + _msgStore.removeMessage(reference); + } + + public void open(MessageOpenBody messageOpenBody, long correlationId) throws AMQPException + { + String reference = new String(messageOpenBody.getReference()); + AMQPApplicationMessage msg = new AMQPApplicationMessage(_session.getChannel(), messageOpenBody.getReference()); + _msgStore.storeMessage(reference, msg); + } + + public void recover(MessageRecoverBody messageRecoverBody, long correlationId) throws AMQPException + { + // TODO Auto-generated method stub + + } + + public void resume(MessageResumeBody messageResumeBody, long correlationId) throws AMQPException + { + // TODO Auto-generated method stub + + } + + public void transfer(MessageTransferBody messageTransferBody, long correlationId) throws AMQPException + { + MessageHeaders messageHeaders = new MessageHeaders(); + messageHeaders.setMessageId(messageTransferBody.getMessageId()); + messageHeaders.setAppId(messageTransferBody.getAppId()); + messageHeaders.setContentType(messageTransferBody.getContentType()); + messageHeaders.setEncoding(messageTransferBody.getContentEncoding()); + messageHeaders.setCorrelationId(messageTransferBody.getCorrelationId()); + messageHeaders.setDestination(messageTransferBody.getDestination()); + messageHeaders.setExchange(messageTransferBody.getExchange()); + messageHeaders.setExpiration(messageTransferBody.getExpiration()); + messageHeaders.setReplyTo(messageTransferBody.getReplyTo()); + messageHeaders.setRoutingKey(messageTransferBody.getRoutingKey()); + messageHeaders.setTransactionId(messageTransferBody.getTransactionId()); + messageHeaders.setUserId(messageTransferBody.getUserId()); + messageHeaders.setPriority(messageTransferBody.getPriority()); + messageHeaders.setDeliveryMode(messageTransferBody.getDeliveryMode()); + messageHeaders.setApplicationHeaders(messageTransferBody.getApplicationHeaders()); + + + + if (messageTransferBody.getBody().getContentType() == Content.TypeEnum.INLINE_T) + { + AMQPApplicationMessage msg = new AMQPApplicationMessage(_session.getChannel(), + correlationId, + messageHeaders, + messageTransferBody.getBody().getContentAsByteArray(), + messageTransferBody.getRedelivered()); + + enQueue(msg); + } + else + { + byte[] referenceId = messageTransferBody.getBody().getContentAsByteArray(); + AMQPApplicationMessage msg = new AMQPApplicationMessage(_session.getChannel(),referenceId); + msg.setMessageHeaders(messageHeaders); + msg.setRedeliveredFlag(messageTransferBody.getRedelivered()); + + _msgStore.storeMessage(new String(referenceId), msg); + } + } + + private void enQueue(AMQPApplicationMessage msg)throws AMQPException + { + try + { + _queue.put(msg); + } + catch(Exception e) + { + throw new AMQPException("Error queueing the messsage",e); + } + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java new file mode 100644 index 0000000000..f553743ea6 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java @@ -0,0 +1,103 @@ +package org.apache.qpid.nclient.impl; + +import java.util.UUID; + +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.Content; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.nclient.amqp.AMQPMessage; +import org.apache.qpid.nclient.api.QpidException; +import org.apache.qpid.nclient.api.QpidMessageProducer; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.message.AMQPApplicationMessage; +import org.apache.qpid.nclient.message.MessageHeaders; + +public class QpidMessageProducerImpl extends AbstractResource implements QpidMessageProducer +{ + private QpidSessionImpl _session; + private AMQPMessage _amqpMessage; + + protected QpidMessageProducerImpl(QpidSessionImpl session) + { + super("Message Class"); + _session = session; + } + + /** + * ----------------------------------------------------- + * Methods introduced by QpidMessageProducer + * ----------------------------------------------------- + */ + public void send(boolean disableMessageId,boolean inline,AMQPApplicationMessage msg)throws QpidException + { + // need to handle the inline and reference case + final MessageTransferBody messageTransferBody = prepareTransfer(disableMessageId,msg); + final AMQPCallbackHelper cb = new AMQPCallbackHelper(); + HelperTemplate template = new HelperTemplate(){ + + public void amqpMethodCall() throws AMQPException + { + _amqpMessage.transfer(messageTransferBody, cb); + } + }; + + template.invokeAMQPMethodCall("message transfer failed due to"); + + template.evaulateResponse(cb); + } + + /** + * ----------------------------------------------------- + * Methods introduced by AbstractResource + * ----------------------------------------------------- + */ + protected void openResource() throws AMQPException + { + _amqpMessage = _session.getClassFactory().createMessageClass(_session.getChannel(),null); + } + + protected void closeResource() throws AMQPException + { + _session.getClassFactory().destoryMessageClass(_session.getChannel(), _amqpMessage); + } + + /** + * ----------------------------------------------------- + * Helper Methods + * ----------------------------------------------------- + */ + private MessageTransferBody prepareTransfer(boolean disableMessageId,AMQPApplicationMessage msg) + { + MessageHeaders msgHeaders = msg.getMessageHeaders(); + MessageTransferBody messageTransferBody = MessageTransferBody.createMethodBody( + _session.getMajor(), + _session.getMinor(), + msgHeaders.getAppId(), //appId + msgHeaders.getApplicationHeaders(), //applicationHeaders + new Content(Content.TypeEnum.INLINE_T,msg.getContentsAsBytes() ), //body + msgHeaders.getContentType(), //contentEncoding, + msgHeaders.getContentType(), //contentType + msgHeaders.getCorrelationId(), //correlationId + msgHeaders.getDeliveryMode(), //deliveryMode non persistant + new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination + new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange + msgHeaders.getExpiration(), //expiration + msgHeaders.isImmediate(), //immediate + msgHeaders.isMandatory(), //mandatory + (disableMessageId)?null : new AMQShortString(UUID.randomUUID().toString()), //messageId + msgHeaders.getPriority(), //priority + msg.getRedeliveredFlag(), //redelivered + msgHeaders.getReplyTo(), //replyTo + msgHeaders.getRoutingKey(), //routingKey, + "abc".getBytes(), //securityToken + _session.getAccessTicket(), //ticket + System.currentTimeMillis(), //timestamp + msgHeaders.getTransactionId(), //transactionId + msgHeaders.getTtl(), //ttl, + msgHeaders.getUserId() //userId + ); + + return messageTransferBody; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidQueueHelperImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidQueueHelperImpl.java new file mode 100644 index 0000000000..33b3c1177e --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidQueueHelperImpl.java @@ -0,0 +1,178 @@ +package org.apache.qpid.nclient.impl; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.QueueBindBody; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.framing.QueuePurgeBody; +import org.apache.qpid.framing.QueueUnbindBody; +import org.apache.qpid.nclient.amqp.AMQPQueue; +import org.apache.qpid.nclient.api.QpidException; +import org.apache.qpid.nclient.api.QpidQueueHelper; +import org.apache.qpid.nclient.core.AMQPException; + +public class QpidQueueHelperImpl extends AbstractResource implements QpidQueueHelper +{ + private QpidSessionImpl _session; + private AMQPQueue _queueClass; + + protected QpidQueueHelperImpl(QpidSessionImpl session) + { + super("Queue Class"); + _session = session; + } + + /** + * ----------------------------------------------------- + * Methods introduced by QpidQueueHelper + * ----------------------------------------------------- + */ + public void bindQueue(String exchangeName, boolean nowait, String queueName, String routingKey) throws QpidException + { + final QueueBindBody queueBindBody = QueueBindBody.createMethodBody( + _session.getMajor(), + _session.getMinor(), + null, //arguments + new AMQShortString(exchangeName),//exchange + nowait, + new AMQShortString(queueName), //queue + new AMQShortString(routingKey), //routingKey + _session.getAccessTicket() + ); + + final AMQPCallbackHelper cb = new AMQPCallbackHelper(); + HelperTemplate template = new HelperTemplate(){ + + public void amqpMethodCall() throws AMQPException + { + _queueClass.bind(queueBindBody, cb); + } + }; + + template.invokeAMQPMethodCall("Queue bind failed due to"); + + template.evaulateResponse(cb); + } + + public void declareQueue(boolean autoDelete, boolean durable, boolean exclusive, boolean nowait, boolean passive, String queueName) + throws QpidException + { + final QueueDeclareBody queueDeclareBody = QueueDeclareBody.createMethodBody( + _session.getMajor(), + _session.getMinor(), + null, //arguments + autoDelete, + durable, + exclusive, + nowait, + passive, + new AMQShortString(queueName), //queue + _session.getAccessTicket() + ); + + final AMQPCallbackHelper cb = new AMQPCallbackHelper(); + HelperTemplate template = new HelperTemplate(){ + + public void amqpMethodCall() throws AMQPException + { + _queueClass.declare(queueDeclareBody, cb); + } + }; + + template.invokeAMQPMethodCall("Queue declare failed due to"); + + template.evaulateResponse(cb); + } + + public void deleteQueue(boolean ifEmpty, boolean ifUnused, boolean nowait, String queueName) throws QpidException + { + final QueueDeleteBody queueDeleteBody = QueueDeleteBody.createMethodBody( + _session.getMajor(), + _session.getMinor(), + ifEmpty, + ifUnused, + nowait, + new AMQShortString(queueName), //queue + _session.getAccessTicket() + ); + + final AMQPCallbackHelper cb = new AMQPCallbackHelper(); + HelperTemplate template = new HelperTemplate(){ + + public void amqpMethodCall() throws AMQPException + { + _queueClass.delete(queueDeleteBody, cb); + } + }; + + template.invokeAMQPMethodCall("Queue delete failed due to"); + + template.evaulateResponse(cb); + } + + public void purgeQueue(boolean nowait, String queueName) throws QpidException + { + final QueuePurgeBody queuePurgeBody = QueuePurgeBody.createMethodBody( + _session.getMajor(), + _session.getMinor(), + nowait, + new AMQShortString(queueName), //queue + _session.getAccessTicket() + ); + + final AMQPCallbackHelper cb = new AMQPCallbackHelper(); + HelperTemplate template = new HelperTemplate(){ + + public void amqpMethodCall() throws AMQPException + { + _queueClass.purge(queuePurgeBody, cb); + } + }; + + template.invokeAMQPMethodCall("Queue purge failed due to"); + + template.evaulateResponse(cb); + } + + public void unbindQueue(String exchangeName, String queueName, String routingKey) throws QpidException + { + final QueueUnbindBody queueUnbindBody = QueueUnbindBody.createMethodBody( + _session.getMajor(), + _session.getMinor(), + null, + new AMQShortString(exchangeName), + new AMQShortString(queueName), //queue + new AMQShortString(routingKey), + _session.getAccessTicket() + ); + + final AMQPCallbackHelper cb = new AMQPCallbackHelper(); + HelperTemplate template = new HelperTemplate(){ + + public void amqpMethodCall() throws AMQPException + { + _queueClass.unbind(queueUnbindBody, cb); + } + }; + + template.invokeAMQPMethodCall("Queue unbind failed due to"); + + template.evaulateResponse(cb); + } + + + /** + * ----------------------------------------------------- + * Methods introduced by AbstractResource + * ----------------------------------------------------- + */ + protected void openResource() throws AMQPException + { + _queueClass = _session.getClassFactory().createQueueClass(_session.getChannel()); + } + + protected void closeResource() throws AMQPException + { + _session.getClassFactory().destroyQueueClass(_session.getChannel(), _queueClass); + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java new file mode 100644 index 0000000000..7d3cf5f861 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java @@ -0,0 +1,259 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.impl; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ChannelOpenBody; +import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.nclient.amqp.AMQPChannel; +import org.apache.qpid.nclient.amqp.AMQPClassFactory; +import org.apache.qpid.nclient.api.QpidConnection; +import org.apache.qpid.nclient.api.QpidExchangeHelper; +import org.apache.qpid.nclient.api.QpidMessageConsumer; +import org.apache.qpid.nclient.api.QpidMessageHelper; +import org.apache.qpid.nclient.api.QpidMessageProducer; +import org.apache.qpid.nclient.api.QpidException; +import org.apache.qpid.nclient.api.QpidQueueHelper; +import org.apache.qpid.nclient.api.QpidSession; +import org.apache.qpid.nclient.api.QpidTransactionHelper; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.protocol.AMQConstant; + +/** + * According to the 0-9 spec, the session is built on channels(1-1 map) and when a channel is closed + * the session should be closed. However with the introdution of the session class in 0-10 + * this may change. Therefore I will not implement that logic yet. + * + * Once the dust settles there will be a Failover Helper that will manage the sessions + * failover logic. + */ +public class QpidSessionImpl extends AbstractResource implements QpidSession +{ + private AMQPChannel _channelClass; + private int _channel; + private byte _major; + private byte _minor; + private AMQPClassFactory _classFactory; + private int _ticket = 0; //currently useless + private QpidExchangeHelperImpl _qpidExchangeHelper; + private QpidQueueHelperImpl _qpidQueueHelper; + private QpidMessageConsumerImpl _qpidMessageConsumer; + private QpidMessageProducerImpl _qpidMessageProducer; + private AtomicBoolean _closed; + private Lock _sessionCloseLock = new ReentrantLock(); + + // this will be used as soon as Session class is finalized + private int _expiryInSeconds = QpidConnection.SESSION_EXPIRY_TIED_TO_CHANNEL; + private QpidConnection _qpidConnection; + + public QpidSessionImpl(AMQPClassFactory classFactory,AMQPChannel channelClass,int channel,byte major, byte minor) + { + super("Session"); + _channelClass = channelClass; + _channel = channel; + _major = major; + _minor = minor; + _classFactory = classFactory; + _qpidConnection = null; + } + + /** + * ----------------------------------------------------- + * Methods introduced by AbstractResource + * ----------------------------------------------------- + */ + protected void openResource() throws AMQPException + { + // These methods will be changed to session methods + openChannel(); + } + + protected void closeResource() throws AMQPException + { + ChannelCloseBody channelCloseBody = ChannelCloseBody.createMethodBody(_major, _minor, + 0, //classId + 0, //methodId + AMQConstant.REPLY_SUCCESS.getCode(), + new AMQShortString("Qpid Client closing channel")); + + _channelClass.close(channelCloseBody); + + _classFactory.destroyChannelClass(_channel, _channelClass); + if (_qpidQueueHelper != null) + { + _qpidQueueHelper.closeResource(); + } + if (_qpidExchangeHelper != null) + { + _qpidExchangeHelper.closeResource(); + } + if(_qpidMessageConsumer != null) + { + _qpidMessageConsumer.closeResource(); + } + if(_qpidMessageProducer != null) + { + _qpidMessageProducer.closeResource(); + } + } + + /** + * ----------------------------------------------------- + * Methods introduced by QpidSession + * ----------------------------------------------------- + */ + public void close() throws QpidException + { + if (!_closed.getAndSet(true)) + { + _sessionCloseLock.lock(); + try + { + super.close(); + } + finally + { + _sessionCloseLock.unlock(); + } + } + } + + public void resume() throws QpidException + { + + } + + // not intended to be used at the jms layer + public void suspend() throws QpidException + { + + } + + public void failover() throws QpidException + { + if(_expiryInSeconds == QpidConnection.SESSION_EXPIRY_TIED_TO_CHANNEL) + { + // then close the session + } + else + { + //kick in the failover logic + } + } + + public QpidMessageConsumer createConsumer() throws QpidException + { + if (_qpidMessageConsumer == null) + { + _qpidMessageConsumer = new QpidMessageConsumerImpl(this); + _qpidMessageConsumer.open(); + } + return _qpidMessageConsumer; + } + + public QpidMessageProducer createProducer() throws QpidException + { + if (_qpidMessageProducer == null) + { + _qpidMessageProducer = new QpidMessageProducerImpl(this); + _qpidMessageProducer.open(); + } + return _qpidMessageProducer; + } + + /** ------------------------------------------ + * These helper classes are employed to reduce + * the clutter in session classes and improve + * readability + * ------------------------------------------ + */ + public QpidExchangeHelper getExchangeHelper() throws QpidException + { + if (_qpidExchangeHelper == null) + { + _qpidExchangeHelper = new QpidExchangeHelperImpl(this); + _qpidExchangeHelper.open(); + } + return _qpidExchangeHelper; + } + + public QpidMessageHelper getMessageHelper() throws QpidException + { + // TODO Auto-generated method stub + return null; + } + + public QpidQueueHelper getQueueHelper() throws QpidException + { + if (_qpidQueueHelper == null) + { + _qpidQueueHelper = new QpidQueueHelperImpl(this); + _qpidQueueHelper.open(); + } + return _qpidQueueHelper; + } + + public QpidTransactionHelper getTransactionHelper()throws QpidException + { + return null; + } + + /** ------------------------------------------ + * These protected methods are for the qpid + * implementation of the api package + * ------------------------------------------ + */ + protected byte getMinor() + { + return _minor; + } + + protected byte getMajor() + { + return _major; + } + + protected int getChannel() + { + return _channel; + } + + protected AMQPClassFactory getClassFactory() + { + return _classFactory; + } + + protected int getAccessTicket() + { + return _ticket; + } + + private void openChannel() throws AMQPException + { + ChannelOpenBody channelOpenBody = ChannelOpenBody.createMethodBody(_major, _minor, new AMQShortString("myChannel1")); + ChannelOpenOkBody channelOpenOkBody = _channelClass.open(channelOpenBody); + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/SecurityHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/SecurityHelper.java new file mode 100644 index 0000000000..15cc347b4f --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/SecurityHelper.java @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.impl; + +import java.io.UnsupportedEncodingException; +import java.util.HashSet; +import java.util.StringTokenizer; + +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.security.AMQPCallbackHandler; +import org.apache.qpid.nclient.security.CallbackHandlerRegistry; +import org.apache.qpid.nclient.transport.ConnectionURL; + +public class SecurityHelper +{ + public static String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException + { + final String mechanisms = new String(availableMechanisms, "utf8"); + StringTokenizer tokenizer = new StringTokenizer(mechanisms, " "); + HashSet mechanismSet = new HashSet(); + while (tokenizer.hasMoreTokens()) + { + mechanismSet.add(tokenizer.nextToken()); + } + + String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms(); + StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " "); + while (prefTokenizer.hasMoreTokens()) + { + String mech = prefTokenizer.nextToken(); + if (mechanismSet.contains(mech)) + { + return mech; + } + } + return null; + } + + public static AMQPCallbackHandler createCallbackHandler(String mechanism, ConnectionURL url) + throws AMQPException + { + Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism); + try + { + Object instance = mechanismClass.newInstance(); + AMQPCallbackHandler cbh = (AMQPCallbackHandler) instance; + cbh.initialise(url); + return cbh; + } + catch (Exception e) + { + throw new AMQPException("Unable to create callback handler: " + e, e); + } + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java index d79525c5b2..79302540be 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java @@ -21,6 +21,7 @@ package org.apache.qpid.nclient.message; +import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.List; @@ -82,6 +83,17 @@ public class AMQPApplicationMessage { { return contents; } + + public byte[] getContentsAsBytes() + { + ByteBuffer buf = ByteBuffer.allocate(bytesReceived); + for (byte[] bytes: contents) + { + buf.put(bytes); + } + + return buf.array(); + } public long getDeliveryTag() { diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java index 562aa7b06e..f633ecff3e 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java @@ -44,7 +44,7 @@ public class MessageHeaders private AMQShortString _exchange; - private FieldTable _jmsHeaders; + private FieldTable _applicationHeaders; private short _deliveryMode; @@ -70,6 +70,12 @@ public class MessageHeaders private AMQShortString _routingKey; + private boolean _immediate; + + private boolean _mandatory; + + private long _ttl; + private int _size; public int getSize() @@ -106,19 +112,19 @@ public class MessageHeaders _encoding = encoding; } - public FieldTable getJMSHeaders() + public FieldTable getApplicationHeaders() { - if (_jmsHeaders == null) + if (_applicationHeaders == null) { - setJMSHeaders(FieldTableFactory.newFieldTable()); + setApplicationHeaders(FieldTableFactory.newFieldTable()); } - return _jmsHeaders; + return _applicationHeaders; } - public void setJMSHeaders(FieldTable headers) + public void setApplicationHeaders(FieldTable headers) { - _jmsHeaders = headers; + _applicationHeaders = headers; } @@ -227,13 +233,13 @@ public class MessageHeaders public boolean getBoolean(AMQShortString string) throws JMSException { - Boolean b = getJMSHeaders().getBoolean(string); + Boolean b = getApplicationHeaders().getBoolean(string); if (b == null) { - if (getJMSHeaders().containsKey(string)) + if (getApplicationHeaders().containsKey(string)) { - Object str = getJMSHeaders().getObject(string); + Object str = getApplicationHeaders().getObject(string); if (str == null || !(str instanceof AMQShortString)) { @@ -255,11 +261,11 @@ public class MessageHeaders public char getCharacter(AMQShortString string) throws JMSException { - Character c = getJMSHeaders().getCharacter(string); + Character c = getApplicationHeaders().getCharacter(string); if (c == null) { - if (getJMSHeaders().isNullStringValue(string.asString())) + if (getApplicationHeaders().isNullStringValue(string.asString())) { throw new NullPointerException("Cannot convert null char"); } @@ -276,7 +282,7 @@ public class MessageHeaders public byte[] getBytes(AMQShortString string) throws JMSException { - byte[] bs = getJMSHeaders().getBytes(string); + byte[] bs = getApplicationHeaders().getBytes(string); if (bs == null) { @@ -290,12 +296,12 @@ public class MessageHeaders public byte getByte(AMQShortString string) throws JMSException { - Byte b = getJMSHeaders().getByte(string); + Byte b = getApplicationHeaders().getByte(string); if (b == null) { - if (getJMSHeaders().containsKey(string)) + if (getApplicationHeaders().containsKey(string)) { - Object str = getJMSHeaders().getObject(string); + Object str = getApplicationHeaders().getObject(string); if (str == null || !(str instanceof AMQShortString)) { @@ -317,7 +323,7 @@ public class MessageHeaders public short getShort(AMQShortString string) throws JMSException { - Short s = getJMSHeaders().getShort(string); + Short s = getApplicationHeaders().getShort(string); if (s == null) { @@ -329,7 +335,7 @@ public class MessageHeaders public int getInteger(AMQShortString string) throws JMSException { - Integer i = getJMSHeaders().getInteger(string); + Integer i = getApplicationHeaders().getInteger(string); if (i == null) { @@ -341,7 +347,7 @@ public class MessageHeaders public long getLong(AMQShortString string) throws JMSException { - Long l = getJMSHeaders().getLong(string); + Long l = getApplicationHeaders().getLong(string); if (l == null) { @@ -353,13 +359,13 @@ public class MessageHeaders public float getFloat(AMQShortString string) throws JMSException { - Float f = getJMSHeaders().getFloat(string); + Float f = getApplicationHeaders().getFloat(string); if (f == null) { - if (getJMSHeaders().containsKey(string)) + if (getApplicationHeaders().containsKey(string)) { - Object str = getJMSHeaders().getObject(string); + Object str = getApplicationHeaders().getObject(string); if (str == null || !(str instanceof AMQShortString)) { @@ -382,7 +388,7 @@ public class MessageHeaders public double getDouble(AMQShortString string) throws JMSException { - Double d = getJMSHeaders().getDouble(string); + Double d = getApplicationHeaders().getDouble(string); if (d == null) { @@ -394,13 +400,13 @@ public class MessageHeaders public AMQShortString getString(AMQShortString string) throws JMSException { - AMQShortString s = new AMQShortString(getJMSHeaders().getString(string.asString())); + AMQShortString s = new AMQShortString(getApplicationHeaders().getString(string.asString())); if (s == null) { - if (getJMSHeaders().containsKey(string)) + if (getApplicationHeaders().containsKey(string)) { - Object o = getJMSHeaders().getObject(string); + Object o = getApplicationHeaders().getObject(string); if (o instanceof byte[]) { throw new MessageFormatException("getObject couldn't find " + string + " item."); @@ -424,71 +430,71 @@ public class MessageHeaders public Object getObject(AMQShortString string) throws JMSException { - return getJMSHeaders().getObject(string); + return getApplicationHeaders().getObject(string); } public void setBoolean(AMQShortString string, boolean b) throws JMSException { checkPropertyName(string); - getJMSHeaders().setBoolean(string, b); + getApplicationHeaders().setBoolean(string, b); } public void setChar(AMQShortString string, char c) throws JMSException { checkPropertyName(string); - getJMSHeaders().setChar(string, c); + getApplicationHeaders().setChar(string, c); } public Object setBytes(AMQShortString string, byte[] bytes) { - return getJMSHeaders().setBytes(string, bytes); + return getApplicationHeaders().setBytes(string, bytes); } public Object setBytes(AMQShortString string, byte[] bytes, int start, int length) { - return getJMSHeaders().setBytes(string, bytes, start, length); + return getApplicationHeaders().setBytes(string, bytes, start, length); } public void setByte(AMQShortString string, byte b) throws JMSException { checkPropertyName(string); - getJMSHeaders().setByte(string, b); + getApplicationHeaders().setByte(string, b); } public void setShort(AMQShortString string, short i) throws JMSException { checkPropertyName(string); - getJMSHeaders().setShort(string, i); + getApplicationHeaders().setShort(string, i); } public void setInteger(AMQShortString string, int i) throws JMSException { checkPropertyName(string); - getJMSHeaders().setInteger(string, i); + getApplicationHeaders().setInteger(string, i); } public void setLong(AMQShortString string, long l) throws JMSException { checkPropertyName(string); - getJMSHeaders().setLong(string, l); + getApplicationHeaders().setLong(string, l); } public void setFloat(AMQShortString string, float v) throws JMSException { checkPropertyName(string); - getJMSHeaders().setFloat(string, v); + getApplicationHeaders().setFloat(string, v); } public void setDouble(AMQShortString string, double v) throws JMSException { checkPropertyName(string); - getJMSHeaders().setDouble(string, v); + getApplicationHeaders().setDouble(string, v); } public void setString(AMQShortString string, AMQShortString string1) throws JMSException { checkPropertyName(string); - getJMSHeaders().setString(string.asString(), string1.asString()); + getApplicationHeaders().setString(string.asString(), string1.asString()); } public void setObject(AMQShortString string, Object object) throws JMSException @@ -496,7 +502,7 @@ public class MessageHeaders checkPropertyName(string); try { - getJMSHeaders().setObject(string, object); + getApplicationHeaders().setObject(string, object); } catch (AMQPInvalidClassException aice) { @@ -506,42 +512,42 @@ public class MessageHeaders public boolean itemExists(AMQShortString string) throws JMSException { - return getJMSHeaders().containsKey(string); + return getApplicationHeaders().containsKey(string); } public Enumeration getPropertyNames() { - return getJMSHeaders().getPropertyNames(); + return getApplicationHeaders().getPropertyNames(); } public void clear() { - getJMSHeaders().clear(); + getApplicationHeaders().clear(); } public boolean propertyExists(AMQShortString propertyName) { - return getJMSHeaders().propertyExists(propertyName); + return getApplicationHeaders().propertyExists(propertyName); } public Object put(Object key, Object value) { - return getJMSHeaders().setObject(key.toString(), value); + return getApplicationHeaders().setObject(key.toString(), value); } public Object remove(AMQShortString propertyName) { - return getJMSHeaders().remove(propertyName); + return getApplicationHeaders().remove(propertyName); } public boolean isEmpty() { - return getJMSHeaders().isEmpty(); + return getApplicationHeaders().isEmpty(); } public void writeToBuffer(ByteBuffer data) { - getJMSHeaders().writeToBuffer(data); + getApplicationHeaders().writeToBuffer(data); } public Enumeration getMapNames() @@ -674,6 +680,36 @@ public class MessageHeaders { this._routingKey = routingKey; } + + public boolean isImmediate() + { + return _immediate; + } + + public void setImmediate(boolean immediate) + { + this._immediate = immediate; + } + + public boolean isMandatory() + { + return _mandatory; + } + + public void setMandatory(boolean mandatory) + { + this._mandatory = mandatory; + } + + public long getTtl() + { + return _ttl; + } + + public void setTtl(long ttl) + { + this._ttl = ttl; + } } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java index efd7264f96..4419a662dd 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java @@ -1,16 +1,16 @@ package org.apache.qpid.nclient.message; -import org.apache.qpid.AMQException; +import org.apache.qpid.nclient.core.AMQPException; public interface MessageStore { - public void removeMessage(String identifier); + public void removeMessage(String identifier) throws AMQPException; - public void storeContentBodyChunk(String identifier,byte[] contentBody) throws AMQException; + public void storeContentBodyChunk(String identifier,byte[] contentBody) throws AMQPException; - public void storeMessageMetaData(String identifier, MessageHeaders messageHeaders) throws AMQException; + public void storeMessageMetaData(String identifier, MessageHeaders messageHeaders) throws AMQPException; - public AMQPApplicationMessage getMessage(String identifier) throws AMQException; + public AMQPApplicationMessage getMessage(String identifier) throws AMQPException; - public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQException; + public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQPException; } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java index eb5a9c1778..13c01acb07 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java @@ -3,37 +3,48 @@ package org.apache.qpid.nclient.message; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.apache.qpid.AMQException; +import org.apache.qpid.nclient.core.AMQPException; public class TransientMessageStore implements MessageStore { - private Map messageMap = new ConcurrentHashMap(); + private Map _messageMap = new ConcurrentHashMap(); public AMQPApplicationMessage getMessage(String identifier) - throws AMQException + throws AMQPException { - return messageMap.get(identifier); + if (!_messageMap.containsKey(identifier)) + { + throw new AMQPException("identifier not found " + identifier); + } + + return _messageMap.get(identifier); } - public void removeMessage(String identifier) + public void removeMessage(String identifier) throws AMQPException { - messageMap.remove(identifier); + if (!_messageMap.containsKey(identifier)) + { + throw new AMQPException("identifier not found " + identifier); + } + _messageMap.remove(identifier); } public void storeContentBodyChunk(String identifier, byte[] contentBody) - throws AMQException + throws AMQPException { - + AMQPApplicationMessage msg = _messageMap.get(identifier); + msg.addContent(contentBody); } public void storeMessageMetaData(String identifier, - MessageHeaders messageHeaders) throws AMQException + MessageHeaders messageHeaders) throws AMQPException { - + AMQPApplicationMessage msg = _messageMap.get(identifier); + msg.setMessageHeaders(messageHeaders); } - public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQException + public void storeMessage(String identifier,AMQPApplicationMessage msg)throws AMQPException { - + _messageMap.put(identifier, msg); } } -- cgit v1.2.1