summaryrefslogtreecommitdiff
path: root/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0
diff options
context:
space:
mode:
Diffstat (limited to 'java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0')
-rw-r--r--java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java43
-rw-r--r--java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java407
-rw-r--r--java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java136
-rw-r--r--java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java347
-rw-r--r--java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java296
-rw-r--r--java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java246
-rw-r--r--java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java249
-rw-r--r--java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java347
-rw-r--r--java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java244
-rw-r--r--java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java529
10 files changed, 2844 insertions, 0 deletions
diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java
new file mode 100644
index 0000000000..3bb26744c4
--- /dev/null
+++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import java.lang.reflect.InvocationTargetException;
+
+public class Command
+{
+ public static void main(String[] args) throws
+ ClassNotFoundException,
+ NoSuchMethodException,
+ InvocationTargetException,
+ IllegalAccessException,
+ InstantiationException
+ {
+ String name = args[0];
+ String[] cmdArgs = new String[args.length-1];
+ System.arraycopy(args,1,cmdArgs,0,args.length-1);
+ name = "org.apache.qpid.amqp_1_0.client." + String.valueOf(name.charAt(0)).toUpperCase() + name.substring(1).toLowerCase();
+ Class<Util> clazz = (Class<Util>) Class.forName(name);
+ Util util = clazz.getDeclaredConstructor(String[].class).newInstance((Object)cmdArgs);
+ util.run();
+
+ }
+}
diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
new file mode 100644
index 0000000000..b58ce6bfe5
--- /dev/null
+++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
@@ -0,0 +1,407 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Demo extends Util
+{
+ private static final String USAGE_STRING = "demo [options] <vendor> [<content> ...]\n\nOptions:";
+ private static final String OPCODE = "opcode";
+ private static final String ACTION = "action";
+ private static final String MESSAGE_ID = "message-id";
+ private static final String VENDOR = "vendor";
+ private static final String LOG = "log";
+ private static final String RECEIVED = "received";
+ private static final String TEST = "test";
+ private static final String APACHE = "apache";
+ private static final String SENT = "sent";
+ private static final String LINK_REF = "link-ref";
+ private static final String HOST = "host";
+ private static final String PORT = "port";
+ private static final String SASL_USER = "sasl-user";
+ private static final String SASL_PASSWORD = "sasl-password";
+ private static final String ROLE = "role";
+ private static final String ADDRESS = "address";
+ private static final String SENDER = "sender";
+ private static final String SEND_MESSAGE = "send-message";
+ private static final String ANNOUNCE = "announce";
+ private static final String MESSAGE_VENDOR = "message-vendor";
+ private static final String CREATE_LINK = "create-link";
+
+ public static void main(String[] args)
+ {
+ new Demo(args).run();
+ }
+
+ public Demo(String[] args)
+ {
+ super(args);
+ }
+
+ @Override
+ protected boolean hasLinkDurableOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasLinkNameOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasResponseQueueOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasSizeOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasBlockOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasStdInOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasTxnOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasModeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasCountOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasWindowSizeOption()
+ {
+ return false;
+ }
+
+ public void run()
+ {
+
+ try
+ {
+
+ final String vendor = getArgs()[0];
+ final String queue = "control";
+
+ String message = "";
+
+ Connection conn = newConnection();
+ Session session = conn.createSession();
+
+
+ Receiver responseReceiver;
+
+ responseReceiver = session.createTemporaryQueueReceiver();
+
+
+
+
+ responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
+
+
+
+ Sender s = session.createSender(queue, getWindowSize(), getMode());
+
+
+ Properties properties = new Properties();
+ properties.setMessageId(java.util.UUID.randomUUID());
+ properties.setReplyTo(responseReceiver.getAddress());
+
+ HashMap appPropMap = new HashMap();
+ ApplicationProperties appProperties = new ApplicationProperties(appPropMap);
+
+ appPropMap.put(OPCODE, ANNOUNCE);
+ appPropMap.put(VENDOR, vendor);
+ appPropMap.put(ADDRESS,responseReceiver.getAddress());
+
+ AmqpValue amqpValue = new AmqpValue(message);
+ Section[] sections = { properties, appProperties, amqpValue};
+ final Message message1 = new Message(Arrays.asList(sections));
+
+ s.send(message1);
+
+ Map<Object, Sender> sendingLinks = new HashMap<Object, Sender>();
+ Map<Object, Receiver> receivingLinks = new HashMap<Object, Receiver>();
+
+
+ boolean done = false;
+
+ while(!done)
+ {
+ boolean wait = true;
+ Message m = responseReceiver.receive(false);
+ if(m != null)
+ {
+ List<Section> payload = m.getPayload();
+ wait = false;
+ ApplicationProperties props = m.getApplicationProperties();
+ Map map = props.getValue();
+ String op = (String) map.get(OPCODE);
+ if("reset".equals(op))
+ {
+ for(Sender sender : sendingLinks.values())
+ {
+ try
+ {
+ sender.close();
+ Session session1 = sender.getSession();
+ session1.close();
+ session1.getConnection().close();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ for(Receiver receiver : receivingLinks.values())
+ {
+ try
+ {
+ receiver.close();
+ receiver.getSession().close();
+ receiver.getSession().getConnection().close();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ sendingLinks.clear();
+ receivingLinks.clear();
+ }
+ else if(CREATE_LINK.equals(op))
+ {
+ Object linkRef = map.get(LINK_REF);
+ String host = (String) map.get(HOST);
+ Object o = map.get(PORT);
+ int port = Integer.parseInt(String.valueOf(o));
+ String user = (String) map.get(SASL_USER);
+ String password = (String) map.get(SASL_PASSWORD);
+ String role = (String) map.get(ROLE);
+ String address = (String) map.get(ADDRESS);
+ System.err.println("Host: " + host + "\tPort: " + port + "\t user: " + user +"\t password: " + password);
+ try{
+
+
+ Connection conn2 = new Connection(host, port, user, password, host);
+ Session session2 = conn2.createSession();
+ if(sendingLinks.containsKey(linkRef))
+ {
+ try
+ {
+ sendingLinks.remove(linkRef).close();
+ }
+ catch (Exception e)
+ {
+
+ }
+ }
+ if(receivingLinks.containsKey(linkRef))
+ {
+ try
+ {
+ receivingLinks.remove(linkRef).close();
+ }
+ catch (Exception e)
+ {
+
+ }
+ }
+ if(SENDER.equals(role))
+ {
+
+ System.err.println("%%% Creating sender (" + linkRef + ")");
+ Sender sender = session2.createSender(address);
+ sendingLinks.put(linkRef, sender);
+ }
+ else
+ {
+
+ System.err.println("%%% Creating receiver (" + linkRef + ")");
+ Receiver receiver2 = session2.createReceiver(address);
+ receiver2.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
+
+ receivingLinks.put(linkRef, receiver2);
+ }
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ else if(SEND_MESSAGE.equals(op))
+ {
+ Sender sender = sendingLinks.get(map.get(LINK_REF));
+ Properties m2props = new Properties();
+ Object messageId = map.get(MESSAGE_ID);
+ m2props.setMessageId(messageId);
+ Map m2propmap = new HashMap();
+ m2propmap.put(OPCODE, TEST);
+ m2propmap.put(VENDOR, vendor);
+ ApplicationProperties m2appProps = new ApplicationProperties(m2propmap);
+ Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId)));
+ sender.send(m2);
+
+ Map m3propmap = new HashMap();
+ m3propmap.put(OPCODE, LOG);
+ m3propmap.put(ACTION, SENT);
+ m3propmap.put(MESSAGE_ID, messageId);
+ m3propmap.put(VENDOR, vendor);
+ m3propmap.put(MESSAGE_VENDOR, vendor);
+
+
+ Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
+ new AmqpValue("AMQP-"+messageId)));
+ s.send(m3);
+
+ }
+
+ responseReceiver.acknowledge(m);
+ }
+ else
+ {
+ for(Map.Entry<Object, Receiver> entry : receivingLinks.entrySet())
+ {
+ m = entry.getValue().receive(false);
+ if(m != null)
+ {
+ wait = false;
+
+ System.err.println("%%% Received message from " + entry.getKey());
+
+ Properties mp = m.getProperties();
+ ApplicationProperties ap = m.getApplicationProperties();
+
+ Map m3propmap = new HashMap();
+ m3propmap.put(OPCODE, LOG);
+ m3propmap.put(ACTION, RECEIVED);
+ m3propmap.put(MESSAGE_ID, mp.getMessageId());
+ m3propmap.put(VENDOR, vendor);
+ m3propmap.put(MESSAGE_VENDOR, ap.getValue().get(VENDOR));
+
+ Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
+ new AmqpValue("AMQP-"+mp.getMessageId())));
+ s.send(m3);
+
+ entry.getValue().acknowledge(m);
+ }
+
+ }
+ }
+
+ if(wait)
+ {
+ try
+ {
+ Thread.sleep(500l);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ }
+
+ }
+
+
+
+
+
+
+
+
+
+ s.close();
+ session.close();
+ conn.close();
+
+ }
+ catch (Connection.ConnectionException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (Sender.SenderClosingException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (AmqpErrorException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+
+ }
+
+ protected boolean hasSingleLinkPerConnectionMode()
+ {
+ return false;
+ }
+
+ protected void printUsage(Options options)
+ {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(USAGE_STRING, options );
+ }
+
+}
diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java
new file mode 100644
index 0000000000..65d27b21f8
--- /dev/null
+++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java
@@ -0,0 +1,136 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.commons.cli.Options;
+
+public class Dump extends Util
+{
+ private static final String USAGE_STRING = "dump [options] <address>\n\nOptions:";
+
+
+ protected Dump(String[] args)
+ {
+ super(args);
+ }
+
+ public static void main(String[] args)
+ {
+ new Dump(args).run();
+ }
+
+ @Override
+ protected boolean hasLinkDurableOption()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ protected boolean hasLinkNameOption()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ protected boolean hasResponseQueueOption()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ protected boolean hasSizeOption()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ protected boolean hasBlockOption()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ protected boolean hasStdInOption()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ protected boolean hasTxnOption()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ protected boolean hasModeOption()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ protected boolean hasCountOption()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ protected void printUsage(Options options)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ protected void run()
+ {
+ final String queue = getArgs()[0];
+
+ try
+ {
+ Connection conn = newConnection();
+
+ Session session = conn.createSession();
+
+
+ Sender s = session.createSender(queue, 10);
+
+ Message message = new Message("dump me");
+ message.setDeliveryTag(new Binary("dump".getBytes()));
+
+ s.send(message);
+
+ s.close();
+ session.close();
+ conn.close();
+
+ } catch (Connection.ConnectionException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ } catch (Sender.SenderClosingException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+}
diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java
new file mode 100644
index 0000000000..4d98655ad2
--- /dev/null
+++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java
@@ -0,0 +1,347 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class Filereceiver extends Util
+{
+ private static final String USAGE_STRING = "filereceiver [options] <address> <directory>\n\nOptions:";
+
+ protected Filereceiver(String[] args)
+ {
+ super(args);
+ }
+
+ @Override
+ protected boolean hasLinkDurableOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasLinkNameOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasResponseQueueOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasSizeOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasBlockOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasStdInOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasTxnOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasModeOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasCountOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected void printUsage(Options options)
+ {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(USAGE_STRING, options );
+
+ }
+
+ @Override
+ protected void run()
+ {
+ final String queue = getArgs()[0];
+ final String directoryName = getArgs()[1];
+
+ try
+ {
+ Connection conn = newConnection();
+
+ Session session = conn.createSession();
+
+ final File directory = new File(directoryName);
+ if(directory.isDirectory() && directory.canWrite())
+ {
+ File tmpDirectory = new File(directoryName, ".tmp");
+ if(!tmpDirectory.exists())
+ {
+ tmpDirectory.mkdir();
+ }
+
+ String[] unsettledFiles = tmpDirectory.list();
+
+ Map<Binary, Outcome> unsettled = new HashMap<Binary, Outcome>();
+ final Map<Binary, String> unsettledFileNames = new HashMap<Binary, String>();
+
+ Accepted accepted = new Accepted();
+
+ for(String fileName : unsettledFiles)
+ {
+ File theFile = new File(tmpDirectory, fileName);
+ if(theFile.isFile())
+ {
+ if(fileName.startsWith("~") && fileName.endsWith("~"))
+ {
+ theFile.delete();
+ }
+ else
+ {
+ int splitPoint = fileName.indexOf(".");
+ String deliveryTagStr = fileName.substring(0,splitPoint);
+ String actualFileName = fileName.substring(splitPoint+1);
+
+ byte[] bytes = new byte[deliveryTagStr.length()/2];
+
+
+ for(int i = 0; i < bytes.length; i++)
+ {
+ char c = deliveryTagStr.charAt(2*i);
+ char d = deliveryTagStr.charAt(1+(2*i));
+
+ bytes[i] = (byte) (((c <= '9' ? c - '0' : c - 'W') << 4)
+ | (d <= '9' ? d - '0' : d - 'W'));
+
+ }
+ Binary deliveryTag = new Binary(bytes);
+ unsettled.put(deliveryTag, accepted);
+ unsettledFileNames.put(deliveryTag, fileName);
+ }
+ }
+
+ }
+
+ Receiver r = session.createReceiver(queue, AcknowledgeMode.EO, getLinkName(), isDurableLink(),
+ unsettled);
+
+ Map<Binary, Outcome> remoteUnsettled = r.getRemoteUnsettled();
+
+ for(Map.Entry<Binary, String> entry : unsettledFileNames.entrySet())
+ {
+ if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey()))
+ {
+
+ File tmpFile = new File(tmpDirectory, entry.getValue());
+ final File dest = new File(directory,
+ entry.getValue().substring(entry.getValue().indexOf(".") + 1));
+ if(dest.exists())
+ {
+ System.err.println("Duplicate detected - filename " + dest.getName());
+ }
+
+ tmpFile.renameTo(dest);
+ }
+ }
+
+
+ int credit = 10;
+
+ r.setCredit(UnsignedInteger.valueOf(credit), true);
+
+
+ int received = 0;
+ Message m = null;
+ do
+ {
+ m = isBlock() && received == 0 ? r.receive() : r.receive(10000);
+ if(m != null)
+ {
+ if(m.isResume() && unsettled.containsKey(m.getDeliveryTag()))
+ {
+ final String tmpFileName = unsettledFileNames.get(m.getDeliveryTag());
+ final File unsettledFile = new File(tmpDirectory,
+ tmpFileName);
+ r.acknowledge(m, new Receiver.SettledAction()
+ {
+ public void onSettled(final Binary deliveryTag)
+ {
+ int splitPoint = tmpFileName.indexOf(".");
+
+ String fileName = tmpFileName.substring(splitPoint+1);
+
+ final File dest = new File(directory, fileName);
+ if(dest.exists())
+ {
+ System.err.println("Duplicate detected - filename " + dest.getName());
+ }
+ unsettledFile.renameTo(dest);
+ unsettledFileNames.remove(deliveryTag);
+ }
+ });
+ }
+ else
+ {
+ received++;
+ List<Section> sections = m.getPayload();
+ Binary deliveryTag = m.getDeliveryTag();
+ StringBuilder tagNameBuilder = new StringBuilder();
+
+ ByteBuffer dtbuf = deliveryTag.asByteBuffer();
+ while(dtbuf.hasRemaining())
+ {
+ tagNameBuilder.append(String.format("%02x", dtbuf.get()));
+ }
+
+
+ ApplicationProperties properties = null;
+ List<Binary> data = new ArrayList<Binary>();
+ int totalSize = 0;
+ for(Section section : sections)
+ {
+ if(section instanceof ApplicationProperties)
+ {
+ properties = (ApplicationProperties) section;
+ }
+ else if(section instanceof AmqpValue)
+ {
+ AmqpValue value = (AmqpValue) section;
+ if(value.getValue() instanceof Binary)
+ {
+ Binary binary = (Binary) value.getValue();
+ data.add(binary);
+ totalSize += binary.getLength();
+
+ }
+ else
+ {
+ // TODO exception
+ }
+ }
+ else if(section instanceof Data)
+ {
+ Data value = (Data) section;
+ Binary binary = value.getValue();
+ data.add(binary);
+ totalSize += binary.getLength();
+
+ }
+ }
+ if(properties != null)
+ {
+ final String fileName = (String) properties.getValue().get("filename");
+ byte[] fileData = new byte[totalSize];
+ ByteBuffer buf = ByteBuffer.wrap(fileData);
+ int offset = 0;
+ for(Binary bin : data)
+ {
+ buf.put(bin.asByteBuffer());
+ }
+ File outputFile = new File(tmpDirectory, "~"+fileName+"~");
+ if(outputFile.exists())
+ {
+ outputFile.delete();
+ }
+ FileOutputStream fos = new FileOutputStream(outputFile);
+ fos.write(fileData);
+ fos.flush();
+ fos.close();
+
+ final File unsettledFile = new File(tmpDirectory, tagNameBuilder.toString() + "." +
+ fileName);
+ outputFile.renameTo(unsettledFile);
+ r.acknowledge(m, new Receiver.SettledAction()
+ {
+ public void onSettled(final Binary deliveryTag)
+ {
+ final File dest = new File(directory, fileName);
+ if(dest.exists())
+ {
+ System.err.println("Duplicate detected - filename " + dest.getName());
+ }
+ unsettledFile.renameTo(dest);
+
+ }
+ });
+
+ }
+ }
+ }
+ }
+ while(m != null);
+
+
+ r.close();
+ }
+ else
+ {
+ System.err.println("No such directory: " + directoryName);
+ }
+ session.close();
+ conn.close();
+ }
+ catch (Connection.ConnectionException e)
+ {
+ e.printStackTrace();
+ }
+ catch (FileNotFoundException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (AmqpErrorException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+
+ }
+
+ public static void main(String[] args)
+ {
+ new Filereceiver(args).run();
+ }
+}
diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
new file mode 100644
index 0000000000..46e6ba537f
--- /dev/null
+++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
@@ -0,0 +1,296 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.*;
+
+public class Filesender extends Util
+{
+ private static final String USAGE_STRING = "filesender [options] <address> <directory>\n\nOptions:";
+
+ protected Filesender(String[] args)
+ {
+ super(args);
+ }
+
+ @Override
+ protected boolean hasLinkDurableOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasLinkNameOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasResponseQueueOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasSizeOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasBlockOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasStdInOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasTxnOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasModeOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasCountOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected void printUsage(Options options)
+ {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(USAGE_STRING, options );
+
+ }
+
+ @Override
+ protected void run()
+ {
+ final String queue = getArgs()[0];
+ final String directoryName = getArgs()[1];
+
+ try
+ {
+ MessageDigest md5 = MessageDigest.getInstance("MD5");
+ Connection conn = newConnection();
+
+ Session session = conn.createSession();
+
+ File directory = new File(directoryName);
+ if(directory.isDirectory() && directory.canWrite())
+ {
+
+ File tmpDirectory = new File(directoryName, ".tmp");
+ if(!tmpDirectory.exists())
+ {
+ tmpDirectory.mkdir();
+ }
+
+ String[] unsettledFiles = tmpDirectory.list();
+
+
+
+ Map<Binary, Outcome> unsettled = new HashMap<Binary, Outcome>();
+ Map<Binary, String> unsettledFileNames = new HashMap<Binary, String>();
+ for(String fileName : unsettledFiles)
+ {
+ File aFile = new File(tmpDirectory, fileName);
+ if(aFile.canRead() && aFile.canWrite())
+ {
+ Binary deliveryTag = new Binary(md5.digest(fileName.getBytes()));
+ unsettled.put(deliveryTag, null);
+ unsettledFileNames.put(deliveryTag, fileName);
+ }
+ }
+
+
+ Sender s = session.createSender(queue, 10, AcknowledgeMode.EO, getLinkName(), isDurableLink(),
+ unsettled);
+
+ Map<Binary, DeliveryState> remoteUnsettled = s.getRemoteUnsettled();
+
+ for(Map.Entry<Binary, String> entry: unsettledFileNames.entrySet())
+ {
+ if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey()))
+ {
+ (new File(tmpDirectory, entry.getValue())).renameTo(new File(directory, entry.getValue()));
+ }
+ }
+
+ if(remoteUnsettled != null)
+ {
+ for(Map.Entry<Binary, DeliveryState> entry : remoteUnsettled.entrySet())
+ {
+ if(entry.getValue() instanceof Accepted)
+ {
+ final String fileName = unsettledFileNames.get(entry.getKey());
+ if(fileName != null)
+ {
+
+ Message resumed = new Message();
+ resumed.setDeliveryTag(entry.getKey());
+ resumed.setDeliveryState(entry.getValue());
+ resumed.setResume(Boolean.TRUE);
+ resumed.setSettled(Boolean.TRUE);
+
+
+
+ final File unsettledFile = new File(tmpDirectory, fileName);
+ unsettledFile.delete();
+
+ s.send(resumed);
+
+ }
+
+ }
+ else if(entry.getValue() instanceof Received || entry.getValue() == null)
+ {
+ final File unsettledFile = new File(tmpDirectory, unsettledFileNames.get(entry.getKey()));
+ Message resumed = createMessageFromFile(md5, unsettledFileNames.get(entry.getKey()), unsettledFile);
+ resumed.setResume(Boolean.TRUE);
+ Sender.OutcomeAction action = new Sender.OutcomeAction()
+ {
+ public void onOutcome(Binary deliveryTag, Outcome outcome)
+ {
+ if(outcome instanceof Accepted)
+ {
+ unsettledFile.delete();
+ }
+ }
+ };
+ s.send(resumed, action);
+
+ }
+ }
+ }
+
+
+
+ String[] files = directory.list();
+
+ for(String fileName : files)
+ {
+ final File file = new File(directory, fileName);
+
+ if(file.canRead() && file.canWrite() && !file.isDirectory())
+ {
+ Message message = createMessageFromFile(md5, fileName, file);
+
+ final File unsettledFile = new File(tmpDirectory, fileName);
+
+ Sender.OutcomeAction action = new Sender.OutcomeAction()
+ {
+ public void onOutcome(Binary deliveryTag, Outcome outcome)
+ {
+ if(outcome instanceof Accepted)
+ {
+ unsettledFile.delete();
+ }
+ }
+ };
+
+ file.renameTo(unsettledFile);
+
+ s.send(message, action);
+ }
+ }
+
+ s.close();
+ }
+ else
+ {
+ System.err.println("No such directory: " + directory);
+ }
+ session.close();
+ conn.close();
+ }
+ catch (Connection.ConnectionException e)
+ {
+ e.printStackTrace();
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ e.printStackTrace();
+ } catch (FileNotFoundException e)
+ {
+ e.printStackTrace();
+ } catch (IOException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ } catch (NoSuchAlgorithmException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ } catch (Sender.SenderClosingException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+
+ }
+
+ private Message createMessageFromFile(MessageDigest md5, String fileName, File file) throws IOException
+ {
+ FileInputStream fis = new FileInputStream(file);
+ byte[] data = new byte[(int) file.length()];
+
+ int read = fis.read(data);
+
+ fis.close();
+
+ Section applicationProperties = new ApplicationProperties(Collections.singletonMap("filename", fileName));
+ Section amqpValue = new Data(new Binary(data));
+ Message message = new Message(Arrays.asList(applicationProperties, amqpValue));
+ Binary deliveryTag = new Binary(md5.digest(fileName.getBytes()));
+ message.setDeliveryTag(deliveryTag);
+ md5.reset();
+ return message;
+ }
+
+ public static void main(String[] args)
+ {
+ new Filesender(args).run();
+ }
+}
diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java
new file mode 100644
index 0000000000..0da9dc3fb7
--- /dev/null
+++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java
@@ -0,0 +1,246 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.commons.cli.*;
+import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter;
+
+import java.util.Collections;
+
+public class Receive extends Util
+{
+ private static final String USAGE_STRING = "receive [options] <address> \n\nOptions:";
+ private static final UnsignedLong UNSIGNED_LONG_ONE = UnsignedLong.valueOf(1L);
+ private UnsignedLong _lastCorrelationId;
+
+ public static void main(String[] args)
+ {
+ new Receive(args).run();
+ }
+
+
+ public Receive(final String[] args)
+ {
+ super(args);
+ }
+
+ @Override
+ protected boolean hasLinkDurableOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasLinkNameOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasResponseQueueOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasSizeOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasBlockOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasStdInOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasTxnOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasModeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasCountOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasWindowSizeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasFilterOption()
+ {
+ return true;
+ }
+
+ protected void run()
+ {
+
+ try
+ {
+ final String queue = getArgs()[0];
+
+ String message = "";
+
+ Connection conn = newConnection();
+
+
+ Session session = conn.createSession();
+
+ Filter filter = null;
+ if(getFilter() != null)
+ {
+ String[] filterParts = getFilter().split("=",2);
+ if("exact-subject".equals(filterParts[0]))
+ {
+ filter = new ExactSubjectFilter(filterParts[1]);
+ }
+ else if("matching-subject".equals(filterParts[0]))
+ {
+ filter = new MatchingSubjectFilter(filterParts[1]);
+ }
+ else
+ {
+ System.err.println("Unknown filter type: " + filterParts[0]);
+ }
+ }
+
+ Receiver r =
+ filter == null
+ ? session.createReceiver(queue, getMode(), getLinkName(), isDurableLink())
+ : session.createReceiver(queue, getMode(), getLinkName(), isDurableLink(), Collections.singletonMap(Symbol.valueOf("filter"), filter), null);
+ Transaction txn = null;
+
+ int credit = 0;
+ int receivedCount = 0;
+
+ if(!useStdIn())
+ {
+ if(getArgs().length <= 2)
+ {
+
+ Transaction txn2 = null;
+ if(useTran())
+ {
+ txn = session.createSessionLocalTransaction();
+ txn2 = session.createSessionLocalTransaction();
+ }
+
+ for(int i = 0; i < getCount(); i++)
+ {
+
+ if(credit == 0)
+ {
+ if(getCount() - i <= getWindowSize())
+ {
+ credit = getCount() - i;
+
+ }
+ else
+ {
+ credit = getWindowSize();
+
+ }
+
+ {
+ r.setCredit(UnsignedInteger.valueOf(credit), false);
+ }
+ if(!isBlock())
+ r.drain();
+ }
+
+ Message m = isBlock() ? r.receive() : r.receive(1000L);
+ credit--;
+ if(m==null)
+ {
+ break;
+ }
+
+
+
+ r.acknowledge(m.getDeliveryTag(),txn);
+
+ receivedCount++;
+
+ System.out.println("Received Message : " + m.getPayload());
+ }
+
+ if(useTran())
+ {
+ txn.commit();
+ }
+ }
+ else
+ {
+ // TODO
+ }
+ }
+ else
+ {
+ // TODO
+ }
+ r.close();
+ session.close();
+ conn.close();
+ System.out.println("Total Messages Received: " + receivedCount);
+ }
+ catch (Connection.ConnectionException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (AmqpErrorException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+
+ }
+
+ protected void printUsage(Options options)
+ {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(USAGE_STRING, options );
+ }
+
+}
diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
new file mode 100644
index 0000000000..6e1d15376c
--- /dev/null
+++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
@@ -0,0 +1,249 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.commons.cli.*;
+
+import java.util.Arrays;
+
+public class Request extends Util
+{
+ private static final String USAGE_STRING = "request [options] <address> [<content> ...]\n\nOptions:";
+
+ public static void main(String[] args)
+ {
+ new Request(args).run();
+ }
+
+ public Request(String[] args)
+ {
+ super(args);
+ }
+
+ @Override
+ protected boolean hasLinkDurableOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasLinkNameOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasResponseQueueOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasSizeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasBlockOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasStdInOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasTxnOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasModeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasCountOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasWindowSizeOption()
+ {
+ return true;
+ }
+
+ public void run()
+ {
+
+ try
+ {
+
+
+ final String queue = getArgs()[0];
+
+ String message = "";
+
+ Connection conn = newConnection();
+ Session session = conn.createSession();
+
+ Connection conn2;
+ Session session2;
+ Receiver responseReceiver;
+
+ if(isUseMultipleConnections())
+ {
+ conn2 = newConnection();
+ session2 = conn2.createSession();
+ responseReceiver = session2.createTemporaryQueueReceiver();
+ }
+ else
+ {
+ conn2 = null;
+ session2 = null;
+ responseReceiver = session.createTemporaryQueueReceiver();
+ }
+
+
+
+
+ responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
+
+
+
+ Sender s = session.createSender(queue, getWindowSize(), getMode());
+
+ Transaction txn = null;
+
+ if(useTran())
+ {
+ txn = session.createSessionLocalTransaction();
+ }
+
+ int received = 0;
+
+ if(getArgs().length >= 2)
+ {
+ message = getArgs()[1];
+ if(message.length() < getMessageSize())
+ {
+ StringBuilder builder = new StringBuilder(getMessageSize());
+ builder.append(message);
+ for(int x = message.length(); x < getMessageSize(); x++)
+ {
+ builder.append('.');
+ }
+ message = builder.toString();
+ }
+
+ for(int i = 0; i < getCount(); i++)
+ {
+ Properties properties = new Properties();
+ properties.setMessageId(UnsignedLong.valueOf(i));
+ properties.setReplyTo(responseReceiver.getAddress());
+
+ AmqpValue amqpValue = new AmqpValue(message);
+ Section[] sections = { new Header() , properties, amqpValue};
+ final Message message1 = new Message(Arrays.asList(sections));
+
+ s.send(message1, txn);
+
+ Message responseMessage = responseReceiver.receive(false);
+ if(responseMessage != null)
+ {
+ responseReceiver.acknowledge(responseMessage.getDeliveryTag(),txn);
+ received++;
+ }
+ }
+ }
+
+ if(txn != null)
+ {
+ txn.commit();
+ }
+
+
+ while(received < getCount())
+ {
+ Message responseMessage = responseReceiver.receive();
+ responseReceiver.acknowledge(responseMessage.getDeliveryTag());
+ received++;
+ }
+
+
+
+
+ s.close();
+ session.close();
+ conn.close();
+
+ if(session2 != null)
+ {
+ session2.close();
+ conn2.close();
+ }
+ }
+ catch (Connection.ConnectionException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (Sender.SenderClosingException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (AmqpErrorException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+
+ }
+
+ protected boolean hasSingleLinkPerConnectionMode()
+ {
+ return true;
+ }
+
+ protected void printUsage(Options options)
+ {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(USAGE_STRING, options );
+ }
+
+}
diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
new file mode 100644
index 0000000000..8d9de4893f
--- /dev/null
+++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
@@ -0,0 +1,347 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.commons.cli.*;
+
+import java.util.*;
+
+public class Respond extends Util
+{
+ private static final String USAGE_STRING = "respond [options] <address>\n\nOptions:";
+ private Connection _conn;
+ private Session _session;
+ private Receiver _receiver;
+ private Transaction _txn;
+ private Map<String,Sender> _senders;
+ private UnsignedLong _responseMsgId = UnsignedLong.ZERO;
+ private Connection _conn2;
+ private Session _session2;
+
+ public Respond(final String[] args)
+ {
+ super(args);
+ }
+
+ @Override
+ protected boolean hasLinkDurableOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasLinkNameOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasResponseQueueOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasSizeOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasBlockOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasStdInOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasTxnOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasModeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasCountOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasSingleLinkPerConnectionMode()
+ {
+ return true;
+ }
+
+
+ @Override
+ protected boolean hasWindowSizeOption()
+ {
+ return true;
+ }
+
+ public static void main(String[] args)
+ {
+ new Respond(args).run();
+ }
+
+ public void run()
+ {
+ try
+ {
+
+ _senders = new HashMap<String, Sender>();
+
+ final String queue = getArgs()[0];
+
+ String message = "";
+
+ _conn = newConnection();
+
+
+
+ if(isUseMultipleConnections())
+ {
+ _conn2 = newConnection();
+ _session2 = _conn2.createSession();
+ }
+
+
+ _session = _conn.createSession();
+
+
+ _receiver = _session.createReceiver(queue, getMode());
+ _txn = null;
+
+ int credit = 0;
+ int receivedCount = 0;
+ _responseMsgId = UnsignedLong.ZERO;
+
+ Random random = null;
+ int batch = 0;
+ List<Message> txnMessages = null;
+ if(useTran())
+ {
+ if(getRollbackRatio() != 0)
+ {
+ random = new Random();
+ }
+ batch = getBatchSize();
+ _txn = _session.createSessionLocalTransaction();
+ txnMessages = new ArrayList<Message>(batch);
+ }
+
+
+ for(int i = 0; receivedCount < getCount(); i++)
+ {
+
+ if(credit == 0)
+ {
+ if(getCount() - i <= getWindowSize())
+ {
+ credit = getCount() - i;
+
+ }
+ else
+ {
+ credit = getWindowSize();
+
+ }
+
+ _receiver.setCredit(UnsignedInteger.valueOf(credit), false);
+
+ if(!isBlock())
+ _receiver.drain();
+ }
+
+ Message m = isBlock() ? (receivedCount == 0 ? _receiver.receive() : _receiver.receive(10000L)) : _receiver.receive(1000L);
+ credit--;
+ if(m==null)
+ {
+ if(useTran() && batch != getBatchSize())
+ {
+ _txn.commit();
+ }
+ break;
+ }
+
+ System.out.println("Received Message: " + m.getPayload());
+
+ respond(m);
+
+
+
+ if(useTran())
+ {
+
+ txnMessages.add(m);
+
+ if(--batch == 0)
+ {
+
+ if(getRollbackRatio() == 0 || random.nextDouble() >= getRollbackRatio())
+ {
+ _txn.commit();
+ txnMessages.clear();
+ receivedCount += getBatchSize();
+ }
+ else
+ {
+ System.out.println("Random Rollback");
+ _txn.rollback();
+ double result;
+ do
+ {
+ _txn = _session.createSessionLocalTransaction();
+
+ for(Message msg : txnMessages)
+ {
+ respond(msg);
+ }
+
+ result = random.nextDouble();
+ if(result<getRollbackRatio())
+ {
+ _txn.rollback();
+ }
+ else
+ {
+ _txn.commit();
+ txnMessages.clear();
+ receivedCount += getBatchSize();
+ }
+ }
+ while(result < getRollbackRatio());
+ }
+ _txn = _session.createSessionLocalTransaction();
+
+ batch = getBatchSize();
+ }
+ }
+ else
+ {
+ receivedCount++;
+ }
+
+ }
+
+
+ for(Sender s : _senders.values())
+ {
+ s.close();
+ }
+
+ _receiver.close();
+ _session.close();
+ _conn.close();
+ System.out.println("Received: " + receivedCount);
+ }
+ catch (Connection.ConnectionException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (Sender.SenderClosingException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (AmqpErrorException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ }
+
+ private void respond(Message m) throws Sender.SenderCreationException
+ {
+ List<Section> sections = m.getPayload();
+ String replyTo = null;
+ Object correlationId = null;
+ for(Section section : sections)
+ {
+ if(section instanceof Properties)
+ {
+ replyTo = getResponseQueue() == null ? ((Properties)section).getReplyTo() : getResponseQueue();
+ correlationId = ((Properties) section).getMessageId();
+ break;
+ }
+ }
+
+ if(replyTo != null)
+ {
+ Sender s = _senders.get(replyTo);
+ if(s == null)
+ {
+ s = (isUseMultipleConnections() ? _session2 : _session).createSender(replyTo,getWindowSize());
+ _senders.put(replyTo, s);
+ }
+
+ List<Section> replySections = new ArrayList<Section>(sections);
+
+ ListIterator<Section> sectionIterator = replySections.listIterator();
+
+ while(sectionIterator.hasNext())
+ {
+ Section section = sectionIterator.next();
+ if(section instanceof Properties)
+ {
+ Properties newProps = new Properties();
+ newProps.setTo(replyTo);
+ newProps.setCorrelationId(correlationId);
+ newProps.setMessageId(_responseMsgId);
+ _responseMsgId = _responseMsgId.add(UnsignedLong.ONE);
+ sectionIterator.set(newProps);
+ }
+ }
+
+ Message replyMessage = new Message(replySections);
+ System.out.println("Sent Message: " + replySections);
+ s.send(replyMessage, _txn);
+
+ }
+ _receiver.acknowledge(m.getDeliveryTag(), _txn);
+ }
+
+ protected void printUsage(Options options)
+ {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(USAGE_STRING, options );
+ }
+
+}
diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
new file mode 100644
index 0000000000..6f6575e083
--- /dev/null
+++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
@@ -0,0 +1,244 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.client;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.util.Arrays;
+
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.commons.cli.*;
+
+public class Send extends Util
+{
+ private static final String USAGE_STRING = "send [options] <address> [<content> ...]\n\nOptions:";
+ private static final char[] HEX = {'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'};
+
+
+ public static void main(String[] args) throws Sender.SenderCreationException, Sender.SenderClosingException, Connection.ConnectionException
+ {
+ new Send(args).run();
+ }
+
+
+ public Send(final String[] args)
+ {
+ super(args);
+ }
+
+ @Override
+ protected boolean hasLinkDurableOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasLinkNameOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasResponseQueueOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasSizeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasBlockOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasStdInOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasTxnOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasModeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasCountOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasWindowSizeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasSubjectOption()
+ {
+ return true;
+ }
+
+ public void run()
+ {
+
+ final String queue = getArgs()[0];
+
+ String message = "";
+
+ try
+ {
+ Connection conn = newConnection();
+
+ Session session = conn.createSession();
+
+
+ Sender s = session.createSender(queue, getWindowSize(), getMode(), getLinkName());
+
+ Transaction txn = null;
+
+ if(useTran())
+ {
+ txn = session.createSessionLocalTransaction();
+ }
+
+ if(!useStdIn())
+ {
+ if(getArgs().length <= 2)
+ {
+ if(getArgs().length == 2)
+ {
+ message = getArgs()[1];
+ }
+ for(int i = 0; i < getCount(); i++)
+ {
+
+ Properties properties = new Properties();
+ properties.setMessageId(UnsignedLong.valueOf(i));
+ if(getSubject() != null)
+ {
+ properties.setSubject(getSubject());
+ }
+ Section bodySection;
+ byte[] bytes = (message + " " + i).getBytes();
+ if(bytes.length < getMessageSize())
+ {
+ byte[] origBytes = bytes;
+ bytes = new byte[getMessageSize()];
+ System.arraycopy(origBytes,0,bytes,0,origBytes.length);
+ for(int x = origBytes.length; x < bytes.length; x++)
+ {
+ bytes[x] = (byte) '.';
+ }
+ bodySection = new Data(new Binary(bytes));
+ }
+ else
+ {
+ bodySection = new AmqpValue(message + " " + i);
+ }
+
+ Section[] sections = {properties, bodySection};
+ final Message message1 = new Message(Arrays.asList(sections));
+
+ s.send(message1, txn);
+ }
+ }
+ else
+ {
+ for(int i = 1; i < getArgs().length; i++)
+ {
+ s.send(new Message(getArgs()[i]), txn);
+ }
+
+ }
+ }
+ else
+ {
+ LineNumberReader buf = new LineNumberReader(new InputStreamReader(System.in));
+
+
+ try
+ {
+ while((message = buf.readLine()) != null)
+ {
+ s.send(new Message(message), txn);
+ }
+ }
+ catch (IOException e)
+ {
+ // TODO
+ e.printStackTrace();
+ }
+ }
+
+ if(txn != null)
+ {
+ txn.commit();
+ }
+
+ s.close();
+
+ session.close();
+ conn.close();
+ }
+ catch (Sender.SenderClosingException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (Connection.ConnectionException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+
+
+ }
+
+ protected void printUsage(Options options)
+ {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(USAGE_STRING, options );
+ }
+
+}
diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java
new file mode 100644
index 0000000000..6fe2a6d510
--- /dev/null
+++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java
@@ -0,0 +1,529 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.transport.Container;
+import org.apache.commons.cli.*;
+
+import java.util.logging.*;
+
+public abstract class Util
+{
+
+ private static final Logger FRAME_LOGGER = Logger.getLogger("FRM");
+ private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
+ private String _host;
+ private String _username;
+ private String _password;
+ private int _port;
+ private int _count;
+ private boolean _useStdIn;
+ private boolean _useTran;
+ private String[] _args;
+ private AcknowledgeMode _mode;
+ private boolean _block;
+ private int _frameSize;
+ private int _messageSize;
+ private String _responseQueue;
+ private int _batchSize;
+ private double _rollbackRatio;
+ private String _linkName;
+ private String _containerName;
+ private boolean _durableLink;
+ private boolean _useMultipleConnections;
+ private int _windowSize = 100;
+ private String _subject;
+ private String _filter;
+ private String _remoteHost;
+ private boolean _useSSL;
+
+ protected Util(String[] args)
+ {
+ CommandLineParser cmdLineParse = new PosixParser();
+
+ Options options = new Options();
+ options.addOption("h","help",false,"show this help message and exit");
+ options.addOption(OptionBuilder.withLongOpt("host")
+ .withDescription( "host to connect to (default 0.0.0.0)" )
+ .hasArg(true)
+ .withArgName("HOST")
+ .create('H'));
+ options.addOption(OptionBuilder.withLongOpt("username")
+ .withDescription( "username to use for authentication" )
+ .hasArg(true)
+ .withArgName("USERNAME")
+ .create('u'));
+ options.addOption(OptionBuilder.withLongOpt("password")
+ .withDescription( "password to use for authentication" )
+ .hasArg(true)
+ .withArgName("PASSWORD")
+ .create('w'));
+ options.addOption(OptionBuilder.withLongOpt("port")
+ .withDescription( "port to connect to (default 5672)" )
+ .hasArg(true)
+ .withArgName("PORT")
+ .create('p'));
+ options.addOption(OptionBuilder.withLongOpt("frame-size")
+ .withDescription( "specify the maximum frame size" )
+ .hasArg(true)
+ .withArgName("FRAME_SIZE")
+ .create('f'));
+ options.addOption(OptionBuilder.withLongOpt("container-name")
+ .withDescription( "Container name" )
+ .hasArg(true)
+ .withArgName("CONTAINER_NAME")
+ .create('C'));
+
+ options.addOption(OptionBuilder.withLongOpt("ssl")
+ .withDescription("Use SSL")
+ .create('S'));
+
+ options.addOption(OptionBuilder.withLongOpt("remote-hostname")
+ .withDescription( "hostname to supply in the open frame" )
+ .hasArg(true)
+ .withArgName("HOST")
+ .create('O'));
+
+ if(hasBlockOption())
+ options.addOption(OptionBuilder.withLongOpt("block")
+ .withDescription("block until messages arrive")
+ .create('b'));
+
+ if(hasCountOption())
+ options.addOption(OptionBuilder.withLongOpt("count")
+ .withDescription( "number of messages to send (default 1)" )
+ .hasArg(true)
+ .withArgName("COUNT")
+ .create('c'));
+ if(hasModeOption())
+ options.addOption(OptionBuilder.withLongOpt("acknowledge-mode")
+ .withDescription( "acknowledgement mode: AMO|ALO|EO (At Least Once, At Most Once, Exactly Once" )
+ .hasArg(true)
+ .withArgName("MODE")
+ .create('k'));
+
+ if(hasSubjectOption())
+ options.addOption(OptionBuilder.withLongOpt("subject")
+ .withDescription( "subject message property" )
+ .hasArg(true)
+ .withArgName("SUBJECT")
+ .create('s'));
+
+
+ if(hasSingleLinkPerConnectionMode())
+ options.addOption(OptionBuilder.withLongOpt("single-link-per-connection")
+ .withDescription("acknowledgement mode: AMO|ALO|EO (At Least Once, At Most Once, Exactly Once")
+ .hasArg(false)
+ .create('Z'));
+
+ if(hasFilterOption())
+ options.addOption(OptionBuilder.withLongOpt("filter")
+ .withDescription("filter, e.g. exact-subject=hello; matching-subject=%.a.#")
+ .hasArg(true)
+ .withArgName("<TYPE>=<VALUE>")
+ .create('F'));
+
+
+ if(hasTxnOption())
+ {
+ options.addOption("x","txn",false,"use transactions");
+ options.addOption(OptionBuilder.withLongOpt("batch-size")
+ .withDescription( "transaction batch size (default: 1)" )
+ .hasArg(true)
+ .withArgName("BATCH-SIZE")
+ .create('B'));
+ options.addOption(OptionBuilder.withLongOpt("rollback-ratio")
+ .withDescription( "rollback ratio - must be between 0 and 1 (default: 0)" )
+ .hasArg(true)
+ .withArgName("RATIO")
+ .create('R'));
+ }
+
+ if(hasLinkDurableOption())
+ {
+ options.addOption("d","durable-link",false,"use a durable link");
+ }
+
+ if(hasStdInOption())
+ options.addOption("i","stdin",false,"read messages from stdin (one message per line)");
+
+ options.addOption(OptionBuilder.withLongOpt("trace")
+ .withDescription("trace logging specified categories: RAW, FRM")
+ .hasArg(true)
+ .withArgName("TRACE")
+ .create('t'));
+ if(hasSizeOption())
+ options.addOption(OptionBuilder.withLongOpt("message-size")
+ .withDescription( "size to pad outgoing messages to" )
+ .hasArg(true)
+ .withArgName("SIZE")
+ .create('z'));
+
+ if(hasResponseQueueOption())
+ options.addOption(OptionBuilder.withLongOpt("response-queue")
+ .withDescription( "response queue to reply to" )
+ .hasArg(true)
+ .withArgName("RESPONSE_QUEUE")
+ .create('r'));
+
+ if(hasLinkNameOption())
+ {
+ options.addOption(OptionBuilder.withLongOpt("link")
+ .withDescription( "link name" )
+ .hasArg(true)
+ .withArgName("LINK")
+ .create('l'));
+ }
+
+ if(hasWindowSizeOption())
+ {
+ options.addOption(OptionBuilder.withLongOpt("window-size")
+ .withDescription("credit window size")
+ .hasArg(true)
+ .withArgName("WINDOW-SIZE")
+ .create('W'));
+ }
+
+ CommandLine cmdLine = null;
+ try
+ {
+ cmdLine = cmdLineParse.parse(options, args);
+
+ }
+ catch (ParseException e)
+ {
+ printUsage(options);
+ System.exit(-1);
+ }
+
+ if(cmdLine.hasOption('h') || cmdLine.getArgList().isEmpty())
+ {
+ printUsage(options);
+ System.exit(0);
+ }
+ _host = cmdLine.getOptionValue('H',"0.0.0.0");
+ _remoteHost = cmdLine.getOptionValue('O',null);
+ String portStr = cmdLine.getOptionValue('p',"5672");
+ String countStr = cmdLine.getOptionValue('c',"1");
+
+ _useSSL = cmdLine.hasOption('S');
+
+ if(hasWindowSizeOption())
+ {
+ String windowSizeStr = cmdLine.getOptionValue('W',"100");
+ _windowSize = Integer.parseInt(windowSizeStr);
+ }
+
+ if(hasSubjectOption())
+ {
+ _subject = cmdLine.getOptionValue('s');
+ }
+
+ if(cmdLine.hasOption('u'))
+ {
+ _username = cmdLine.getOptionValue('u');
+ }
+
+ if(cmdLine.hasOption('w'))
+ {
+ _password = cmdLine.getOptionValue('w');
+ }
+
+ if(cmdLine.hasOption('F'))
+ {
+ _filter = cmdLine.getOptionValue('F');
+ }
+
+ _port = Integer.parseInt(portStr);
+
+ _containerName = cmdLine.getOptionValue('C');
+
+ if(hasBlockOption())
+ _block = cmdLine.hasOption('b');
+
+ if(hasLinkNameOption())
+ _linkName = cmdLine.getOptionValue('l');
+
+
+ if(hasLinkDurableOption())
+ _durableLink = cmdLine.hasOption('d');
+
+ if(hasCountOption())
+ _count = Integer.parseInt(countStr);
+
+ if(hasStdInOption())
+ _useStdIn = cmdLine.hasOption('i');
+
+ if(hasSingleLinkPerConnectionMode())
+ _useMultipleConnections = cmdLine.hasOption('Z');
+
+ if(hasTxnOption())
+ {
+ _useTran = cmdLine.hasOption('x');
+ _batchSize = Integer.parseInt(cmdLine.getOptionValue('B',"1"));
+ _rollbackRatio = Double.parseDouble(cmdLine.getOptionValue('R',"0"));
+ }
+
+ if(hasModeOption())
+ {
+ _mode = AcknowledgeMode.ALO;
+
+ if(cmdLine.hasOption('k'))
+ {
+ _mode = AcknowledgeMode.valueOf(cmdLine.getOptionValue('k'));
+ }
+ }
+
+ if(hasResponseQueueOption())
+ {
+ _responseQueue = cmdLine.getOptionValue('r');
+ }
+
+ _frameSize = Integer.parseInt(cmdLine.getOptionValue('f',"65536"));
+
+ if(hasSizeOption())
+ {
+ _messageSize = Integer.parseInt(cmdLine.getOptionValue('z',"-1"));
+ }
+
+ String categoriesList = cmdLine.getOptionValue('t');
+ String[]categories = categoriesList == null ? new String[0] : categoriesList.split("[, ]");
+ for(String cat : categories)
+ {
+ if(cat.equalsIgnoreCase("FRM"))
+ {
+ FRAME_LOGGER.setLevel(Level.FINE);
+ Formatter formatter = new Formatter()
+ {
+ @Override
+ public String format(final LogRecord record)
+ {
+ return "[" + record.getMillis() + " FRM]\t" + record.getMessage() + "\n";
+ }
+ };
+ for(Handler handler : FRAME_LOGGER.getHandlers())
+ {
+ FRAME_LOGGER.removeHandler(handler);
+ }
+ Handler handler = new ConsoleHandler();
+ handler.setLevel(Level.FINE);
+ handler.setFormatter(formatter);
+ FRAME_LOGGER.addHandler(handler);
+ }
+ else if (cat.equalsIgnoreCase("RAW"))
+ {
+ RAW_LOGGER.setLevel(Level.FINE);
+ Formatter formatter = new Formatter()
+ {
+ @Override
+ public String format(final LogRecord record)
+ {
+ return "[" + record.getMillis() + " RAW]\t" + record.getMessage() + "\n";
+ }
+ };
+ for(Handler handler : RAW_LOGGER.getHandlers())
+ {
+ RAW_LOGGER.removeHandler(handler);
+ }
+ Handler handler = new ConsoleHandler();
+ handler.setLevel(Level.FINE);
+ handler.setFormatter(formatter);
+ RAW_LOGGER.addHandler(handler);
+ }
+ }
+
+
+ _args = cmdLine.getArgs();
+
+ }
+
+ protected boolean hasFilterOption()
+ {
+ return false;
+ }
+
+ protected boolean hasSubjectOption()
+ {
+ return false;
+ }
+
+ protected boolean hasWindowSizeOption()
+ {
+ return false;
+ }
+
+ protected boolean hasSingleLinkPerConnectionMode()
+ {
+ return false;
+ }
+
+ protected abstract boolean hasLinkDurableOption();
+
+ protected abstract boolean hasLinkNameOption();
+
+ protected abstract boolean hasResponseQueueOption();
+
+ protected abstract boolean hasSizeOption();
+
+ protected abstract boolean hasBlockOption();
+
+ protected abstract boolean hasStdInOption();
+
+ protected abstract boolean hasTxnOption();
+
+ protected abstract boolean hasModeOption();
+
+ protected abstract boolean hasCountOption();
+
+ public String getHost()
+ {
+ return _host;
+ }
+
+ public String getUsername()
+ {
+ return _username;
+ }
+
+ public String getPassword()
+ {
+ return _password;
+ }
+
+ public int getPort()
+ {
+ return _port;
+ }
+
+ public int getCount()
+ {
+ return _count;
+ }
+
+ public boolean useStdIn()
+ {
+ return _useStdIn;
+ }
+
+ public boolean useTran()
+ {
+ return _useTran;
+ }
+
+ public AcknowledgeMode getMode()
+ {
+ return _mode;
+ }
+
+ public boolean isBlock()
+ {
+ return _block;
+ }
+
+ public String[] getArgs()
+ {
+ return _args;
+ }
+
+ public int getMessageSize()
+ {
+ return _messageSize;
+ }
+
+ public String getResponseQueue()
+ {
+ return _responseQueue;
+ }
+
+ public int getBatchSize()
+ {
+ return _batchSize;
+ }
+
+ public double getRollbackRatio()
+ {
+ return _rollbackRatio;
+ }
+
+ public String getLinkName()
+ {
+ return _linkName;
+ }
+
+ public boolean isDurableLink()
+ {
+ return _durableLink;
+ }
+
+ public boolean isUseMultipleConnections()
+ {
+ return _useMultipleConnections;
+ }
+
+ public void setUseMultipleConnections(boolean useMultipleConnections)
+ {
+ _useMultipleConnections = useMultipleConnections;
+ }
+
+ public String getSubject()
+ {
+ return _subject;
+ }
+
+ public void setSubject(String subject)
+ {
+ _subject = subject;
+ }
+
+ protected abstract void printUsage(final Options options);
+
+ protected abstract void run();
+
+
+ public Connection newConnection() throws Connection.ConnectionException
+ {
+ Container container = getContainerName() == null ? new Container() : new Container(getContainerName());
+ return getUsername() == null ? new Connection(getHost(), getPort(), null, null, _frameSize, container,
+ _remoteHost == null ? getHost() : _remoteHost, _useSSL)
+ : new Connection(getHost(), getPort(), getUsername(), getPassword(), _frameSize,
+ container, _remoteHost == null ? getHost() : _remoteHost, _useSSL);
+ }
+
+ public String getContainerName()
+ {
+ return _containerName;
+ }
+
+ public int getWindowSize()
+ {
+ return _windowSize;
+ }
+
+ public void setWindowSize(int windowSize)
+ {
+ _windowSize = windowSize;
+ }
+
+ public String getFilter()
+ {
+ return _filter;
+ }
+}