summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-05-28 20:26:55 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-05-28 20:26:55 +0000
commit48f73ce3731366fd1b014faa3bbaa2820f41e8bb (patch)
treeb800ea7fe2e589aee1f48b2eae2c078a675fbc31
parent92b10f932cabb33924d2249e1e2270246d56cca0 (diff)
downloadqpid-python-48f73ce3731366fd1b014faa3bbaa2820f41e8bb.tar.gz
This is the initial checkin for the Qpid java client which is built on top of the AMQP protocol level client/framework
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/client_restructure@542314 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/pom.xml5
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java12
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java64
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java5
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java7
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java5
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConnection.java33
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidException.java35
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidExchangeHelper.java43
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageConsumer.java34
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageHelper.java47
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageProducer.java28
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidQueueHelper.java47
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidSession.java53
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidTransactionHelper.java42
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/AMQPCallbackHelper.java50
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java51
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/HelperTemplate.java39
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java242
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidExchangeHelperImpl.java101
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java200
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java103
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidQueueHelperImpl.java178
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java259
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/SecurityHelper.java (renamed from java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java)2
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java12
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java130
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java12
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java35
29 files changed, 1757 insertions, 117 deletions
diff --git a/java/client/pom.xml b/java/client/pom.xml
index 6d6bb7cae2..854428fb39 100644
--- a/java/client/pom.xml
+++ b/java/client/pom.xml
@@ -53,11 +53,6 @@
</dependency>
<dependency>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-jta_1.1_spec</artifactId>
- </dependency>
-
- <dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
index 4976daa4fa..a310941d18 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
@@ -21,10 +21,6 @@
package org.apache.qpid.nclient.amqp;
import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
-import org.apache.qpid.nclient.amqp.qpid.QpidAMQPChannel;
-import org.apache.qpid.nclient.amqp.qpid.QpidAMQPExchange;
-import org.apache.qpid.nclient.amqp.qpid.QpidAMQPMessage;
-import org.apache.qpid.nclient.amqp.qpid.QpidAMQPQueue;
import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.transport.ConnectionURL;
@@ -40,19 +36,19 @@ public interface AMQPClassFactory
public abstract AMQPChannel createChannelClass(int channel) throws AMQPException;
- public abstract void destroyChannelClass(int channel, QpidAMQPChannel amqpChannel) throws AMQPException;
+ public abstract void destroyChannelClass(int channel, AMQPChannel amqpChannel) throws AMQPException;
public abstract AMQPExchange createExchangeClass(int channel) throws AMQPException;
- public abstract void destoryExchangeClass(int channel, QpidAMQPExchange amqpExchange) throws AMQPException;
+ public abstract void destoryExchangeClass(int channel, AMQPExchange amqpExchange) throws AMQPException;
public abstract AMQPQueue createQueueClass(int channel) throws AMQPException;
- public abstract void destroyQueueClass(int channel, QpidAMQPQueue amqpQueue) throws AMQPException;
+ public abstract void destroyQueueClass(int channel, AMQPQueue amqpQueue) throws AMQPException;
public abstract AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException;
- public abstract void destoryMessageClass(int channel, QpidAMQPMessage amqpMessage) throws AMQPException;
+ public abstract void destoryMessageClass(int channel, AMQPMessage amqpMessage) throws AMQPException;
/**
* Extention point
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java
index 809aa57dab..a38469def5 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java
@@ -152,14 +152,15 @@ public class QpidAMQPClassFactory implements AMQPClassFactory
/* (non-Javadoc)
* @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destroyChannelClass(int, org.apache.qpid.nclient.amqp.AMQPChannel)
*/
- public void destroyChannelClass(int channel, QpidAMQPChannel amqpChannel) throws AMQPException
+ public void destroyChannelClass(int channel, AMQPChannel amqpChannel) throws AMQPException
{
- _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class, amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class, amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelOkBody.class, amqpChannel);
+ QpidAMQPChannel qpidAMQPChannel = (QpidAMQPChannel)amqpChannel;
+ _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class, qpidAMQPChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class, qpidAMQPChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class, qpidAMQPChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class, qpidAMQPChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class, qpidAMQPChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelOkBody.class, qpidAMQPChannel);
}
/* (non-Javadoc)
@@ -177,10 +178,11 @@ public class QpidAMQPClassFactory implements AMQPClassFactory
/* (non-Javadoc)
* @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destoryExchangeClass(int, org.apache.qpid.nclient.amqp.AMQPExchange)
*/
- public void destoryExchangeClass(int channel, QpidAMQPExchange amqpExchange) throws AMQPException
+ public void destoryExchangeClass(int channel, AMQPExchange amqpExchange) throws AMQPException
{
- _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange);
- _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange);
+ QpidAMQPExchange qpidAMQPExchange = (QpidAMQPExchange)amqpExchange;
+ _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class, qpidAMQPExchange);
+ _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class, qpidAMQPExchange);
}
/* (non-Javadoc)
@@ -201,13 +203,14 @@ public class QpidAMQPClassFactory implements AMQPClassFactory
/* (non-Javadoc)
* @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destroyQueueClass(int, org.apache.qpid.nclient.amqp.AMQPQueue)
*/
- public void destroyQueueClass(int channel, QpidAMQPQueue amqpQueue) throws AMQPException
+ public void destroyQueueClass(int channel, AMQPQueue amqpQueue) throws AMQPException
{
- _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class, amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue);
+ QpidAMQPQueue qpidAMQPQueue = (QpidAMQPQueue)amqpQueue;
+ _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class, qpidAMQPQueue);
+ _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class, qpidAMQPQueue);
+ _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class, qpidAMQPQueue);
+ _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class, qpidAMQPQueue);
+ _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class, qpidAMQPQueue);
}
/* (non-Javadoc)
@@ -237,21 +240,22 @@ public class QpidAMQPClassFactory implements AMQPClassFactory
/* (non-Javadoc)
* @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destoryMessageClass(int, org.apache.qpid.nclient.amqp.AMQPMessage)
*/
- public void destoryMessageClass(int channel, QpidAMQPMessage amqpMessage) throws AMQPException
+ public void destoryMessageClass(int channel, AMQPMessage amqpMessage) throws AMQPException
{
- _eventManager.removeMethodEventListener(channel, MessageAppendBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageCancelBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageCloseBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageGetBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageOkBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageOpenBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageRejectBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageResumeBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageQosBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageTransferBody.class, amqpMessage);
+ QpidAMQPMessage qpidAMQPMessage = (QpidAMQPMessage) amqpMessage;
+ _eventManager.removeMethodEventListener(channel, MessageAppendBody.class, qpidAMQPMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCancelBody.class, qpidAMQPMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class, qpidAMQPMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCloseBody.class, qpidAMQPMessage);
+ _eventManager.removeMethodEventListener(channel, MessageGetBody.class, qpidAMQPMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class, qpidAMQPMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOkBody.class, qpidAMQPMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOpenBody.class, qpidAMQPMessage);
+ _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class, qpidAMQPMessage);
+ _eventManager.removeMethodEventListener(channel, MessageRejectBody.class, qpidAMQPMessage);
+ _eventManager.removeMethodEventListener(channel, MessageResumeBody.class, qpidAMQPMessage);
+ _eventManager.removeMethodEventListener(channel, MessageQosBody.class, qpidAMQPMessage);
+ _eventManager.removeMethodEventListener(channel, MessageTransferBody.class, qpidAMQPMessage);
}
//This class should register as a state listener for AMQPConnection
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java
index 3fe1ee4cfd..196b441308 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java
@@ -26,7 +26,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
import org.apache.qpid.nclient.amqp.state.AMQPStateListener;
import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
@@ -40,7 +39,7 @@ public class QpidStateManager implements AMQPStateManager
private Map<AMQPStateType, List<AMQPStateListener>> _listernerMap = new ConcurrentHashMap<AMQPStateType, List<AMQPStateListener>>();
- public void addListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException
+ public void addListener(AMQPStateType stateType, AMQPStateListener l) throws AMQPException
{
List<AMQPStateListener> list;
if(_listernerMap.containsKey(stateType))
@@ -55,7 +54,7 @@ public class QpidStateManager implements AMQPStateManager
list.add(l);
}
- public void removeListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException
+ public void removeListener(AMQPStateType stateType, AMQPStateListener l) throws AMQPException
{
if(_listernerMap.containsKey(stateType))
{
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
index 3dce1cde1e..267225afec 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
@@ -45,6 +45,7 @@ import org.apache.qpid.framing.ConnectionTuneBody;
import org.apache.qpid.framing.ConnectionTuneOkBody;
import org.apache.qpid.framing.Content;
import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeleteBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MessageCancelBody;
@@ -58,6 +59,7 @@ import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.framing.QueueDeleteOkBody;
import org.apache.qpid.framing.QueuePurgeBody;
import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.framing.QueueUnbindBody;
import org.apache.qpid.nclient.amqp.AMQPCallBack;
import org.apache.qpid.nclient.amqp.AMQPChannel;
import org.apache.qpid.nclient.amqp.AMQPClassFactory;
@@ -67,6 +69,7 @@ import org.apache.qpid.nclient.amqp.AMQPMessage;
import org.apache.qpid.nclient.amqp.AMQPQueue;
import org.apache.qpid.nclient.amqp.AbstractAMQPClassFactory;
import org.apache.qpid.nclient.amqp.state.AMQPStateType;
+import org.apache.qpid.nclient.impl.SecurityHelper;
import org.apache.qpid.nclient.transport.AMQPConnectionURL;
import org.apache.qpid.nclient.transport.ConnectionURL;
import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
@@ -193,7 +196,7 @@ public class TestClient
// Blocking for response
while (!cb.isComplete())
{
- }
+ }
}
public void createAndBindQueue() throws Exception
@@ -245,7 +248,7 @@ public class TestClient
//Blocking for response
while (!cb.isComplete())
{
- }
+ }
}
public void purgeQueue() throws Exception
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java
index 9bc60b658e..01404565b9 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java
@@ -1,13 +1,12 @@
package org.apache.qpid.nclient.amqp.state;
-import org.apache.qpid.AMQException;
import org.apache.qpid.nclient.core.AMQPException;
public interface AMQPStateManager
{
- public void addListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException;
+ public void addListener(AMQPStateType stateType, AMQPStateListener l)throws AMQPException;
- public void removeListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException;
+ public void removeListener(AMQPStateType stateType, AMQPStateListener l)throws AMQPException;
public void notifyStateChanged(AMQPStateChangedEvent event) throws AMQPException;
} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConnection.java
new file mode 100644
index 0000000000..744b0aa89f
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConnection.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.api;
+
+public interface QpidConnection
+{
+ public static final int SESSION_EXPIRY_MAX_TIME = Integer.MAX_VALUE;
+ public static final int SESSION_EXPIRY_TIED_TO_CHANNEL = 0;
+
+ public void connect(String url) throws QpidException;
+
+ public QpidSession createSession(int expiryInSeconds) throws QpidException;
+
+ public void close()throws QpidException;
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidException.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidException.java
new file mode 100644
index 0000000000..f418380452
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidException.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.api;
+
+
+public class QpidException extends Exception
+{
+ public QpidException(String message)
+ {
+ super(message);
+ }
+
+ public QpidException(String msg, Throwable t)
+ {
+ super(msg, t);
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidExchangeHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidExchangeHelper.java
new file mode 100644
index 0000000000..1636f87262
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidExchangeHelper.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.api;
+
+/**
+ * used as a helper class to support the Session class
+ * This reduces the clutter and makes the session class
+ * more readable and manageable.
+ *
+ * An implementation of this interface should take a Session
+ * interface as an argument in the constructor and scope all
+ * operations for that session.
+ */
+
+public interface QpidExchangeHelper
+{
+ public void declareExchange(boolean autoDelete, boolean durable, String exchangeName, boolean internal, boolean nowait, boolean passive,
+ String exchangeClass) throws QpidException;
+
+ public void deleteExchange(String exchangeName, boolean ifUnused, boolean nowait) throws QpidException;
+
+ public void open() throws QpidException;
+
+ public void close() throws QpidException;
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageConsumer.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageConsumer.java
new file mode 100644
index 0000000000..41adc23727
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageConsumer.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.api;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.nclient.message.AMQPApplicationMessage;
+
+public interface QpidMessageConsumer
+{
+ public AMQPApplicationMessage get() throws QpidException;
+
+ public AMQPApplicationMessage receive()throws QpidException;
+
+ public AMQPApplicationMessage receive(long timeout, TimeUnit tu)throws QpidException;
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageHelper.java
new file mode 100644
index 0000000000..ec50d16f9e
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageHelper.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.api;
+
+/**
+ * used as a helper class to support the Session class
+ * This reduces the clutter and makes the session class
+ * more readable and manageable.
+ *
+ * An implementation of this interface should take a Session
+ * interface as an argument in the constructor and scope all
+ * operations for that session.
+ */
+public interface QpidMessageHelper
+{
+ public void declareQueue(boolean autoDelete, boolean durable, boolean exclusive,boolean nowait,boolean passive,String queueName) throws QpidException;
+
+ public void bindQueue(String exchangeName,boolean nowait,String queueName,String routingKey)throws QpidException;
+
+ public void unbindQueue(String exchangeName,String queueName,String routingKey)throws QpidException;
+
+ public void purgeQueue(boolean nowait,String queueName)throws QpidException;
+
+ public void deleteQueue(boolean ifEmpty, boolean ifUnused, boolean nowait,String queueName)throws QpidException;
+
+ public void open() throws QpidException;
+
+ public void close() throws QpidException;
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageProducer.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageProducer.java
new file mode 100644
index 0000000000..684b53da42
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidMessageProducer.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.api;
+
+import org.apache.qpid.nclient.message.AMQPApplicationMessage;
+
+public interface QpidMessageProducer
+{
+ public void send(boolean disableMessageId,boolean inline,AMQPApplicationMessage msg)throws QpidException;
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidQueueHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidQueueHelper.java
new file mode 100644
index 0000000000..de4b79fc78
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidQueueHelper.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.api;
+
+/**
+ * used as a helper class to support the Session class
+ * This reduces the clutter and makes the session class
+ * more readable and manageable.
+ *
+ * An implementation of this interface should take a Session
+ * interface as an argument in the constructor and scope all
+ * operations for that session.
+ */
+public interface QpidQueueHelper
+{
+ public void declareQueue(boolean autoDelete, boolean durable, boolean exclusive,boolean nowait,boolean passive,String queueName) throws QpidException;
+
+ public void bindQueue(String exchangeName,boolean nowait,String queueName,String routingKey)throws QpidException;
+
+ public void unbindQueue(String exchangeName,String queueName,String routingKey)throws QpidException;
+
+ public void purgeQueue(boolean nowait,String queueName)throws QpidException;
+
+ public void deleteQueue(boolean ifEmpty, boolean ifUnused, boolean nowait,String queueName)throws QpidException;
+
+ public void open() throws QpidException;
+
+ public void close() throws QpidException;
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidSession.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidSession.java
new file mode 100644
index 0000000000..7386cc4092
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidSession.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.api;
+
+/**
+ * The Helper classes are added in order to avoid the session class
+ * from having too many methods and an implementation becoming too
+ * complicated. Having a lengthy class can impact readability and
+ * manageability.
+ */
+public interface QpidSession
+{
+
+ public void open() throws QpidException;
+
+ public void close() throws QpidException;
+
+ public void resume() throws QpidException;
+
+ public void suspend() throws QpidException;
+
+ public void failover() throws QpidException;
+
+ public QpidMessageProducer createProducer() throws QpidException;
+
+ public QpidMessageConsumer createConsumer() throws QpidException;
+
+ public QpidMessageHelper getMessageHelper() throws QpidException;
+
+ public QpidExchangeHelper getExchangeHelper() throws QpidException;
+
+ public QpidQueueHelper getQueueHelper() throws QpidException;
+
+ public QpidTransactionHelper getTransactionHelper()throws QpidException;
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidTransactionHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidTransactionHelper.java
new file mode 100644
index 0000000000..d5c8d80ca7
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidTransactionHelper.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.api;
+
+/**
+ * used as a helper class to support the Session class
+ * This reduces the clutter and makes the session class
+ * more readable and manageable.
+ *
+ * An implementation of this interface should take a Session
+ * interface as an argument in the constructor and scope all
+ * operations for that session.
+ */
+
+public interface QpidTransactionHelper
+{
+ public void commit()throws QpidException;
+
+ public void rollback()throws QpidException;
+
+ public void recover()throws QpidException;
+
+ public void resume()throws QpidException;
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AMQPCallbackHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AMQPCallbackHelper.java
new file mode 100644
index 0000000000..54ea9ac01b
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AMQPCallbackHelper.java
@@ -0,0 +1,50 @@
+package org.apache.qpid.nclient.impl;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.amqp.AMQPCallBack;
+
+public class AMQPCallbackHelper extends AMQPCallBack
+{
+ private static final Logger _logger = Logger.getLogger(AMQPCallbackHelper.class);
+
+ private AMQMethodBody _body;
+ private Exception _e;
+ private boolean _isError;
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ _body = body;
+ _logger.debug("[Broker has responded " + body);
+ _isError = false;
+ this.setIsComplete(true);
+ }
+
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ _e = e;
+ _logger.debug("[Broker has responded with an error" + e.getMessage(),e);
+ _isError = true;
+ this.setIsComplete(true);
+ }
+
+ public AMQMethodBody getMethodBody()
+ {
+ return _body;
+ }
+
+ public Exception getException()
+ {
+ return _e;
+ }
+
+ public boolean isError()
+ {
+ return _isError;
+ }
+
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java
new file mode 100644
index 0000000000..d202bab843
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java
@@ -0,0 +1,51 @@
+package org.apache.qpid.nclient.impl;
+
+import org.apache.qpid.nclient.api.QpidException;
+import org.apache.qpid.nclient.core.AMQPException;
+
+/**
+ * This abstracts the error handling for open
+ * and close methods for a resource. This class
+ * eliminates the duplication of error handling
+ * code
+ */
+public abstract class AbstractResource
+{
+ private String _resourceName;
+
+ public AbstractResource(String resourceName)
+ {
+ _resourceName = resourceName;
+ }
+
+ public void open() throws QpidException
+ {
+ try
+ {
+ openResource();
+
+ }
+ catch(AMQPException e)
+ {
+ throw new QpidException("Error creating " + _resourceName + " due to " + e.getMessage(),e);
+ }
+ }
+
+ public void close() throws QpidException
+ {
+ try
+ {
+ closeResource();
+
+ }
+ catch(Exception e)
+ {
+ throw new QpidException("Error destroying " + _resourceName + " due to " + e.getMessage(),e);
+ }
+
+ }
+
+ protected abstract void openResource() throws AMQPException;
+
+ protected abstract void closeResource() throws AMQPException;
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/HelperTemplate.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/HelperTemplate.java
new file mode 100644
index 0000000000..ecbef90298
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/HelperTemplate.java
@@ -0,0 +1,39 @@
+package org.apache.qpid.nclient.impl;
+
+import org.apache.qpid.nclient.api.QpidException;
+import org.apache.qpid.nclient.core.AMQPException;
+
+/**
+ * This class creates helper methods to avoid
+ * duplication of routine logic and error handling
+ * that can be reused.
+ */
+public abstract class HelperTemplate
+{
+ public abstract void amqpMethodCall() throws AMQPException;
+
+ public void invokeAMQPMethodCall(String msg) throws QpidException
+ {
+ try
+ {
+ amqpMethodCall();
+ }
+ catch(Exception e)
+ {
+ throw new QpidException(msg + e.getMessage(),e);
+ }
+ }
+
+ public void evaulateResponse(AMQPCallbackHelper cb) throws QpidException
+ {
+ // Blocking for response
+ while (!cb.isComplete())
+ {
+ }
+
+ if (cb.isError())
+ {
+ throw new QpidException("The broker responded with an error",cb.getException());
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java
new file mode 100644
index 0000000000..054eedfcee
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java
@@ -0,0 +1,242 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.impl;
+
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionStartOkBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.nclient.amqp.AMQPChannel;
+import org.apache.qpid.nclient.amqp.AMQPClassFactory;
+import org.apache.qpid.nclient.amqp.AMQPConnection;
+import org.apache.qpid.nclient.amqp.AbstractAMQPClassFactory;
+import org.apache.qpid.nclient.amqp.qpid.QpidAMQPChannel;
+import org.apache.qpid.nclient.amqp.state.AMQPState;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
+import org.apache.qpid.nclient.amqp.state.AMQPStateListener;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
+import org.apache.qpid.nclient.api.QpidConnection;
+import org.apache.qpid.nclient.api.QpidException;
+import org.apache.qpid.nclient.api.QpidSession;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.transport.ConnectionURL;
+import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
+
+/**
+ *
+ * Once the Session class is implemented the channel logic will be
+ * replaced by the session methods.
+ *
+ */
+public class QpidConnectionImpl implements QpidConnection, AMQPStateListener
+{
+ private static final Logger _logger = Logger.getLogger(QpidConnectionImpl.class);
+
+ private byte _major;
+
+ private byte _minor;
+
+ private ConnectionURL _url;
+
+ // Need a Class factory per connection
+ private AMQPClassFactory _classFactory;
+
+ private int _ticket;
+
+ private AMQPConnection _amqpConnection;
+
+ private AtomicInteger _channelNo = new AtomicInteger();
+
+ private Map<Integer,QpidSession> _sessionMap = new ConcurrentHashMap<Integer,QpidSession>();
+
+ private Lock _lock = new ReentrantLock();
+
+ /** ---------------------------------------------
+ * Methods from o.a.qpid.client.Connection
+ * ----------------------------------------------
+ */
+
+ public void close()
+ {
+ // handle failover
+ }
+
+ public void connect(String url) throws QpidException
+ {
+ try
+ {
+ _classFactory = AbstractAMQPClassFactory.getFactoryInstance();
+ }
+ catch(Exception e)
+ {
+ throw new QpidException("Unable to create the class factory",e);
+ }
+
+ try
+ {
+ //_url = new AMQPConnectionURL("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'");
+ _amqpConnection = _classFactory.createConnectionClass(url, ConnectionType.TCP);
+ }
+ catch(Exception e)
+ {
+ throw new QpidException("Unable to create a connection to the broker using url " + url + " due to " + e.getMessage(),e);
+ }
+
+ try
+ {
+ handleConnectionNegotiation();
+ }
+ catch(Exception e)
+ {
+ throw new QpidException("Connection negotiation failed due to " + e.getMessage(),e);
+ }
+ }
+
+ public QpidSession createSession(int expiryInSeconds) throws QpidException
+ {
+ AMQPChannel channel = null;
+ _lock.lock();
+ try
+ {
+ int channelNo = _channelNo.addAndGet(1);
+ channel = _classFactory.createChannelClass(channelNo);
+ QpidSession session = new QpidSessionImpl(_classFactory,channel,channelNo, _major,_minor);
+ _sessionMap.put(channelNo, session);
+ return session;
+ }
+ catch(AMQPException e)
+ {
+ throw new QpidException("Unable to create channel class",e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /** ---------------------------------------------
+ * Methods from AMQPStateListener
+ * ----------------------------------------------
+ */
+ public void stateChanged(AMQPStateChangedEvent event) throws AMQPException
+ {
+ String s = event.getStateType() + " changed state from " +
+ event.getOldState() + " to " + event.getNewState();
+
+ _logger.debug(s);
+
+ if(event.getNewState() == AMQPState.CONNECTION_CLOSED)
+ {
+ //We need to notify the sessions that they need to
+ //kick in the fail over logic
+ for (Integer sessionId : _sessionMap.keySet())
+ {
+ QpidSession session = _sessionMap.get(sessionId);
+ try
+ {
+ session.failover();
+ }
+ catch(Exception e)
+ {
+ _logger.error("Error executing failover logic for session : " + sessionId, e);
+ }
+ }
+ }
+
+ }
+
+ /** ---------------------------------------------
+ * Helper methods
+ * ----------------------------------------------
+ */
+
+ public void handleConnectionNegotiation() throws Exception
+ {
+ _classFactory.getStateManager().addListener(AMQPStateType.CONNECTION_STATE, this);
+
+ //ConnectionStartBody
+ ConnectionStartBody connectionStartBody = _amqpConnection.openTCPConnection();
+ _major = connectionStartBody.getMajor();
+ _minor = connectionStartBody.getMinor();
+
+ FieldTable clientProperties = FieldTableFactory.newFieldTable();
+ clientProperties.put(new AMQShortString(ClientProperties.instance.toString()), "Test"); // setting only the client id
+
+ final String locales = new String(connectionStartBody.getLocales(), "utf8");
+ final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
+
+ final String mechanism = SecurityHelper.chooseMechanism(connectionStartBody.getMechanisms());
+
+ SaslClient sc = Sasl.createSaslClient(new String[]
+ { mechanism }, null, "AMQP", "localhost", null, SecurityHelper.createCallbackHandler(mechanism, _url));
+
+ ConnectionStartOkBody connectionStartOkBody = ConnectionStartOkBody.createMethodBody(_major, _minor, clientProperties, new AMQShortString(
+ tokenizer.nextToken()), new AMQShortString(mechanism), (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null));
+ // ConnectionSecureBody
+ AMQMethodBody body = _amqpConnection.startOk(connectionStartOkBody);
+ ConnectionTuneBody connectionTuneBody;
+
+ if (body instanceof ConnectionSecureBody)
+ {
+ ConnectionSecureBody connectionSecureBody = (ConnectionSecureBody) body;
+ ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(_major, _minor, sc
+ .evaluateChallenge(connectionSecureBody.getChallenge()));
+ //Assuming the server is not going to send another challenge
+ connectionTuneBody = (ConnectionTuneBody) _amqpConnection.secureOk(connectionSecureOkBody);
+
+ }
+ else
+ {
+ connectionTuneBody = (ConnectionTuneBody) body;
+ }
+
+ // Using broker supplied values
+ ConnectionTuneOkBody connectionTuneOkBody = ConnectionTuneOkBody.createMethodBody(_major, _minor, connectionTuneBody.getChannelMax(),
+ connectionTuneBody.getFrameMax(), connectionTuneBody.getHeartbeat());
+ _amqpConnection.tuneOk(connectionTuneOkBody);
+
+ ConnectionOpenBody connectionOpenBody = ConnectionOpenBody.createMethodBody(_major, _minor, null, true, new AMQShortString(_url
+ .getVirtualHost()));
+
+ ConnectionOpenOkBody connectionOpenOkBody = _amqpConnection.open(connectionOpenBody);
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidExchangeHelperImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidExchangeHelperImpl.java
new file mode 100644
index 0000000000..2643fe17e6
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidExchangeHelperImpl.java
@@ -0,0 +1,101 @@
+package org.apache.qpid.nclient.impl;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeleteBody;
+import org.apache.qpid.nclient.amqp.AMQPExchange;
+import org.apache.qpid.nclient.api.QpidExchangeHelper;
+import org.apache.qpid.nclient.api.QpidException;
+import org.apache.qpid.nclient.core.AMQPException;
+
+/**
+ * The Qpid helper classes are written with JMS in mind.
+ * Therefore most calls a blocking calls. However the low
+ * level framework allows non blocking Async calls and should
+ * use the low level API if u need asynchrony
+ */
+public class QpidExchangeHelperImpl extends AbstractResource implements QpidExchangeHelper
+{
+ private QpidSessionImpl _session;
+ private AMQPExchange _exchange;
+
+ protected QpidExchangeHelperImpl(QpidSessionImpl session)
+ {
+ super("Exchange Class");
+ _session = session;
+ }
+
+ /**
+ * -----------------------------------------------------
+ * Methods introduced by QpidExchangeHelper
+ * -----------------------------------------------------
+ */
+ public void declareExchange(boolean autoDelete, boolean durable, String exchangeName,boolean internal, boolean nowait, boolean passive,String exchangeClass) throws QpidException
+ {
+ final ExchangeDeclareBody exchangeDeclareBody = ExchangeDeclareBody.createMethodBody(
+ _session.getMajor(),
+ _session.getMinor(),
+ null, // arguments
+ autoDelete,
+ durable,
+ new AMQShortString(exchangeName),
+ internal,
+ nowait,
+ passive,
+ _session.getAccessTicket(),
+ new AMQShortString(exchangeClass));
+
+ final AMQPCallbackHelper cb = new AMQPCallbackHelper();
+ HelperTemplate template = new HelperTemplate(){
+
+ public void amqpMethodCall() throws AMQPException
+ {
+ _exchange.declare(exchangeDeclareBody, cb);
+ }
+ };
+
+ template.invokeAMQPMethodCall("Declare exchange failed due to");
+
+ template.evaulateResponse(cb);
+ }
+
+ public void deleteExchange(String exchangeName, boolean ifUnused, boolean nowait) throws QpidException
+ {
+ final ExchangeDeleteBody exchangeDeclareBody = ExchangeDeleteBody.createMethodBody(
+ _session.getMajor(),
+ _session.getMinor(),
+ new AMQShortString(exchangeName),
+ ifUnused,
+ nowait,
+ _session.getAccessTicket());
+
+ final AMQPCallbackHelper cb = new AMQPCallbackHelper();
+ HelperTemplate template = new HelperTemplate(){
+
+ public void amqpMethodCall() throws AMQPException
+ {
+ _exchange.delete(exchangeDeclareBody, cb);
+ }
+ };
+
+ template.invokeAMQPMethodCall("Delete exchange failed due to");
+
+ template.evaulateResponse(cb);
+ }
+
+ /**
+ * -----------------------------------------------------
+ * Methods introduced by AbstractResource
+ * -----------------------------------------------------
+ */
+ protected void openResource() throws AMQPException
+ {
+ _exchange = _session.getClassFactory().createExchangeClass(_session.getChannel());
+ }
+
+ protected void closeResource() throws AMQPException
+ {
+ _session.getClassFactory().destoryExchangeClass(_session.getChannel(), _exchange);
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java
new file mode 100644
index 0000000000..22000a8506
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java
@@ -0,0 +1,200 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.impl;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.nclient.amqp.AMQPMessage;
+import org.apache.qpid.nclient.amqp.AMQPMessageCallBack;
+import org.apache.qpid.nclient.api.QpidException;
+import org.apache.qpid.nclient.api.QpidMessageConsumer;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.message.AMQPApplicationMessage;
+import org.apache.qpid.nclient.message.MessageHeaders;
+import org.apache.qpid.nclient.message.MessageStore;
+import org.apache.qpid.nclient.message.TransientMessageStore;
+
+public class QpidMessageConsumerImpl extends AbstractResource implements QpidMessageConsumer, AMQPMessageCallBack
+{
+ private MessageStore _msgStore = new TransientMessageStore();
+ private final BlockingQueue<AMQPApplicationMessage> _queue = new LinkedBlockingQueue<AMQPApplicationMessage>();
+ private QpidSessionImpl _session;
+ private AMQPMessage _amqpMessage;
+
+ protected QpidMessageConsumerImpl(QpidSessionImpl session)
+ {
+ super("Message Class");
+ _session = session;
+ }
+
+ /**
+ * -----------------------------------------------
+ * Methods from QpidMessageConsumer class
+ * -----------------------------------------------
+ */
+
+ public AMQPApplicationMessage get() throws QpidException
+ {
+ // I want this to do a message.get
+ return null;
+ }
+
+ public AMQPApplicationMessage receive()throws QpidException
+ {
+ return _queue.poll();
+ }
+
+ public AMQPApplicationMessage receive(long timeout, TimeUnit tu)throws QpidException
+ {
+ try
+ {
+ return _queue.poll(timeout, tu);
+ }
+ catch(Exception e)
+ {
+ throw new QpidException("Error retrieving message from queue",e);
+ }
+ }
+
+ /**
+ * -----------------------------------------------
+ * Abstract methods from AbstractResource class
+ * -----------------------------------------------
+ */
+ protected void openResource() throws AMQPException
+ {
+ _amqpMessage = _session.getClassFactory().createMessageClass(_session.getChannel(),null);
+ }
+
+ protected void closeResource() throws AMQPException
+ {
+ _session.getClassFactory().destoryMessageClass(_session.getChannel(), _amqpMessage);
+ }
+
+ /**
+ * -----------------------------------------------
+ * Methods from AMQPMessageCallback class
+ * -----------------------------------------------
+ */
+ public void append(MessageAppendBody messageAppendBody, long correlationId) throws AMQPException
+ {
+ String reference = new String(messageAppendBody.getReference());
+ AMQPApplicationMessage msg = _msgStore.getMessage(reference);
+ msg.addContent(messageAppendBody.getBytes());
+ }
+
+ public void checkpoint(MessageCheckpointBody messageCheckpointBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+ }
+
+ public void close(MessageCloseBody messageCloseBody, long correlationId) throws AMQPException
+ {
+ String reference = new String(messageCloseBody.getReference());
+ AMQPApplicationMessage msg = _msgStore.getMessage(reference);
+ enQueue(msg);
+ _msgStore.removeMessage(reference);
+ }
+
+ public void open(MessageOpenBody messageOpenBody, long correlationId) throws AMQPException
+ {
+ String reference = new String(messageOpenBody.getReference());
+ AMQPApplicationMessage msg = new AMQPApplicationMessage(_session.getChannel(), messageOpenBody.getReference());
+ _msgStore.storeMessage(reference, msg);
+ }
+
+ public void recover(MessageRecoverBody messageRecoverBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void resume(MessageResumeBody messageResumeBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void transfer(MessageTransferBody messageTransferBody, long correlationId) throws AMQPException
+ {
+ MessageHeaders messageHeaders = new MessageHeaders();
+ messageHeaders.setMessageId(messageTransferBody.getMessageId());
+ messageHeaders.setAppId(messageTransferBody.getAppId());
+ messageHeaders.setContentType(messageTransferBody.getContentType());
+ messageHeaders.setEncoding(messageTransferBody.getContentEncoding());
+ messageHeaders.setCorrelationId(messageTransferBody.getCorrelationId());
+ messageHeaders.setDestination(messageTransferBody.getDestination());
+ messageHeaders.setExchange(messageTransferBody.getExchange());
+ messageHeaders.setExpiration(messageTransferBody.getExpiration());
+ messageHeaders.setReplyTo(messageTransferBody.getReplyTo());
+ messageHeaders.setRoutingKey(messageTransferBody.getRoutingKey());
+ messageHeaders.setTransactionId(messageTransferBody.getTransactionId());
+ messageHeaders.setUserId(messageTransferBody.getUserId());
+ messageHeaders.setPriority(messageTransferBody.getPriority());
+ messageHeaders.setDeliveryMode(messageTransferBody.getDeliveryMode());
+ messageHeaders.setApplicationHeaders(messageTransferBody.getApplicationHeaders());
+
+
+
+ if (messageTransferBody.getBody().getContentType() == Content.TypeEnum.INLINE_T)
+ {
+ AMQPApplicationMessage msg = new AMQPApplicationMessage(_session.getChannel(),
+ correlationId,
+ messageHeaders,
+ messageTransferBody.getBody().getContentAsByteArray(),
+ messageTransferBody.getRedelivered());
+
+ enQueue(msg);
+ }
+ else
+ {
+ byte[] referenceId = messageTransferBody.getBody().getContentAsByteArray();
+ AMQPApplicationMessage msg = new AMQPApplicationMessage(_session.getChannel(),referenceId);
+ msg.setMessageHeaders(messageHeaders);
+ msg.setRedeliveredFlag(messageTransferBody.getRedelivered());
+
+ _msgStore.storeMessage(new String(referenceId), msg);
+ }
+ }
+
+ private void enQueue(AMQPApplicationMessage msg)throws AMQPException
+ {
+ try
+ {
+ _queue.put(msg);
+ }
+ catch(Exception e)
+ {
+ throw new AMQPException("Error queueing the messsage",e);
+ }
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java
new file mode 100644
index 0000000000..f553743ea6
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java
@@ -0,0 +1,103 @@
+package org.apache.qpid.nclient.impl;
+
+import java.util.UUID;
+
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.nclient.amqp.AMQPMessage;
+import org.apache.qpid.nclient.api.QpidException;
+import org.apache.qpid.nclient.api.QpidMessageProducer;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.message.AMQPApplicationMessage;
+import org.apache.qpid.nclient.message.MessageHeaders;
+
+public class QpidMessageProducerImpl extends AbstractResource implements QpidMessageProducer
+{
+ private QpidSessionImpl _session;
+ private AMQPMessage _amqpMessage;
+
+ protected QpidMessageProducerImpl(QpidSessionImpl session)
+ {
+ super("Message Class");
+ _session = session;
+ }
+
+ /**
+ * -----------------------------------------------------
+ * Methods introduced by QpidMessageProducer
+ * -----------------------------------------------------
+ */
+ public void send(boolean disableMessageId,boolean inline,AMQPApplicationMessage msg)throws QpidException
+ {
+ // need to handle the inline and reference case
+ final MessageTransferBody messageTransferBody = prepareTransfer(disableMessageId,msg);
+ final AMQPCallbackHelper cb = new AMQPCallbackHelper();
+ HelperTemplate template = new HelperTemplate(){
+
+ public void amqpMethodCall() throws AMQPException
+ {
+ _amqpMessage.transfer(messageTransferBody, cb);
+ }
+ };
+
+ template.invokeAMQPMethodCall("message transfer failed due to");
+
+ template.evaulateResponse(cb);
+ }
+
+ /**
+ * -----------------------------------------------------
+ * Methods introduced by AbstractResource
+ * -----------------------------------------------------
+ */
+ protected void openResource() throws AMQPException
+ {
+ _amqpMessage = _session.getClassFactory().createMessageClass(_session.getChannel(),null);
+ }
+
+ protected void closeResource() throws AMQPException
+ {
+ _session.getClassFactory().destoryMessageClass(_session.getChannel(), _amqpMessage);
+ }
+
+ /**
+ * -----------------------------------------------------
+ * Helper Methods
+ * -----------------------------------------------------
+ */
+ private MessageTransferBody prepareTransfer(boolean disableMessageId,AMQPApplicationMessage msg)
+ {
+ MessageHeaders msgHeaders = msg.getMessageHeaders();
+ MessageTransferBody messageTransferBody = MessageTransferBody.createMethodBody(
+ _session.getMajor(),
+ _session.getMinor(),
+ msgHeaders.getAppId(), //appId
+ msgHeaders.getApplicationHeaders(), //applicationHeaders
+ new Content(Content.TypeEnum.INLINE_T,msg.getContentsAsBytes() ), //body
+ msgHeaders.getContentType(), //contentEncoding,
+ msgHeaders.getContentType(), //contentType
+ msgHeaders.getCorrelationId(), //correlationId
+ msgHeaders.getDeliveryMode(), //deliveryMode non persistant
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange
+ msgHeaders.getExpiration(), //expiration
+ msgHeaders.isImmediate(), //immediate
+ msgHeaders.isMandatory(), //mandatory
+ (disableMessageId)?null : new AMQShortString(UUID.randomUUID().toString()), //messageId
+ msgHeaders.getPriority(), //priority
+ msg.getRedeliveredFlag(), //redelivered
+ msgHeaders.getReplyTo(), //replyTo
+ msgHeaders.getRoutingKey(), //routingKey,
+ "abc".getBytes(), //securityToken
+ _session.getAccessTicket(), //ticket
+ System.currentTimeMillis(), //timestamp
+ msgHeaders.getTransactionId(), //transactionId
+ msgHeaders.getTtl(), //ttl,
+ msgHeaders.getUserId() //userId
+ );
+
+ return messageTransferBody;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidQueueHelperImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidQueueHelperImpl.java
new file mode 100644
index 0000000000..33b3c1177e
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidQueueHelperImpl.java
@@ -0,0 +1,178 @@
+package org.apache.qpid.nclient.impl;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.QueuePurgeBody;
+import org.apache.qpid.framing.QueueUnbindBody;
+import org.apache.qpid.nclient.amqp.AMQPQueue;
+import org.apache.qpid.nclient.api.QpidException;
+import org.apache.qpid.nclient.api.QpidQueueHelper;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public class QpidQueueHelperImpl extends AbstractResource implements QpidQueueHelper
+{
+ private QpidSessionImpl _session;
+ private AMQPQueue _queueClass;
+
+ protected QpidQueueHelperImpl(QpidSessionImpl session)
+ {
+ super("Queue Class");
+ _session = session;
+ }
+
+ /**
+ * -----------------------------------------------------
+ * Methods introduced by QpidQueueHelper
+ * -----------------------------------------------------
+ */
+ public void bindQueue(String exchangeName, boolean nowait, String queueName, String routingKey) throws QpidException
+ {
+ final QueueBindBody queueBindBody = QueueBindBody.createMethodBody(
+ _session.getMajor(),
+ _session.getMinor(),
+ null, //arguments
+ new AMQShortString(exchangeName),//exchange
+ nowait,
+ new AMQShortString(queueName), //queue
+ new AMQShortString(routingKey), //routingKey
+ _session.getAccessTicket()
+ );
+
+ final AMQPCallbackHelper cb = new AMQPCallbackHelper();
+ HelperTemplate template = new HelperTemplate(){
+
+ public void amqpMethodCall() throws AMQPException
+ {
+ _queueClass.bind(queueBindBody, cb);
+ }
+ };
+
+ template.invokeAMQPMethodCall("Queue bind failed due to");
+
+ template.evaulateResponse(cb);
+ }
+
+ public void declareQueue(boolean autoDelete, boolean durable, boolean exclusive, boolean nowait, boolean passive, String queueName)
+ throws QpidException
+ {
+ final QueueDeclareBody queueDeclareBody = QueueDeclareBody.createMethodBody(
+ _session.getMajor(),
+ _session.getMinor(),
+ null, //arguments
+ autoDelete,
+ durable,
+ exclusive,
+ nowait,
+ passive,
+ new AMQShortString(queueName), //queue
+ _session.getAccessTicket()
+ );
+
+ final AMQPCallbackHelper cb = new AMQPCallbackHelper();
+ HelperTemplate template = new HelperTemplate(){
+
+ public void amqpMethodCall() throws AMQPException
+ {
+ _queueClass.declare(queueDeclareBody, cb);
+ }
+ };
+
+ template.invokeAMQPMethodCall("Queue declare failed due to");
+
+ template.evaulateResponse(cb);
+ }
+
+ public void deleteQueue(boolean ifEmpty, boolean ifUnused, boolean nowait, String queueName) throws QpidException
+ {
+ final QueueDeleteBody queueDeleteBody = QueueDeleteBody.createMethodBody(
+ _session.getMajor(),
+ _session.getMinor(),
+ ifEmpty,
+ ifUnused,
+ nowait,
+ new AMQShortString(queueName), //queue
+ _session.getAccessTicket()
+ );
+
+ final AMQPCallbackHelper cb = new AMQPCallbackHelper();
+ HelperTemplate template = new HelperTemplate(){
+
+ public void amqpMethodCall() throws AMQPException
+ {
+ _queueClass.delete(queueDeleteBody, cb);
+ }
+ };
+
+ template.invokeAMQPMethodCall("Queue delete failed due to");
+
+ template.evaulateResponse(cb);
+ }
+
+ public void purgeQueue(boolean nowait, String queueName) throws QpidException
+ {
+ final QueuePurgeBody queuePurgeBody = QueuePurgeBody.createMethodBody(
+ _session.getMajor(),
+ _session.getMinor(),
+ nowait,
+ new AMQShortString(queueName), //queue
+ _session.getAccessTicket()
+ );
+
+ final AMQPCallbackHelper cb = new AMQPCallbackHelper();
+ HelperTemplate template = new HelperTemplate(){
+
+ public void amqpMethodCall() throws AMQPException
+ {
+ _queueClass.purge(queuePurgeBody, cb);
+ }
+ };
+
+ template.invokeAMQPMethodCall("Queue purge failed due to");
+
+ template.evaulateResponse(cb);
+ }
+
+ public void unbindQueue(String exchangeName, String queueName, String routingKey) throws QpidException
+ {
+ final QueueUnbindBody queueUnbindBody = QueueUnbindBody.createMethodBody(
+ _session.getMajor(),
+ _session.getMinor(),
+ null,
+ new AMQShortString(exchangeName),
+ new AMQShortString(queueName), //queue
+ new AMQShortString(routingKey),
+ _session.getAccessTicket()
+ );
+
+ final AMQPCallbackHelper cb = new AMQPCallbackHelper();
+ HelperTemplate template = new HelperTemplate(){
+
+ public void amqpMethodCall() throws AMQPException
+ {
+ _queueClass.unbind(queueUnbindBody, cb);
+ }
+ };
+
+ template.invokeAMQPMethodCall("Queue unbind failed due to");
+
+ template.evaulateResponse(cb);
+ }
+
+
+ /**
+ * -----------------------------------------------------
+ * Methods introduced by AbstractResource
+ * -----------------------------------------------------
+ */
+ protected void openResource() throws AMQPException
+ {
+ _queueClass = _session.getClassFactory().createQueueClass(_session.getChannel());
+ }
+
+ protected void closeResource() throws AMQPException
+ {
+ _session.getClassFactory().destroyQueueClass(_session.getChannel(), _queueClass);
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java
new file mode 100644
index 0000000000..7d3cf5f861
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java
@@ -0,0 +1,259 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.impl;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.nclient.amqp.AMQPChannel;
+import org.apache.qpid.nclient.amqp.AMQPClassFactory;
+import org.apache.qpid.nclient.api.QpidConnection;
+import org.apache.qpid.nclient.api.QpidExchangeHelper;
+import org.apache.qpid.nclient.api.QpidMessageConsumer;
+import org.apache.qpid.nclient.api.QpidMessageHelper;
+import org.apache.qpid.nclient.api.QpidMessageProducer;
+import org.apache.qpid.nclient.api.QpidException;
+import org.apache.qpid.nclient.api.QpidQueueHelper;
+import org.apache.qpid.nclient.api.QpidSession;
+import org.apache.qpid.nclient.api.QpidTransactionHelper;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.protocol.AMQConstant;
+
+/**
+ * According to the 0-9 spec, the session is built on channels(1-1 map) and when a channel is closed
+ * the session should be closed. However with the introdution of the session class in 0-10
+ * this may change. Therefore I will not implement that logic yet.
+ *
+ * Once the dust settles there will be a Failover Helper that will manage the sessions
+ * failover logic.
+ */
+public class QpidSessionImpl extends AbstractResource implements QpidSession
+{
+ private AMQPChannel _channelClass;
+ private int _channel;
+ private byte _major;
+ private byte _minor;
+ private AMQPClassFactory _classFactory;
+ private int _ticket = 0; //currently useless
+ private QpidExchangeHelperImpl _qpidExchangeHelper;
+ private QpidQueueHelperImpl _qpidQueueHelper;
+ private QpidMessageConsumerImpl _qpidMessageConsumer;
+ private QpidMessageProducerImpl _qpidMessageProducer;
+ private AtomicBoolean _closed;
+ private Lock _sessionCloseLock = new ReentrantLock();
+
+ // this will be used as soon as Session class is finalized
+ private int _expiryInSeconds = QpidConnection.SESSION_EXPIRY_TIED_TO_CHANNEL;
+ private QpidConnection _qpidConnection;
+
+ public QpidSessionImpl(AMQPClassFactory classFactory,AMQPChannel channelClass,int channel,byte major, byte minor)
+ {
+ super("Session");
+ _channelClass = channelClass;
+ _channel = channel;
+ _major = major;
+ _minor = minor;
+ _classFactory = classFactory;
+ _qpidConnection = null;
+ }
+
+ /**
+ * -----------------------------------------------------
+ * Methods introduced by AbstractResource
+ * -----------------------------------------------------
+ */
+ protected void openResource() throws AMQPException
+ {
+ // These methods will be changed to session methods
+ openChannel();
+ }
+
+ protected void closeResource() throws AMQPException
+ {
+ ChannelCloseBody channelCloseBody = ChannelCloseBody.createMethodBody(_major, _minor,
+ 0, //classId
+ 0, //methodId
+ AMQConstant.REPLY_SUCCESS.getCode(),
+ new AMQShortString("Qpid Client closing channel"));
+
+ _channelClass.close(channelCloseBody);
+
+ _classFactory.destroyChannelClass(_channel, _channelClass);
+ if (_qpidQueueHelper != null)
+ {
+ _qpidQueueHelper.closeResource();
+ }
+ if (_qpidExchangeHelper != null)
+ {
+ _qpidExchangeHelper.closeResource();
+ }
+ if(_qpidMessageConsumer != null)
+ {
+ _qpidMessageConsumer.closeResource();
+ }
+ if(_qpidMessageProducer != null)
+ {
+ _qpidMessageProducer.closeResource();
+ }
+ }
+
+ /**
+ * -----------------------------------------------------
+ * Methods introduced by QpidSession
+ * -----------------------------------------------------
+ */
+ public void close() throws QpidException
+ {
+ if (!_closed.getAndSet(true))
+ {
+ _sessionCloseLock.lock();
+ try
+ {
+ super.close();
+ }
+ finally
+ {
+ _sessionCloseLock.unlock();
+ }
+ }
+ }
+
+ public void resume() throws QpidException
+ {
+
+ }
+
+ // not intended to be used at the jms layer
+ public void suspend() throws QpidException
+ {
+
+ }
+
+ public void failover() throws QpidException
+ {
+ if(_expiryInSeconds == QpidConnection.SESSION_EXPIRY_TIED_TO_CHANNEL)
+ {
+ // then close the session
+ }
+ else
+ {
+ //kick in the failover logic
+ }
+ }
+
+ public QpidMessageConsumer createConsumer() throws QpidException
+ {
+ if (_qpidMessageConsumer == null)
+ {
+ _qpidMessageConsumer = new QpidMessageConsumerImpl(this);
+ _qpidMessageConsumer.open();
+ }
+ return _qpidMessageConsumer;
+ }
+
+ public QpidMessageProducer createProducer() throws QpidException
+ {
+ if (_qpidMessageProducer == null)
+ {
+ _qpidMessageProducer = new QpidMessageProducerImpl(this);
+ _qpidMessageProducer.open();
+ }
+ return _qpidMessageProducer;
+ }
+
+ /** ------------------------------------------
+ * These helper classes are employed to reduce
+ * the clutter in session classes and improve
+ * readability
+ * ------------------------------------------
+ */
+ public QpidExchangeHelper getExchangeHelper() throws QpidException
+ {
+ if (_qpidExchangeHelper == null)
+ {
+ _qpidExchangeHelper = new QpidExchangeHelperImpl(this);
+ _qpidExchangeHelper.open();
+ }
+ return _qpidExchangeHelper;
+ }
+
+ public QpidMessageHelper getMessageHelper() throws QpidException
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public QpidQueueHelper getQueueHelper() throws QpidException
+ {
+ if (_qpidQueueHelper == null)
+ {
+ _qpidQueueHelper = new QpidQueueHelperImpl(this);
+ _qpidQueueHelper.open();
+ }
+ return _qpidQueueHelper;
+ }
+
+ public QpidTransactionHelper getTransactionHelper()throws QpidException
+ {
+ return null;
+ }
+
+ /** ------------------------------------------
+ * These protected methods are for the qpid
+ * implementation of the api package
+ * ------------------------------------------
+ */
+ protected byte getMinor()
+ {
+ return _minor;
+ }
+
+ protected byte getMajor()
+ {
+ return _major;
+ }
+
+ protected int getChannel()
+ {
+ return _channel;
+ }
+
+ protected AMQPClassFactory getClassFactory()
+ {
+ return _classFactory;
+ }
+
+ protected int getAccessTicket()
+ {
+ return _ticket;
+ }
+
+ private void openChannel() throws AMQPException
+ {
+ ChannelOpenBody channelOpenBody = ChannelOpenBody.createMethodBody(_major, _minor, new AMQShortString("myChannel1"));
+ ChannelOpenOkBody channelOpenOkBody = _channelClass.open(channelOpenBody);
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/SecurityHelper.java
index 908f0adee0..15cc347b4f 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/SecurityHelper.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.nclient.amqp.sample;
+package org.apache.qpid.nclient.impl;
import java.io.UnsupportedEncodingException;
import java.util.HashSet;
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
index d79525c5b2..79302540be 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
@@ -21,6 +21,7 @@
package org.apache.qpid.nclient.message;
+import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
@@ -82,6 +83,17 @@ public class AMQPApplicationMessage {
{
return contents;
}
+
+ public byte[] getContentsAsBytes()
+ {
+ ByteBuffer buf = ByteBuffer.allocate(bytesReceived);
+ for (byte[] bytes: contents)
+ {
+ buf.put(bytes);
+ }
+
+ return buf.array();
+ }
public long getDeliveryTag()
{
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java
index 562aa7b06e..f633ecff3e 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java
@@ -44,7 +44,7 @@ public class MessageHeaders
private AMQShortString _exchange;
- private FieldTable _jmsHeaders;
+ private FieldTable _applicationHeaders;
private short _deliveryMode;
@@ -70,6 +70,12 @@ public class MessageHeaders
private AMQShortString _routingKey;
+ private boolean _immediate;
+
+ private boolean _mandatory;
+
+ private long _ttl;
+
private int _size;
public int getSize()
@@ -106,19 +112,19 @@ public class MessageHeaders
_encoding = encoding;
}
- public FieldTable getJMSHeaders()
+ public FieldTable getApplicationHeaders()
{
- if (_jmsHeaders == null)
+ if (_applicationHeaders == null)
{
- setJMSHeaders(FieldTableFactory.newFieldTable());
+ setApplicationHeaders(FieldTableFactory.newFieldTable());
}
- return _jmsHeaders;
+ return _applicationHeaders;
}
- public void setJMSHeaders(FieldTable headers)
+ public void setApplicationHeaders(FieldTable headers)
{
- _jmsHeaders = headers;
+ _applicationHeaders = headers;
}
@@ -227,13 +233,13 @@ public class MessageHeaders
public boolean getBoolean(AMQShortString string) throws JMSException
{
- Boolean b = getJMSHeaders().getBoolean(string);
+ Boolean b = getApplicationHeaders().getBoolean(string);
if (b == null)
{
- if (getJMSHeaders().containsKey(string))
+ if (getApplicationHeaders().containsKey(string))
{
- Object str = getJMSHeaders().getObject(string);
+ Object str = getApplicationHeaders().getObject(string);
if (str == null || !(str instanceof AMQShortString))
{
@@ -255,11 +261,11 @@ public class MessageHeaders
public char getCharacter(AMQShortString string) throws JMSException
{
- Character c = getJMSHeaders().getCharacter(string);
+ Character c = getApplicationHeaders().getCharacter(string);
if (c == null)
{
- if (getJMSHeaders().isNullStringValue(string.asString()))
+ if (getApplicationHeaders().isNullStringValue(string.asString()))
{
throw new NullPointerException("Cannot convert null char");
}
@@ -276,7 +282,7 @@ public class MessageHeaders
public byte[] getBytes(AMQShortString string) throws JMSException
{
- byte[] bs = getJMSHeaders().getBytes(string);
+ byte[] bs = getApplicationHeaders().getBytes(string);
if (bs == null)
{
@@ -290,12 +296,12 @@ public class MessageHeaders
public byte getByte(AMQShortString string) throws JMSException
{
- Byte b = getJMSHeaders().getByte(string);
+ Byte b = getApplicationHeaders().getByte(string);
if (b == null)
{
- if (getJMSHeaders().containsKey(string))
+ if (getApplicationHeaders().containsKey(string))
{
- Object str = getJMSHeaders().getObject(string);
+ Object str = getApplicationHeaders().getObject(string);
if (str == null || !(str instanceof AMQShortString))
{
@@ -317,7 +323,7 @@ public class MessageHeaders
public short getShort(AMQShortString string) throws JMSException
{
- Short s = getJMSHeaders().getShort(string);
+ Short s = getApplicationHeaders().getShort(string);
if (s == null)
{
@@ -329,7 +335,7 @@ public class MessageHeaders
public int getInteger(AMQShortString string) throws JMSException
{
- Integer i = getJMSHeaders().getInteger(string);
+ Integer i = getApplicationHeaders().getInteger(string);
if (i == null)
{
@@ -341,7 +347,7 @@ public class MessageHeaders
public long getLong(AMQShortString string) throws JMSException
{
- Long l = getJMSHeaders().getLong(string);
+ Long l = getApplicationHeaders().getLong(string);
if (l == null)
{
@@ -353,13 +359,13 @@ public class MessageHeaders
public float getFloat(AMQShortString string) throws JMSException
{
- Float f = getJMSHeaders().getFloat(string);
+ Float f = getApplicationHeaders().getFloat(string);
if (f == null)
{
- if (getJMSHeaders().containsKey(string))
+ if (getApplicationHeaders().containsKey(string))
{
- Object str = getJMSHeaders().getObject(string);
+ Object str = getApplicationHeaders().getObject(string);
if (str == null || !(str instanceof AMQShortString))
{
@@ -382,7 +388,7 @@ public class MessageHeaders
public double getDouble(AMQShortString string) throws JMSException
{
- Double d = getJMSHeaders().getDouble(string);
+ Double d = getApplicationHeaders().getDouble(string);
if (d == null)
{
@@ -394,13 +400,13 @@ public class MessageHeaders
public AMQShortString getString(AMQShortString string) throws JMSException
{
- AMQShortString s = new AMQShortString(getJMSHeaders().getString(string.asString()));
+ AMQShortString s = new AMQShortString(getApplicationHeaders().getString(string.asString()));
if (s == null)
{
- if (getJMSHeaders().containsKey(string))
+ if (getApplicationHeaders().containsKey(string))
{
- Object o = getJMSHeaders().getObject(string);
+ Object o = getApplicationHeaders().getObject(string);
if (o instanceof byte[])
{
throw new MessageFormatException("getObject couldn't find " + string + " item.");
@@ -424,71 +430,71 @@ public class MessageHeaders
public Object getObject(AMQShortString string) throws JMSException
{
- return getJMSHeaders().getObject(string);
+ return getApplicationHeaders().getObject(string);
}
public void setBoolean(AMQShortString string, boolean b) throws JMSException
{
checkPropertyName(string);
- getJMSHeaders().setBoolean(string, b);
+ getApplicationHeaders().setBoolean(string, b);
}
public void setChar(AMQShortString string, char c) throws JMSException
{
checkPropertyName(string);
- getJMSHeaders().setChar(string, c);
+ getApplicationHeaders().setChar(string, c);
}
public Object setBytes(AMQShortString string, byte[] bytes)
{
- return getJMSHeaders().setBytes(string, bytes);
+ return getApplicationHeaders().setBytes(string, bytes);
}
public Object setBytes(AMQShortString string, byte[] bytes, int start, int length)
{
- return getJMSHeaders().setBytes(string, bytes, start, length);
+ return getApplicationHeaders().setBytes(string, bytes, start, length);
}
public void setByte(AMQShortString string, byte b) throws JMSException
{
checkPropertyName(string);
- getJMSHeaders().setByte(string, b);
+ getApplicationHeaders().setByte(string, b);
}
public void setShort(AMQShortString string, short i) throws JMSException
{
checkPropertyName(string);
- getJMSHeaders().setShort(string, i);
+ getApplicationHeaders().setShort(string, i);
}
public void setInteger(AMQShortString string, int i) throws JMSException
{
checkPropertyName(string);
- getJMSHeaders().setInteger(string, i);
+ getApplicationHeaders().setInteger(string, i);
}
public void setLong(AMQShortString string, long l) throws JMSException
{
checkPropertyName(string);
- getJMSHeaders().setLong(string, l);
+ getApplicationHeaders().setLong(string, l);
}
public void setFloat(AMQShortString string, float v) throws JMSException
{
checkPropertyName(string);
- getJMSHeaders().setFloat(string, v);
+ getApplicationHeaders().setFloat(string, v);
}
public void setDouble(AMQShortString string, double v) throws JMSException
{
checkPropertyName(string);
- getJMSHeaders().setDouble(string, v);
+ getApplicationHeaders().setDouble(string, v);
}
public void setString(AMQShortString string, AMQShortString string1) throws JMSException
{
checkPropertyName(string);
- getJMSHeaders().setString(string.asString(), string1.asString());
+ getApplicationHeaders().setString(string.asString(), string1.asString());
}
public void setObject(AMQShortString string, Object object) throws JMSException
@@ -496,7 +502,7 @@ public class MessageHeaders
checkPropertyName(string);
try
{
- getJMSHeaders().setObject(string, object);
+ getApplicationHeaders().setObject(string, object);
}
catch (AMQPInvalidClassException aice)
{
@@ -506,42 +512,42 @@ public class MessageHeaders
public boolean itemExists(AMQShortString string) throws JMSException
{
- return getJMSHeaders().containsKey(string);
+ return getApplicationHeaders().containsKey(string);
}
public Enumeration getPropertyNames()
{
- return getJMSHeaders().getPropertyNames();
+ return getApplicationHeaders().getPropertyNames();
}
public void clear()
{
- getJMSHeaders().clear();
+ getApplicationHeaders().clear();
}
public boolean propertyExists(AMQShortString propertyName)
{
- return getJMSHeaders().propertyExists(propertyName);
+ return getApplicationHeaders().propertyExists(propertyName);
}
public Object put(Object key, Object value)
{
- return getJMSHeaders().setObject(key.toString(), value);
+ return getApplicationHeaders().setObject(key.toString(), value);
}
public Object remove(AMQShortString propertyName)
{
- return getJMSHeaders().remove(propertyName);
+ return getApplicationHeaders().remove(propertyName);
}
public boolean isEmpty()
{
- return getJMSHeaders().isEmpty();
+ return getApplicationHeaders().isEmpty();
}
public void writeToBuffer(ByteBuffer data)
{
- getJMSHeaders().writeToBuffer(data);
+ getApplicationHeaders().writeToBuffer(data);
}
public Enumeration getMapNames()
@@ -674,6 +680,36 @@ public class MessageHeaders
{
this._routingKey = routingKey;
}
+
+ public boolean isImmediate()
+ {
+ return _immediate;
+ }
+
+ public void setImmediate(boolean immediate)
+ {
+ this._immediate = immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return _mandatory;
+ }
+
+ public void setMandatory(boolean mandatory)
+ {
+ this._mandatory = mandatory;
+ }
+
+ public long getTtl()
+ {
+ return _ttl;
+ }
+
+ public void setTtl(long ttl)
+ {
+ this._ttl = ttl;
+ }
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java
index efd7264f96..4419a662dd 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java
@@ -1,16 +1,16 @@
package org.apache.qpid.nclient.message;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.nclient.core.AMQPException;
public interface MessageStore {
- public void removeMessage(String identifier);
+ public void removeMessage(String identifier) throws AMQPException;
- public void storeContentBodyChunk(String identifier,byte[] contentBody) throws AMQException;
+ public void storeContentBodyChunk(String identifier,byte[] contentBody) throws AMQPException;
- public void storeMessageMetaData(String identifier, MessageHeaders messageHeaders) throws AMQException;
+ public void storeMessageMetaData(String identifier, MessageHeaders messageHeaders) throws AMQPException;
- public AMQPApplicationMessage getMessage(String identifier) throws AMQException;
+ public AMQPApplicationMessage getMessage(String identifier) throws AMQPException;
- public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQException;
+ public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQPException;
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java
index eb5a9c1778..13c01acb07 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java
@@ -3,37 +3,48 @@ package org.apache.qpid.nclient.message;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.nclient.core.AMQPException;
public class TransientMessageStore implements MessageStore {
- private Map<String,AMQPApplicationMessage> messageMap = new ConcurrentHashMap<String,AMQPApplicationMessage>();
+ private Map<String,AMQPApplicationMessage> _messageMap = new ConcurrentHashMap<String,AMQPApplicationMessage>();
public AMQPApplicationMessage getMessage(String identifier)
- throws AMQException
+ throws AMQPException
{
- return messageMap.get(identifier);
+ if (!_messageMap.containsKey(identifier))
+ {
+ throw new AMQPException("identifier not found " + identifier);
+ }
+
+ return _messageMap.get(identifier);
}
- public void removeMessage(String identifier)
+ public void removeMessage(String identifier) throws AMQPException
{
- messageMap.remove(identifier);
+ if (!_messageMap.containsKey(identifier))
+ {
+ throw new AMQPException("identifier not found " + identifier);
+ }
+ _messageMap.remove(identifier);
}
public void storeContentBodyChunk(String identifier, byte[] contentBody)
- throws AMQException
+ throws AMQPException
{
-
+ AMQPApplicationMessage msg = _messageMap.get(identifier);
+ msg.addContent(contentBody);
}
public void storeMessageMetaData(String identifier,
- MessageHeaders messageHeaders) throws AMQException
+ MessageHeaders messageHeaders) throws AMQPException
{
-
+ AMQPApplicationMessage msg = _messageMap.get(identifier);
+ msg.setMessageHeaders(messageHeaders);
}
- public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQException
+ public void storeMessage(String identifier,AMQPApplicationMessage msg)throws AMQPException
{
-
+ _messageMap.put(identifier, msg);
}
}