diff options
Diffstat (limited to 'java/amqp-1-0-client/src')
14 files changed, 45 insertions, 3357 deletions
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java deleted file mode 100644 index 3bb26744c4..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import java.lang.reflect.InvocationTargetException; - -public class Command -{ - public static void main(String[] args) throws - ClassNotFoundException, - NoSuchMethodException, - InvocationTargetException, - IllegalAccessException, - InstantiationException - { - String name = args[0]; - String[] cmdArgs = new String[args.length-1]; - System.arraycopy(args,1,cmdArgs,0,args.length-1); - name = "org.apache.qpid.amqp_1_0.client." + String.valueOf(name.charAt(0)).toUpperCase() + name.substring(1).toLowerCase(); - Class<Util> clazz = (Class<Util>) Class.forName(name); - Util util = clazz.getDeclaredConstructor(String[].class).newInstance((Object)cmdArgs); - util.run(); - - } -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java index e3d56fae09..e501662dbb 100644 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java @@ -20,6 +20,15 @@ */
package org.apache.qpid.amqp_1_0.client;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.net.ssl.SSLSocketFactory;
import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
import org.apache.qpid.amqp_1_0.transport.AMQPTransport;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
@@ -30,17 +39,6 @@ import org.apache.qpid.amqp_1_0.type.FrameBody; import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import javax.net.ssl.SSLSocket;
-import javax.net.ssl.SSLSocketFactory;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
public class Connection
{
private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
@@ -224,7 +222,6 @@ public class Connection }
- //ConnectionHandler.OutputHandler outputHandler = new ConnectionHandler.OutputHandler(outputStream, out, _conn.getDescribedTypeRegistry());
ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn);
Thread outputThread = new Thread(outputHandler);
outputThread.setDaemon(true);
@@ -236,8 +233,6 @@ public class Connection final ConnectionHandler handler = new ConnectionHandler(_conn);
final InputStream inputStream = s.getInputStream();
- //final AMQPTransport transport = new AMQPTransport(new AMQPFrameTransport(_conn));
-
Thread inputThread = new Thread(new Runnable()
{
@@ -246,7 +241,6 @@ public class Connection try
{
doRead(handler, inputStream);
-// doRead(transport, inputStream);
}
finally
{
@@ -268,85 +262,6 @@ public class Connection inputThread.setDaemon(true);
inputThread.start();
-/*
- Thread outputThread = new Thread(new Runnable()
- {
-
- private int _lastWrite;
-
- public void run()
- {
- try
- {
-// doRead(handler, inputStream);
- final Object lock = new Object();
- transport.setOutputStateChangeListener(new StateChangeListener()
- {
-
- public void onStateChange(final boolean active)
- {
- synchronized (lock)
- {
- lock.notifyAll();
- }
- }
- });
-
- synchronized(lock)
- {
- while(transport.isOpenForOutput())
- {
- _lastWrite = 0;
- transport.getNextBytes(new BytesProcessor()
- {
-
- public void processBytes(final ByteBuffer buf)
- {
- _lastWrite = buf.remaining();
- try
- {
- outputStream.write(buf.array(),
- buf.arrayOffset() + buf.position(),
- buf.limit() - buf.position());
- }
- catch (IOException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- });
- if(_lastWrite == 0 && transport.isOpenForOutput())
- {
- try
- {
- lock.wait(1000);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
- }
- }
- finally
- {
- if(_conn.closedForInput() && _conn.closedForOutput())
- {
- try
- {
- s.close();
- }
- catch (IOException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
- }
- });
-*/
-
_conn.open();
}
@@ -394,7 +309,7 @@ public class Connection }
catch (IOException e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ e.printStackTrace();
}
}
@@ -419,7 +334,7 @@ public class Connection {
int read;
boolean done = false;
- while(!done && (read = inputStream.read(buf)) != -1)
+ while(!handler.isDone() && (read = inputStream.read(buf)) != -1)
{
ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read);
Binary b = new Binary(buf,0,read);
@@ -428,12 +343,6 @@ public class Connection {
RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString());
}
- /*System.err.println(b);
- System.err.println("XXX: " + bbuf.hasRemaining() + "; " + handler.isDone());
- if(handler.isDone())
- {
- System.err.println(handler.getClass().getName() + "IS DONE!");
- } */
while(bbuf.hasRemaining() && !handler.isDone())
{
handler.parse(bbuf);
@@ -444,7 +353,7 @@ public class Connection }
catch (IOException e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ e.printStackTrace();
}
}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java deleted file mode 100644 index b58ce6bfe5..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java +++ /dev/null @@ -1,407 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.UnsignedLong;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.amqp_1_0.type.messaging.Header;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class Demo extends Util
-{
- private static final String USAGE_STRING = "demo [options] <vendor> [<content> ...]\n\nOptions:";
- private static final String OPCODE = "opcode";
- private static final String ACTION = "action";
- private static final String MESSAGE_ID = "message-id";
- private static final String VENDOR = "vendor";
- private static final String LOG = "log";
- private static final String RECEIVED = "received";
- private static final String TEST = "test";
- private static final String APACHE = "apache";
- private static final String SENT = "sent";
- private static final String LINK_REF = "link-ref";
- private static final String HOST = "host";
- private static final String PORT = "port";
- private static final String SASL_USER = "sasl-user";
- private static final String SASL_PASSWORD = "sasl-password";
- private static final String ROLE = "role";
- private static final String ADDRESS = "address";
- private static final String SENDER = "sender";
- private static final String SEND_MESSAGE = "send-message";
- private static final String ANNOUNCE = "announce";
- private static final String MESSAGE_VENDOR = "message-vendor";
- private static final String CREATE_LINK = "create-link";
-
- public static void main(String[] args)
- {
- new Demo(args).run();
- }
-
- public Demo(String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasWindowSizeOption()
- {
- return false;
- }
-
- public void run()
- {
-
- try
- {
-
- final String vendor = getArgs()[0];
- final String queue = "control";
-
- String message = "";
-
- Connection conn = newConnection();
- Session session = conn.createSession();
-
-
- Receiver responseReceiver;
-
- responseReceiver = session.createTemporaryQueueReceiver();
-
-
-
-
- responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
-
-
-
- Sender s = session.createSender(queue, getWindowSize(), getMode());
-
-
- Properties properties = new Properties();
- properties.setMessageId(java.util.UUID.randomUUID());
- properties.setReplyTo(responseReceiver.getAddress());
-
- HashMap appPropMap = new HashMap();
- ApplicationProperties appProperties = new ApplicationProperties(appPropMap);
-
- appPropMap.put(OPCODE, ANNOUNCE);
- appPropMap.put(VENDOR, vendor);
- appPropMap.put(ADDRESS,responseReceiver.getAddress());
-
- AmqpValue amqpValue = new AmqpValue(message);
- Section[] sections = { properties, appProperties, amqpValue};
- final Message message1 = new Message(Arrays.asList(sections));
-
- s.send(message1);
-
- Map<Object, Sender> sendingLinks = new HashMap<Object, Sender>();
- Map<Object, Receiver> receivingLinks = new HashMap<Object, Receiver>();
-
-
- boolean done = false;
-
- while(!done)
- {
- boolean wait = true;
- Message m = responseReceiver.receive(false);
- if(m != null)
- {
- List<Section> payload = m.getPayload();
- wait = false;
- ApplicationProperties props = m.getApplicationProperties();
- Map map = props.getValue();
- String op = (String) map.get(OPCODE);
- if("reset".equals(op))
- {
- for(Sender sender : sendingLinks.values())
- {
- try
- {
- sender.close();
- Session session1 = sender.getSession();
- session1.close();
- session1.getConnection().close();
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- for(Receiver receiver : receivingLinks.values())
- {
- try
- {
- receiver.close();
- receiver.getSession().close();
- receiver.getSession().getConnection().close();
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- sendingLinks.clear();
- receivingLinks.clear();
- }
- else if(CREATE_LINK.equals(op))
- {
- Object linkRef = map.get(LINK_REF);
- String host = (String) map.get(HOST);
- Object o = map.get(PORT);
- int port = Integer.parseInt(String.valueOf(o));
- String user = (String) map.get(SASL_USER);
- String password = (String) map.get(SASL_PASSWORD);
- String role = (String) map.get(ROLE);
- String address = (String) map.get(ADDRESS);
- System.err.println("Host: " + host + "\tPort: " + port + "\t user: " + user +"\t password: " + password);
- try{
-
-
- Connection conn2 = new Connection(host, port, user, password, host);
- Session session2 = conn2.createSession();
- if(sendingLinks.containsKey(linkRef))
- {
- try
- {
- sendingLinks.remove(linkRef).close();
- }
- catch (Exception e)
- {
-
- }
- }
- if(receivingLinks.containsKey(linkRef))
- {
- try
- {
- receivingLinks.remove(linkRef).close();
- }
- catch (Exception e)
- {
-
- }
- }
- if(SENDER.equals(role))
- {
-
- System.err.println("%%% Creating sender (" + linkRef + ")");
- Sender sender = session2.createSender(address);
- sendingLinks.put(linkRef, sender);
- }
- else
- {
-
- System.err.println("%%% Creating receiver (" + linkRef + ")");
- Receiver receiver2 = session2.createReceiver(address);
- receiver2.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
-
- receivingLinks.put(linkRef, receiver2);
- }
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- else if(SEND_MESSAGE.equals(op))
- {
- Sender sender = sendingLinks.get(map.get(LINK_REF));
- Properties m2props = new Properties();
- Object messageId = map.get(MESSAGE_ID);
- m2props.setMessageId(messageId);
- Map m2propmap = new HashMap();
- m2propmap.put(OPCODE, TEST);
- m2propmap.put(VENDOR, vendor);
- ApplicationProperties m2appProps = new ApplicationProperties(m2propmap);
- Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId)));
- sender.send(m2);
-
- Map m3propmap = new HashMap();
- m3propmap.put(OPCODE, LOG);
- m3propmap.put(ACTION, SENT);
- m3propmap.put(MESSAGE_ID, messageId);
- m3propmap.put(VENDOR, vendor);
- m3propmap.put(MESSAGE_VENDOR, vendor);
-
-
- Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
- new AmqpValue("AMQP-"+messageId)));
- s.send(m3);
-
- }
-
- responseReceiver.acknowledge(m);
- }
- else
- {
- for(Map.Entry<Object, Receiver> entry : receivingLinks.entrySet())
- {
- m = entry.getValue().receive(false);
- if(m != null)
- {
- wait = false;
-
- System.err.println("%%% Received message from " + entry.getKey());
-
- Properties mp = m.getProperties();
- ApplicationProperties ap = m.getApplicationProperties();
-
- Map m3propmap = new HashMap();
- m3propmap.put(OPCODE, LOG);
- m3propmap.put(ACTION, RECEIVED);
- m3propmap.put(MESSAGE_ID, mp.getMessageId());
- m3propmap.put(VENDOR, vendor);
- m3propmap.put(MESSAGE_VENDOR, ap.getValue().get(VENDOR));
-
- Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
- new AmqpValue("AMQP-"+mp.getMessageId())));
- s.send(m3);
-
- entry.getValue().acknowledge(m);
- }
-
- }
- }
-
- if(wait)
- {
- try
- {
- Thread.sleep(500l);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //TODO.
- }
- }
-
- }
-
-
-
-
-
-
-
-
-
- s.close();
- session.close();
- conn.close();
-
- }
- catch (Connection.ConnectionException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (AmqpErrorException e)
- {
- e.printStackTrace(); //TODO.
- }
-
- }
-
- protected boolean hasSingleLinkPerConnectionMode()
- {
- return false;
- }
-
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
- }
-
-}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java deleted file mode 100644 index 65d27b21f8..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.commons.cli.Options; - -public class Dump extends Util -{ - private static final String USAGE_STRING = "dump [options] <address>\n\nOptions:"; - - - protected Dump(String[] args) - { - super(args); - } - - public static void main(String[] args) - { - new Dump(args).run(); - } - - @Override - protected boolean hasLinkDurableOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasLinkNameOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasSizeOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasBlockOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasStdInOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasTxnOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasModeOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasCountOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected void printUsage(Options options) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected void run() - { - final String queue = getArgs()[0]; - - try - { - Connection conn = newConnection(); - - Session session = conn.createSession(); - - - Sender s = session.createSender(queue, 10); - - Message message = new Message("dump me"); - message.setDeliveryTag(new Binary("dump".getBytes())); - - s.send(message); - - s.close(); - session.close(); - conn.close(); - - } catch (Connection.ConnectionException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java deleted file mode 100644 index 4d98655ad2..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java +++ /dev/null @@ -1,347 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.*; -import org.apache.qpid.amqp_1_0.type.messaging.*; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; - -import java.io.*; -import java.nio.ByteBuffer; -import java.util.*; - -public class Filereceiver extends Util -{ - private static final String USAGE_STRING = "filereceiver [options] <address> <directory>\n\nOptions:"; - - protected Filereceiver(String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return true; - } - - @Override - protected boolean hasLinkNameOption() - { - return true; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return false; - } - - @Override - protected boolean hasBlockOption() - { - return true; - } - - @Override - protected boolean hasStdInOption() - { - return false; - } - - @Override - protected boolean hasTxnOption() - { - return false; - } - - @Override - protected boolean hasModeOption() - { - return false; - } - - @Override - protected boolean hasCountOption() - { - return false; - } - - @Override - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - - } - - @Override - protected void run() - { - final String queue = getArgs()[0]; - final String directoryName = getArgs()[1]; - - try - { - Connection conn = newConnection(); - - Session session = conn.createSession(); - - final File directory = new File(directoryName); - if(directory.isDirectory() && directory.canWrite()) - { - File tmpDirectory = new File(directoryName, ".tmp"); - if(!tmpDirectory.exists()) - { - tmpDirectory.mkdir(); - } - - String[] unsettledFiles = tmpDirectory.list(); - - Map<Binary, Outcome> unsettled = new HashMap<Binary, Outcome>(); - final Map<Binary, String> unsettledFileNames = new HashMap<Binary, String>(); - - Accepted accepted = new Accepted(); - - for(String fileName : unsettledFiles) - { - File theFile = new File(tmpDirectory, fileName); - if(theFile.isFile()) - { - if(fileName.startsWith("~") && fileName.endsWith("~")) - { - theFile.delete(); - } - else - { - int splitPoint = fileName.indexOf("."); - String deliveryTagStr = fileName.substring(0,splitPoint); - String actualFileName = fileName.substring(splitPoint+1); - - byte[] bytes = new byte[deliveryTagStr.length()/2]; - - - for(int i = 0; i < bytes.length; i++) - { - char c = deliveryTagStr.charAt(2*i); - char d = deliveryTagStr.charAt(1+(2*i)); - - bytes[i] = (byte) (((c <= '9' ? c - '0' : c - 'W') << 4) - | (d <= '9' ? d - '0' : d - 'W')); - - } - Binary deliveryTag = new Binary(bytes); - unsettled.put(deliveryTag, accepted); - unsettledFileNames.put(deliveryTag, fileName); - } - } - - } - - Receiver r = session.createReceiver(queue, AcknowledgeMode.EO, getLinkName(), isDurableLink(), - unsettled); - - Map<Binary, Outcome> remoteUnsettled = r.getRemoteUnsettled(); - - for(Map.Entry<Binary, String> entry : unsettledFileNames.entrySet()) - { - if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey())) - { - - File tmpFile = new File(tmpDirectory, entry.getValue()); - final File dest = new File(directory, - entry.getValue().substring(entry.getValue().indexOf(".") + 1)); - if(dest.exists()) - { - System.err.println("Duplicate detected - filename " + dest.getName()); - } - - tmpFile.renameTo(dest); - } - } - - - int credit = 10; - - r.setCredit(UnsignedInteger.valueOf(credit), true); - - - int received = 0; - Message m = null; - do - { - m = isBlock() && received == 0 ? r.receive() : r.receive(10000); - if(m != null) - { - if(m.isResume() && unsettled.containsKey(m.getDeliveryTag())) - { - final String tmpFileName = unsettledFileNames.get(m.getDeliveryTag()); - final File unsettledFile = new File(tmpDirectory, - tmpFileName); - r.acknowledge(m, new Receiver.SettledAction() - { - public void onSettled(final Binary deliveryTag) - { - int splitPoint = tmpFileName.indexOf("."); - - String fileName = tmpFileName.substring(splitPoint+1); - - final File dest = new File(directory, fileName); - if(dest.exists()) - { - System.err.println("Duplicate detected - filename " + dest.getName()); - } - unsettledFile.renameTo(dest); - unsettledFileNames.remove(deliveryTag); - } - }); - } - else - { - received++; - List<Section> sections = m.getPayload(); - Binary deliveryTag = m.getDeliveryTag(); - StringBuilder tagNameBuilder = new StringBuilder(); - - ByteBuffer dtbuf = deliveryTag.asByteBuffer(); - while(dtbuf.hasRemaining()) - { - tagNameBuilder.append(String.format("%02x", dtbuf.get())); - } - - - ApplicationProperties properties = null; - List<Binary> data = new ArrayList<Binary>(); - int totalSize = 0; - for(Section section : sections) - { - if(section instanceof ApplicationProperties) - { - properties = (ApplicationProperties) section; - } - else if(section instanceof AmqpValue) - { - AmqpValue value = (AmqpValue) section; - if(value.getValue() instanceof Binary) - { - Binary binary = (Binary) value.getValue(); - data.add(binary); - totalSize += binary.getLength(); - - } - else - { - // TODO exception - } - } - else if(section instanceof Data) - { - Data value = (Data) section; - Binary binary = value.getValue(); - data.add(binary); - totalSize += binary.getLength(); - - } - } - if(properties != null) - { - final String fileName = (String) properties.getValue().get("filename"); - byte[] fileData = new byte[totalSize]; - ByteBuffer buf = ByteBuffer.wrap(fileData); - int offset = 0; - for(Binary bin : data) - { - buf.put(bin.asByteBuffer()); - } - File outputFile = new File(tmpDirectory, "~"+fileName+"~"); - if(outputFile.exists()) - { - outputFile.delete(); - } - FileOutputStream fos = new FileOutputStream(outputFile); - fos.write(fileData); - fos.flush(); - fos.close(); - - final File unsettledFile = new File(tmpDirectory, tagNameBuilder.toString() + "." + - fileName); - outputFile.renameTo(unsettledFile); - r.acknowledge(m, new Receiver.SettledAction() - { - public void onSettled(final Binary deliveryTag) - { - final File dest = new File(directory, fileName); - if(dest.exists()) - { - System.err.println("Duplicate detected - filename " + dest.getName()); - } - unsettledFile.renameTo(dest); - - } - }); - - } - } - } - } - while(m != null); - - - r.close(); - } - else - { - System.err.println("No such directory: " + directoryName); - } - session.close(); - conn.close(); - } - catch (Connection.ConnectionException e) - { - e.printStackTrace(); - } - catch (FileNotFoundException e) - { - e.printStackTrace(); //TODO. - } - catch (IOException e) - { - e.printStackTrace(); //TODO. - } - catch (AmqpErrorException e) - { - e.printStackTrace(); //TODO. - } - - } - - public static void main(String[] args) - { - new Filereceiver(args).run(); - } -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java deleted file mode 100644 index 46e6ba537f..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java +++ /dev/null @@ -1,296 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.messaging.*; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.*; - -public class Filesender extends Util -{ - private static final String USAGE_STRING = "filesender [options] <address> <directory>\n\nOptions:"; - - protected Filesender(String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return true; - } - - @Override - protected boolean hasLinkNameOption() - { - return true; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return false; - } - - @Override - protected boolean hasBlockOption() - { - return false; - } - - @Override - protected boolean hasStdInOption() - { - return false; - } - - @Override - protected boolean hasTxnOption() - { - return false; - } - - @Override - protected boolean hasModeOption() - { - return false; - } - - @Override - protected boolean hasCountOption() - { - return false; - } - - @Override - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - - } - - @Override - protected void run() - { - final String queue = getArgs()[0]; - final String directoryName = getArgs()[1]; - - try - { - MessageDigest md5 = MessageDigest.getInstance("MD5"); - Connection conn = newConnection(); - - Session session = conn.createSession(); - - File directory = new File(directoryName); - if(directory.isDirectory() && directory.canWrite()) - { - - File tmpDirectory = new File(directoryName, ".tmp"); - if(!tmpDirectory.exists()) - { - tmpDirectory.mkdir(); - } - - String[] unsettledFiles = tmpDirectory.list(); - - - - Map<Binary, Outcome> unsettled = new HashMap<Binary, Outcome>(); - Map<Binary, String> unsettledFileNames = new HashMap<Binary, String>(); - for(String fileName : unsettledFiles) - { - File aFile = new File(tmpDirectory, fileName); - if(aFile.canRead() && aFile.canWrite()) - { - Binary deliveryTag = new Binary(md5.digest(fileName.getBytes())); - unsettled.put(deliveryTag, null); - unsettledFileNames.put(deliveryTag, fileName); - } - } - - - Sender s = session.createSender(queue, 10, AcknowledgeMode.EO, getLinkName(), isDurableLink(), - unsettled); - - Map<Binary, DeliveryState> remoteUnsettled = s.getRemoteUnsettled(); - - for(Map.Entry<Binary, String> entry: unsettledFileNames.entrySet()) - { - if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey())) - { - (new File(tmpDirectory, entry.getValue())).renameTo(new File(directory, entry.getValue())); - } - } - - if(remoteUnsettled != null) - { - for(Map.Entry<Binary, DeliveryState> entry : remoteUnsettled.entrySet()) - { - if(entry.getValue() instanceof Accepted) - { - final String fileName = unsettledFileNames.get(entry.getKey()); - if(fileName != null) - { - - Message resumed = new Message(); - resumed.setDeliveryTag(entry.getKey()); - resumed.setDeliveryState(entry.getValue()); - resumed.setResume(Boolean.TRUE); - resumed.setSettled(Boolean.TRUE); - - - - final File unsettledFile = new File(tmpDirectory, fileName); - unsettledFile.delete(); - - s.send(resumed); - - } - - } - else if(entry.getValue() instanceof Received || entry.getValue() == null) - { - final File unsettledFile = new File(tmpDirectory, unsettledFileNames.get(entry.getKey())); - Message resumed = createMessageFromFile(md5, unsettledFileNames.get(entry.getKey()), unsettledFile); - resumed.setResume(Boolean.TRUE); - Sender.OutcomeAction action = new Sender.OutcomeAction() - { - public void onOutcome(Binary deliveryTag, Outcome outcome) - { - if(outcome instanceof Accepted) - { - unsettledFile.delete(); - } - } - }; - s.send(resumed, action); - - } - } - } - - - - String[] files = directory.list(); - - for(String fileName : files) - { - final File file = new File(directory, fileName); - - if(file.canRead() && file.canWrite() && !file.isDirectory()) - { - Message message = createMessageFromFile(md5, fileName, file); - - final File unsettledFile = new File(tmpDirectory, fileName); - - Sender.OutcomeAction action = new Sender.OutcomeAction() - { - public void onOutcome(Binary deliveryTag, Outcome outcome) - { - if(outcome instanceof Accepted) - { - unsettledFile.delete(); - } - } - }; - - file.renameTo(unsettledFile); - - s.send(message, action); - } - } - - s.close(); - } - else - { - System.err.println("No such directory: " + directory); - } - session.close(); - conn.close(); - } - catch (Connection.ConnectionException e) - { - e.printStackTrace(); - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); - } catch (FileNotFoundException e) - { - e.printStackTrace(); - } catch (IOException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (NoSuchAlgorithmException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - - } - - private Message createMessageFromFile(MessageDigest md5, String fileName, File file) throws IOException - { - FileInputStream fis = new FileInputStream(file); - byte[] data = new byte[(int) file.length()]; - - int read = fis.read(data); - - fis.close(); - - Section applicationProperties = new ApplicationProperties(Collections.singletonMap("filename", fileName)); - Section amqpValue = new Data(new Binary(data)); - Message message = new Message(Arrays.asList(applicationProperties, amqpValue)); - Binary deliveryTag = new Binary(md5.digest(fileName.getBytes())); - message.setDeliveryTag(deliveryTag); - md5.reset(); - return message; - } - - public static void main(String[] args) - { - new Filesender(args).run(); - } -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java deleted file mode 100644 index 07ae54b54f..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java +++ /dev/null @@ -1,77 +0,0 @@ -/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.codec.ValueHandler;
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-public class ReadBytes
-{
-
- public static void main(String[] args) throws IOException, AmqpErrorException
- {
-
- if(args.length == 0)
- {
- readBytes(System.in);
- }
- else
- {
- for(String fileName : args)
- {
- System.out.println("=========================== " + fileName + " ===========================");
- final FileInputStream fis = new FileInputStream(fileName);
- readBytes(fis);
- fis.close();
- }
- }
-
- }
-
- private static void readBytes(final InputStream inputStream) throws IOException, AmqpErrorException
- {
- byte[] bytes = new byte[4096];
-
- ValueHandler valueHandler = new ValueHandler(AMQPDescribedTypeRegistry.newInstance());
-
- int count;
-
- while((count = inputStream.read(bytes))!=-1)
- {
- ByteBuffer buf = ByteBuffer.wrap(bytes);
- buf.limit(count);
- while(buf.hasRemaining())
- {
-
- final Object value = valueHandler.parse(buf);
- System.out.print((value == null ? "" : value.getClass().getName() + ":") +value +"\n");
-
- }
- }
-
- }
-
-
-}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java deleted file mode 100644 index 0da9dc3fb7..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.commons.cli.*; -import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter; -import org.apache.qpid.amqp_1_0.type.messaging.Filter; -import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter; - -import java.util.Collections; - -public class Receive extends Util -{ - private static final String USAGE_STRING = "receive [options] <address> \n\nOptions:"; - private static final UnsignedLong UNSIGNED_LONG_ONE = UnsignedLong.valueOf(1L); - private UnsignedLong _lastCorrelationId; - - public static void main(String[] args) - { - new Receive(args).run(); - } - - - public Receive(final String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return true; - } - - @Override - protected boolean hasLinkNameOption() - { - return true; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return false; - } - - @Override - protected boolean hasBlockOption() - { - return true; - } - - @Override - protected boolean hasStdInOption() - { - return true; - } - - @Override - protected boolean hasTxnOption() - { - return true; - } - - @Override - protected boolean hasModeOption() - { - return true; - } - - @Override - protected boolean hasCountOption() - { - return true; - } - - @Override - protected boolean hasWindowSizeOption() - { - return true; - } - - @Override - protected boolean hasFilterOption() - { - return true; - } - - protected void run() - { - - try - { - final String queue = getArgs()[0]; - - String message = ""; - - Connection conn = newConnection(); - - - Session session = conn.createSession(); - - Filter filter = null; - if(getFilter() != null) - { - String[] filterParts = getFilter().split("=",2); - if("exact-subject".equals(filterParts[0])) - { - filter = new ExactSubjectFilter(filterParts[1]); - } - else if("matching-subject".equals(filterParts[0])) - { - filter = new MatchingSubjectFilter(filterParts[1]); - } - else - { - System.err.println("Unknown filter type: " + filterParts[0]); - } - } - - Receiver r = - filter == null - ? session.createReceiver(queue, getMode(), getLinkName(), isDurableLink()) - : session.createReceiver(queue, getMode(), getLinkName(), isDurableLink(), Collections.singletonMap(Symbol.valueOf("filter"), filter), null); - Transaction txn = null; - - int credit = 0; - int receivedCount = 0; - - if(!useStdIn()) - { - if(getArgs().length <= 2) - { - - Transaction txn2 = null; - if(useTran()) - { - txn = session.createSessionLocalTransaction(); - txn2 = session.createSessionLocalTransaction(); - } - - for(int i = 0; i < getCount(); i++) - { - - if(credit == 0) - { - if(getCount() - i <= getWindowSize()) - { - credit = getCount() - i; - - } - else - { - credit = getWindowSize(); - - } - - { - r.setCredit(UnsignedInteger.valueOf(credit), false); - } - if(!isBlock()) - r.drain(); - } - - Message m = isBlock() ? r.receive() : r.receive(1000L); - credit--; - if(m==null) - { - break; - } - - - - r.acknowledge(m.getDeliveryTag(),txn); - - receivedCount++; - - System.out.println("Received Message : " + m.getPayload()); - } - - if(useTran()) - { - txn.commit(); - } - } - else - { - // TODO - } - } - else - { - // TODO - } - r.close(); - session.close(); - conn.close(); - System.out.println("Total Messages Received: " + receivedCount); - } - catch (Connection.ConnectionException e) - { - e.printStackTrace(); //TODO. - } - catch (AmqpErrorException e) - { - e.printStackTrace(); //TODO. - } - - } - - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - } - -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java index ad390fd498..8b792db1f1 100644 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java @@ -241,7 +241,7 @@ public class Receiver implements DeliveryStateHandler }
if(hasMore)
{
- xfr = receiveFromPrefetch(0L);
+ xfr = receiveFromPrefetch(-1l);
if(xfr== null)
{
// TODO - this is wrong!!!!
@@ -503,6 +503,37 @@ public class Receiver implements DeliveryStateHandler _endpoint.drain();
}
+ /**
+ * Waits for the receiver to drain or a message to be available to be received.
+ * @return true if the receiver has been drained.
+ */
+ public boolean drainWait()
+ {
+ final Object lock = _endpoint.getLock();
+ synchronized(lock)
+ {
+ try
+ {
+ while( _prefetchQueue.peek()==null && !_endpoint.isDrained() && !_endpoint.isDetached() )
+ {
+ lock.wait();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ return _prefetchQueue.peek()==null && _endpoint.isDrained();
+ }
+
+ /**
+ * Clears the receiver drain so that message delivery can resume.
+ */
+ public void clearDrain()
+ {
+ _endpoint.clearDrain();
+ }
+
public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn)
{
_endpoint.setLinkCredit(credit);
@@ -558,4 +589,4 @@ public class Receiver implements DeliveryStateHandler void messageArrived(Receiver receiver);
}
-}
\ No newline at end of file +}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java deleted file mode 100644 index 6e1d15376c..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java +++ /dev/null @@ -1,249 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.UnsignedLong;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.Header;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-import org.apache.commons.cli.*;
-
-import java.util.Arrays;
-
-public class Request extends Util
-{
- private static final String USAGE_STRING = "request [options] <address> [<content> ...]\n\nOptions:";
-
- public static void main(String[] args)
- {
- new Request(args).run();
- }
-
- public Request(String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasWindowSizeOption()
- {
- return true;
- }
-
- public void run()
- {
-
- try
- {
-
-
- final String queue = getArgs()[0];
-
- String message = "";
-
- Connection conn = newConnection();
- Session session = conn.createSession();
-
- Connection conn2;
- Session session2;
- Receiver responseReceiver;
-
- if(isUseMultipleConnections())
- {
- conn2 = newConnection();
- session2 = conn2.createSession();
- responseReceiver = session2.createTemporaryQueueReceiver();
- }
- else
- {
- conn2 = null;
- session2 = null;
- responseReceiver = session.createTemporaryQueueReceiver();
- }
-
-
-
-
- responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
-
-
-
- Sender s = session.createSender(queue, getWindowSize(), getMode());
-
- Transaction txn = null;
-
- if(useTran())
- {
- txn = session.createSessionLocalTransaction();
- }
-
- int received = 0;
-
- if(getArgs().length >= 2)
- {
- message = getArgs()[1];
- if(message.length() < getMessageSize())
- {
- StringBuilder builder = new StringBuilder(getMessageSize());
- builder.append(message);
- for(int x = message.length(); x < getMessageSize(); x++)
- {
- builder.append('.');
- }
- message = builder.toString();
- }
-
- for(int i = 0; i < getCount(); i++)
- {
- Properties properties = new Properties();
- properties.setMessageId(UnsignedLong.valueOf(i));
- properties.setReplyTo(responseReceiver.getAddress());
-
- AmqpValue amqpValue = new AmqpValue(message);
- Section[] sections = { new Header() , properties, amqpValue};
- final Message message1 = new Message(Arrays.asList(sections));
-
- s.send(message1, txn);
-
- Message responseMessage = responseReceiver.receive(false);
- if(responseMessage != null)
- {
- responseReceiver.acknowledge(responseMessage.getDeliveryTag(),txn);
- received++;
- }
- }
- }
-
- if(txn != null)
- {
- txn.commit();
- }
-
-
- while(received < getCount())
- {
- Message responseMessage = responseReceiver.receive();
- responseReceiver.acknowledge(responseMessage.getDeliveryTag());
- received++;
- }
-
-
-
-
- s.close();
- session.close();
- conn.close();
-
- if(session2 != null)
- {
- session2.close();
- conn2.close();
- }
- }
- catch (Connection.ConnectionException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (AmqpErrorException e)
- {
- e.printStackTrace(); //TODO.
- }
-
- }
-
- protected boolean hasSingleLinkPerConnectionMode()
- {
- return true;
- }
-
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
- }
-
-}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java deleted file mode 100644 index 8d9de4893f..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java +++ /dev/null @@ -1,347 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import org.apache.commons.cli.*; - -import java.util.*; - -public class Respond extends Util -{ - private static final String USAGE_STRING = "respond [options] <address>\n\nOptions:"; - private Connection _conn; - private Session _session; - private Receiver _receiver; - private Transaction _txn; - private Map<String,Sender> _senders; - private UnsignedLong _responseMsgId = UnsignedLong.ZERO; - private Connection _conn2; - private Session _session2; - - public Respond(final String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return false; - } - - @Override - protected boolean hasLinkNameOption() - { - return false; - } - - @Override - protected boolean hasResponseQueueOption() - { - return true; - } - - @Override - protected boolean hasSizeOption() - { - return false; - } - - @Override - protected boolean hasBlockOption() - { - return true; - } - - @Override - protected boolean hasStdInOption() - { - return false; - } - - @Override - protected boolean hasTxnOption() - { - return true; - } - - @Override - protected boolean hasModeOption() - { - return true; - } - - @Override - protected boolean hasCountOption() - { - return true; - } - - @Override - protected boolean hasSingleLinkPerConnectionMode() - { - return true; - } - - - @Override - protected boolean hasWindowSizeOption() - { - return true; - } - - public static void main(String[] args) - { - new Respond(args).run(); - } - - public void run() - { - try - { - - _senders = new HashMap<String, Sender>(); - - final String queue = getArgs()[0]; - - String message = ""; - - _conn = newConnection(); - - - - if(isUseMultipleConnections()) - { - _conn2 = newConnection(); - _session2 = _conn2.createSession(); - } - - - _session = _conn.createSession(); - - - _receiver = _session.createReceiver(queue, getMode()); - _txn = null; - - int credit = 0; - int receivedCount = 0; - _responseMsgId = UnsignedLong.ZERO; - - Random random = null; - int batch = 0; - List<Message> txnMessages = null; - if(useTran()) - { - if(getRollbackRatio() != 0) - { - random = new Random(); - } - batch = getBatchSize(); - _txn = _session.createSessionLocalTransaction(); - txnMessages = new ArrayList<Message>(batch); - } - - - for(int i = 0; receivedCount < getCount(); i++) - { - - if(credit == 0) - { - if(getCount() - i <= getWindowSize()) - { - credit = getCount() - i; - - } - else - { - credit = getWindowSize(); - - } - - _receiver.setCredit(UnsignedInteger.valueOf(credit), false); - - if(!isBlock()) - _receiver.drain(); - } - - Message m = isBlock() ? (receivedCount == 0 ? _receiver.receive() : _receiver.receive(10000L)) : _receiver.receive(1000L); - credit--; - if(m==null) - { - if(useTran() && batch != getBatchSize()) - { - _txn.commit(); - } - break; - } - - System.out.println("Received Message: " + m.getPayload()); - - respond(m); - - - - if(useTran()) - { - - txnMessages.add(m); - - if(--batch == 0) - { - - if(getRollbackRatio() == 0 || random.nextDouble() >= getRollbackRatio()) - { - _txn.commit(); - txnMessages.clear(); - receivedCount += getBatchSize(); - } - else - { - System.out.println("Random Rollback"); - _txn.rollback(); - double result; - do - { - _txn = _session.createSessionLocalTransaction(); - - for(Message msg : txnMessages) - { - respond(msg); - } - - result = random.nextDouble(); - if(result<getRollbackRatio()) - { - _txn.rollback(); - } - else - { - _txn.commit(); - txnMessages.clear(); - receivedCount += getBatchSize(); - } - } - while(result < getRollbackRatio()); - } - _txn = _session.createSessionLocalTransaction(); - - batch = getBatchSize(); - } - } - else - { - receivedCount++; - } - - } - - - for(Sender s : _senders.values()) - { - s.close(); - } - - _receiver.close(); - _session.close(); - _conn.close(); - System.out.println("Received: " + receivedCount); - } - catch (Connection.ConnectionException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); //TODO. - } - catch (AmqpErrorException e) - { - e.printStackTrace(); //TODO. - } - } - - private void respond(Message m) throws Sender.SenderCreationException - { - List<Section> sections = m.getPayload(); - String replyTo = null; - Object correlationId = null; - for(Section section : sections) - { - if(section instanceof Properties) - { - replyTo = getResponseQueue() == null ? ((Properties)section).getReplyTo() : getResponseQueue(); - correlationId = ((Properties) section).getMessageId(); - break; - } - } - - if(replyTo != null) - { - Sender s = _senders.get(replyTo); - if(s == null) - { - s = (isUseMultipleConnections() ? _session2 : _session).createSender(replyTo,getWindowSize()); - _senders.put(replyTo, s); - } - - List<Section> replySections = new ArrayList<Section>(sections); - - ListIterator<Section> sectionIterator = replySections.listIterator(); - - while(sectionIterator.hasNext()) - { - Section section = sectionIterator.next(); - if(section instanceof Properties) - { - Properties newProps = new Properties(); - newProps.setTo(replyTo); - newProps.setCorrelationId(correlationId); - newProps.setMessageId(_responseMsgId); - _responseMsgId = _responseMsgId.add(UnsignedLong.ONE); - sectionIterator.set(newProps); - } - } - - Message replyMessage = new Message(replySections); - System.out.println("Sent Message: " + replySections); - s.send(replyMessage, _txn); - - } - _receiver.acknowledge(m.getDeliveryTag(), _txn); - } - - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - } - -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java deleted file mode 100644 index 6f6575e083..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java +++ /dev/null @@ -1,244 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.LineNumberReader;
-import java.util.Arrays;
-
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.UnsignedLong;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-import org.apache.commons.cli.*;
-
-public class Send extends Util
-{
- private static final String USAGE_STRING = "send [options] <address> [<content> ...]\n\nOptions:";
- private static final char[] HEX = {'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'};
-
-
- public static void main(String[] args) throws Sender.SenderCreationException, Sender.SenderClosingException, Connection.ConnectionException
- {
- new Send(args).run();
- }
-
-
- public Send(final String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasWindowSizeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasSubjectOption()
- {
- return true;
- }
-
- public void run()
- {
-
- final String queue = getArgs()[0];
-
- String message = "";
-
- try
- {
- Connection conn = newConnection();
-
- Session session = conn.createSession();
-
-
- Sender s = session.createSender(queue, getWindowSize(), getMode(), getLinkName());
-
- Transaction txn = null;
-
- if(useTran())
- {
- txn = session.createSessionLocalTransaction();
- }
-
- if(!useStdIn())
- {
- if(getArgs().length <= 2)
- {
- if(getArgs().length == 2)
- {
- message = getArgs()[1];
- }
- for(int i = 0; i < getCount(); i++)
- {
-
- Properties properties = new Properties();
- properties.setMessageId(UnsignedLong.valueOf(i));
- if(getSubject() != null)
- {
- properties.setSubject(getSubject());
- }
- Section bodySection;
- byte[] bytes = (message + " " + i).getBytes();
- if(bytes.length < getMessageSize())
- {
- byte[] origBytes = bytes;
- bytes = new byte[getMessageSize()];
- System.arraycopy(origBytes,0,bytes,0,origBytes.length);
- for(int x = origBytes.length; x < bytes.length; x++)
- {
- bytes[x] = (byte) '.';
- }
- bodySection = new Data(new Binary(bytes));
- }
- else
- {
- bodySection = new AmqpValue(message + " " + i);
- }
-
- Section[] sections = {properties, bodySection};
- final Message message1 = new Message(Arrays.asList(sections));
-
- s.send(message1, txn);
- }
- }
- else
- {
- for(int i = 1; i < getArgs().length; i++)
- {
- s.send(new Message(getArgs()[i]), txn);
- }
-
- }
- }
- else
- {
- LineNumberReader buf = new LineNumberReader(new InputStreamReader(System.in));
-
-
- try
- {
- while((message = buf.readLine()) != null)
- {
- s.send(new Message(message), txn);
- }
- }
- catch (IOException e)
- {
- // TODO
- e.printStackTrace();
- }
- }
-
- if(txn != null)
- {
- txn.commit();
- }
-
- s.close();
-
- session.close();
- conn.close();
- }
- catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Connection.ConnectionException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //TODO.
- }
-
-
- }
-
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
- }
-
-}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SendBytes.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SendBytes.java deleted file mode 100644 index 6f97ecd810..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SendBytes.java +++ /dev/null @@ -1,331 +0,0 @@ -/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.codec.FrameWriter;
-import org.apache.qpid.amqp_1_0.codec.ValueWriter;
-import org.apache.qpid.amqp_1_0.framing.AMQFrame;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.FrameBody;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.UnsignedByte;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.UnsignedLong;
-import org.apache.qpid.amqp_1_0.type.UnsignedShort;
-import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.amqp_1_0.type.messaging.Footer;
-import org.apache.qpid.amqp_1_0.type.messaging.Header;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-import org.apache.qpid.amqp_1_0.type.transport.Flow;
-
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-
-public class SendBytes
-{
-
- public static void main(String[] args) throws
- Sender.SenderCreationException,
- Sender.SenderClosingException,
- Connection.ConnectionException,
- IOException, ParseException
- {
- Transfer xfr = new Transfer();
- Flow fs = new Flow();
- fs.setIncomingWindow(UnsignedInteger.valueOf(1024));
- fs.setDeliveryCount(UnsignedInteger.valueOf(2));
- fs.setLinkCredit(UnsignedInteger.valueOf(18));
- fs.setAvailable(UnsignedInteger.valueOf(0));
- fs.setDrain(false);
-
- xfr.setHandle(UnsignedInteger.valueOf(0));
- xfr.setDeliveryTag(new Binary("\"queue\"<-6ec024a7-d98e-4196-9348-15f6026c32ca:0".getBytes()));
- //xfr.setDeliveryTag(new Binary(new byte[] {0}));
- xfr.setDeliveryId(UnsignedInteger.valueOf(0));
- xfr.setSettled(true);
-
-
- Header h = new Header();
- Properties p = new Properties();
- p.setTo("queue");
- //p.setMessageId(new Binary(UUID.randomUUID().toString().getBytes()));
-
- Footer f = new Footer(Collections.EMPTY_MAP);
-
- Section[] sections = new Section[] { h,p,f};
- //Section[] sections = new Section[] { b };
- //Section[] sections = { h,p, b};
-/*
- Fragment[] fragments = new Fragment[5];
-
- final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance().registerTransportLayer().registerMessagingLayer();
-
- SectionEncoderImpl encoder = new SectionEncoderImpl(typeRegistry);
-
- int num = 0;
- int i = 0;
- for(Section s : sections)
- {
- Fragment frag = new Fragment();
-
- frag.setPayload(s.encode(encoder));
- frag.setFirst(true);
- frag.setLast(true);
- frag.setSectionCode(s.getSectionCode());
- frag.setSectionNumber(UnsignedInteger.valueOf(num++));
- frag.setSectionOffset(UnsignedLong.valueOf(0L));
- fragments[i++] =frag;
- }
-
- xfr.setFragments(fragments);
-*/
-
- encodeTypes("xfr",xfr);
-
- final byte[] result;
- final Object input = xfr;
-/*
- result = encode(1024, input);
-
- boolean ok = true;
-
- for(int j = 10; ok && j < 400; j++)
- {
-
- byte[] result2 = encode(j,input);
-
- for(int i = 0; i <400; i++)
- {
- if(result[i] != result2[i])
- {
- System.out.println("result differs at " + i + " Splitting at " + j+ " [" + result[i] + " - " + result2[i] + "]");
- //break;
- //ok = false;
-
- }
- }
- }*/
- //System.out.println(Arrays.equals(result, result2));
-
- //doEncodes();
- /*OutputStream out = System.out;
- if(args.length > 0)
- {
- out = new FileOutputStream(args[0]);
- }
-
- Transfer xfr = new Transfer();
- fs.setSessionCredit(UnsignedInteger.valueOf(1024));
- fs.setTransferCount(UnsignedInteger.valueOf(2));
- fs.setLinkCredit(UnsignedInteger.valueOf(18));
- fs.setAvailable(UnsignedInteger.valueOf(0));
- fs.setDrain(false);
-
- xfr.setHandle(UnsignedInteger.valueOf(0));
- //xfr.setDeliveryTag(new Binary("\"queue\"<-6ec024a7-d98e-4196-9348-15f6026c32ca:0".getBytes()));
- xfr.setDeliveryTag(new Binary(new byte[] {0}));
- xfr.setTransferId(UnsignedInteger.valueOf(0));
- xfr.setSettled(true);
- xfr.setFlowState(fs);
-
- Header h = new Header();
- h.setTransmitTime(new Date(System.currentTimeMillis()));
- Properties p = new Properties();
- p.setTo(new Address("queue"));
- //p.setMessageId(new Binary(UUID.randomUUID().toString().getBytes()));
- AmqpMapSection m = new AmqpMapSection();
- DataSection b = new DataSection("Hello World!".getBytes());
-
- Footer f = new Footer();
-
- Section[] sections = new Section[] { h,p,m,b,f};
- //Section[] sections = new Section[] { b };
- //Section[] sections = { h,p, b};
- List<Fragment> fragments = new ArrayList<Fragment>(5);
-
- final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance();
-
- SectionEncoderImpl encoder = new SectionEncoderImpl(typeRegistry);
-
- for(Section s : sections)
- {
- Fragment frag = new Fragment();
-
- frag.setPayload(s.encode(encoder));
- frag.setFirst(true);
- frag.setLast(true);
- frag.setFormatCode(s.getSectionCode());
- frag.setFragmentOffset(null);
- fragments.add(frag);
- }
-
- xfr.setFragments(fragments);
-
-
- Object[] objectsToWrite = new Object[] { xfr };
- ByteBuffer buf = ByteBuffer.allocate(4096);
-
-
- for(Object obj : objectsToWrite)
- {
- ValueWriter writer = typeRegistry.getValueWriter(obj);
-
- int count;
-
-
- do
- {
- count = writer.writeToBuffer(buf);
- out.write(buf.array(), buf.arrayOffset(), count);
- buf.clear();
- } while (!writer.isComplete());
-
- }
-
- out.flush();
- out.close();*/
-
- }
-
- public static void doEncodes() throws IOException, ParseException
- {
- encodeTypes("boolean", Boolean.TRUE, Boolean.FALSE);
- encodeTypes("ubyte", UnsignedByte.valueOf((byte)0), UnsignedByte.valueOf((byte)1 ),UnsignedByte.valueOf((byte)3), UnsignedByte.valueOf((byte)42), UnsignedByte.valueOf("255"));
- encodeTypes("byte", Byte.valueOf((byte)0), Byte.valueOf( (byte)1), Byte.valueOf((byte) 3), Byte.valueOf((byte) 42), Byte.valueOf((byte) 127), Byte.valueOf((byte) -1), Byte.valueOf((byte) -3), Byte.valueOf((byte) -42), Byte.valueOf( (byte)-128));
- encodeTypes("ushort", UnsignedShort.valueOf((short)0), UnsignedShort.valueOf((short)1), UnsignedShort.valueOf((short)3), UnsignedShort.valueOf((short)42), UnsignedShort.valueOf("65535"));
- encodeTypes("short", Short.valueOf((short)0), Short.valueOf((short)1), Short.valueOf((short)3), Short.valueOf((short)42), Short.valueOf((short)32767), Short.valueOf((short)-1), Short.valueOf((short)-3), Short.valueOf((short)-42), Short.valueOf((short)-32768));
- encodeTypes("uint",UnsignedInteger.valueOf(0), UnsignedInteger.valueOf(1), UnsignedInteger.valueOf(3), UnsignedInteger.valueOf(42), UnsignedInteger.valueOf("4294967295"));
- encodeTypes("int", 0, 1, 3, 42, 2147483647, -1, -3, -42, -2147483648);
- encodeTypes("ulong", UnsignedLong.valueOf(0), UnsignedLong.valueOf(1), UnsignedLong.valueOf(3), UnsignedLong.valueOf(42), UnsignedLong.valueOf("18446744073709551615"));
- encodeTypes("long", 0l, 1l, 3l, 42l, 9223372036854775807l, -1l, -3l, -42l, -9223372036854775808l);
- encodeTypes("float", 3.14159);
- encodeTypes("double", Double.valueOf(3.14159265359));
- encodeTypes("char", '?');
-
- SimpleDateFormat df = new SimpleDateFormat("HHa z MMM d yyyy");
-
- encodeTypes("timestamp", df.parse("9AM PST Dec 6 2010"), df.parse("9AM PST Dec 6 1910"));
- encodeTypes("uuid", UUID.fromString("f275ea5e-0c57-4ad7-b11a-b20c563d3b71"));
- encodeTypes("binary", new Binary( new byte[] {(byte)0xDE, (byte)0xAD, (byte)0xBE, (byte)0xEF}), new Binary(new byte[] { (byte)0xCA,(byte)0xFE, (byte)0xBA, (byte)0xBE}));
- encodeTypes("string", "The quick brown fox jumped over the lazy cow.");
- encodeTypes("symbol", Symbol.valueOf("connectathon"));
- encodeTypes("list", Arrays.asList(new Object[] {Long.valueOf(1), "two", Double.valueOf(3.14159265359), null, Boolean.FALSE}));
- Map map = new HashMap();
- map.put("one", Long.valueOf(1));
- map.put("two", Long.valueOf(2));
- map.put("pi", Double.valueOf(3.14159265359));
- map.put("list:", Arrays.asList(new Object[] {Long.valueOf(1), "two", Double.valueOf(3.14159265359), null, Boolean.FALSE}));
- map.put(null, Boolean.TRUE);
- encodeTypes("map", map);
- encodeTypes("null", null);
-
- }
-
- static void encodeTypes(String name, Object... vals ) throws IOException
- {
- FileOutputStream out = new FileOutputStream("/home/rob/"+name+".out");
- ByteBuffer buf = ByteBuffer.allocate(4096);
- final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance();
-
- if(vals != null)
- {
- for(Object obj : vals)
- {
- ValueWriter writer = typeRegistry.getValueWriter(obj);
-
- int count;
-
-
- do
- {
- count = writer.writeToBuffer(buf);
- out.write(buf.array(), buf.arrayOffset(), count);
- buf.clear();
- } while (!writer.isComplete());
-
- }
- }
- else
- {
- ValueWriter writer = typeRegistry.getValueWriter(null);
-
- int count;
-
-
- do
- {
- count = writer.writeToBuffer(buf);
- out.write(buf.array(), buf.arrayOffset(), count);
- buf.clear();
- } while (!writer.isComplete());
-
- }
- out.flush();
- out.close();
-
- }
-
- static byte[] encode(int size, Object... vals)
- {
- byte[] result = new byte[10000];
- int pos = 0;
-
- final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance();
- AMQFrame frame = AMQFrame.createAMQFrame((short) 0, (FrameBody) vals[0]);
- FrameWriter writer = new FrameWriter(typeRegistry);
- /*for(Object obj : vals)
- {
- final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance();
- ValueWriter writer = typeRegistry.getValueWriter(obj);
-*/
- int count;
-
- ByteBuffer buf = ByteBuffer.wrap(result, pos, size);
-
- do
- {
-
- writer.writeToBuffer(buf);
- pos = buf.position();
- buf = ByteBuffer.wrap(result, pos, size);
- if(!writer.isComplete())
- {
- count = 3;
- }
-
- } while (!writer.isComplete());
-/*
-
- }
-*/
-
- return result;
-
- }
-
-
-}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java deleted file mode 100644 index 6fe2a6d510..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java +++ /dev/null @@ -1,529 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.transport.Container; -import org.apache.commons.cli.*; - -import java.util.logging.*; - -public abstract class Util -{ - - private static final Logger FRAME_LOGGER = Logger.getLogger("FRM"); - private static final Logger RAW_LOGGER = Logger.getLogger("RAW"); - private String _host; - private String _username; - private String _password; - private int _port; - private int _count; - private boolean _useStdIn; - private boolean _useTran; - private String[] _args; - private AcknowledgeMode _mode; - private boolean _block; - private int _frameSize; - private int _messageSize; - private String _responseQueue; - private int _batchSize; - private double _rollbackRatio; - private String _linkName; - private String _containerName; - private boolean _durableLink; - private boolean _useMultipleConnections; - private int _windowSize = 100; - private String _subject; - private String _filter; - private String _remoteHost; - private boolean _useSSL; - - protected Util(String[] args) - { - CommandLineParser cmdLineParse = new PosixParser(); - - Options options = new Options(); - options.addOption("h","help",false,"show this help message and exit"); - options.addOption(OptionBuilder.withLongOpt("host") - .withDescription( "host to connect to (default 0.0.0.0)" ) - .hasArg(true) - .withArgName("HOST") - .create('H')); - options.addOption(OptionBuilder.withLongOpt("username") - .withDescription( "username to use for authentication" ) - .hasArg(true) - .withArgName("USERNAME") - .create('u')); - options.addOption(OptionBuilder.withLongOpt("password") - .withDescription( "password to use for authentication" ) - .hasArg(true) - .withArgName("PASSWORD") - .create('w')); - options.addOption(OptionBuilder.withLongOpt("port") - .withDescription( "port to connect to (default 5672)" ) - .hasArg(true) - .withArgName("PORT") - .create('p')); - options.addOption(OptionBuilder.withLongOpt("frame-size") - .withDescription( "specify the maximum frame size" ) - .hasArg(true) - .withArgName("FRAME_SIZE") - .create('f')); - options.addOption(OptionBuilder.withLongOpt("container-name") - .withDescription( "Container name" ) - .hasArg(true) - .withArgName("CONTAINER_NAME") - .create('C')); - - options.addOption(OptionBuilder.withLongOpt("ssl") - .withDescription("Use SSL") - .create('S')); - - options.addOption(OptionBuilder.withLongOpt("remote-hostname") - .withDescription( "hostname to supply in the open frame" ) - .hasArg(true) - .withArgName("HOST") - .create('O')); - - if(hasBlockOption()) - options.addOption(OptionBuilder.withLongOpt("block") - .withDescription("block until messages arrive") - .create('b')); - - if(hasCountOption()) - options.addOption(OptionBuilder.withLongOpt("count") - .withDescription( "number of messages to send (default 1)" ) - .hasArg(true) - .withArgName("COUNT") - .create('c')); - if(hasModeOption()) - options.addOption(OptionBuilder.withLongOpt("acknowledge-mode") - .withDescription( "acknowledgement mode: AMO|ALO|EO (At Least Once, At Most Once, Exactly Once" ) - .hasArg(true) - .withArgName("MODE") - .create('k')); - - if(hasSubjectOption()) - options.addOption(OptionBuilder.withLongOpt("subject") - .withDescription( "subject message property" ) - .hasArg(true) - .withArgName("SUBJECT") - .create('s')); - - - if(hasSingleLinkPerConnectionMode()) - options.addOption(OptionBuilder.withLongOpt("single-link-per-connection") - .withDescription("acknowledgement mode: AMO|ALO|EO (At Least Once, At Most Once, Exactly Once") - .hasArg(false) - .create('Z')); - - if(hasFilterOption()) - options.addOption(OptionBuilder.withLongOpt("filter") - .withDescription("filter, e.g. exact-subject=hello; matching-subject=%.a.#") - .hasArg(true) - .withArgName("<TYPE>=<VALUE>") - .create('F')); - - - if(hasTxnOption()) - { - options.addOption("x","txn",false,"use transactions"); - options.addOption(OptionBuilder.withLongOpt("batch-size") - .withDescription( "transaction batch size (default: 1)" ) - .hasArg(true) - .withArgName("BATCH-SIZE") - .create('B')); - options.addOption(OptionBuilder.withLongOpt("rollback-ratio") - .withDescription( "rollback ratio - must be between 0 and 1 (default: 0)" ) - .hasArg(true) - .withArgName("RATIO") - .create('R')); - } - - if(hasLinkDurableOption()) - { - options.addOption("d","durable-link",false,"use a durable link"); - } - - if(hasStdInOption()) - options.addOption("i","stdin",false,"read messages from stdin (one message per line)"); - - options.addOption(OptionBuilder.withLongOpt("trace") - .withDescription("trace logging specified categories: RAW, FRM") - .hasArg(true) - .withArgName("TRACE") - .create('t')); - if(hasSizeOption()) - options.addOption(OptionBuilder.withLongOpt("message-size") - .withDescription( "size to pad outgoing messages to" ) - .hasArg(true) - .withArgName("SIZE") - .create('z')); - - if(hasResponseQueueOption()) - options.addOption(OptionBuilder.withLongOpt("response-queue") - .withDescription( "response queue to reply to" ) - .hasArg(true) - .withArgName("RESPONSE_QUEUE") - .create('r')); - - if(hasLinkNameOption()) - { - options.addOption(OptionBuilder.withLongOpt("link") - .withDescription( "link name" ) - .hasArg(true) - .withArgName("LINK") - .create('l')); - } - - if(hasWindowSizeOption()) - { - options.addOption(OptionBuilder.withLongOpt("window-size") - .withDescription("credit window size") - .hasArg(true) - .withArgName("WINDOW-SIZE") - .create('W')); - } - - CommandLine cmdLine = null; - try - { - cmdLine = cmdLineParse.parse(options, args); - - } - catch (ParseException e) - { - printUsage(options); - System.exit(-1); - } - - if(cmdLine.hasOption('h') || cmdLine.getArgList().isEmpty()) - { - printUsage(options); - System.exit(0); - } - _host = cmdLine.getOptionValue('H',"0.0.0.0"); - _remoteHost = cmdLine.getOptionValue('O',null); - String portStr = cmdLine.getOptionValue('p',"5672"); - String countStr = cmdLine.getOptionValue('c',"1"); - - _useSSL = cmdLine.hasOption('S'); - - if(hasWindowSizeOption()) - { - String windowSizeStr = cmdLine.getOptionValue('W',"100"); - _windowSize = Integer.parseInt(windowSizeStr); - } - - if(hasSubjectOption()) - { - _subject = cmdLine.getOptionValue('s'); - } - - if(cmdLine.hasOption('u')) - { - _username = cmdLine.getOptionValue('u'); - } - - if(cmdLine.hasOption('w')) - { - _password = cmdLine.getOptionValue('w'); - } - - if(cmdLine.hasOption('F')) - { - _filter = cmdLine.getOptionValue('F'); - } - - _port = Integer.parseInt(portStr); - - _containerName = cmdLine.getOptionValue('C'); - - if(hasBlockOption()) - _block = cmdLine.hasOption('b'); - - if(hasLinkNameOption()) - _linkName = cmdLine.getOptionValue('l'); - - - if(hasLinkDurableOption()) - _durableLink = cmdLine.hasOption('d'); - - if(hasCountOption()) - _count = Integer.parseInt(countStr); - - if(hasStdInOption()) - _useStdIn = cmdLine.hasOption('i'); - - if(hasSingleLinkPerConnectionMode()) - _useMultipleConnections = cmdLine.hasOption('Z'); - - if(hasTxnOption()) - { - _useTran = cmdLine.hasOption('x'); - _batchSize = Integer.parseInt(cmdLine.getOptionValue('B',"1")); - _rollbackRatio = Double.parseDouble(cmdLine.getOptionValue('R',"0")); - } - - if(hasModeOption()) - { - _mode = AcknowledgeMode.ALO; - - if(cmdLine.hasOption('k')) - { - _mode = AcknowledgeMode.valueOf(cmdLine.getOptionValue('k')); - } - } - - if(hasResponseQueueOption()) - { - _responseQueue = cmdLine.getOptionValue('r'); - } - - _frameSize = Integer.parseInt(cmdLine.getOptionValue('f',"65536")); - - if(hasSizeOption()) - { - _messageSize = Integer.parseInt(cmdLine.getOptionValue('z',"-1")); - } - - String categoriesList = cmdLine.getOptionValue('t'); - String[]categories = categoriesList == null ? new String[0] : categoriesList.split("[, ]"); - for(String cat : categories) - { - if(cat.equalsIgnoreCase("FRM")) - { - FRAME_LOGGER.setLevel(Level.FINE); - Formatter formatter = new Formatter() - { - @Override - public String format(final LogRecord record) - { - return "[" + record.getMillis() + " FRM]\t" + record.getMessage() + "\n"; - } - }; - for(Handler handler : FRAME_LOGGER.getHandlers()) - { - FRAME_LOGGER.removeHandler(handler); - } - Handler handler = new ConsoleHandler(); - handler.setLevel(Level.FINE); - handler.setFormatter(formatter); - FRAME_LOGGER.addHandler(handler); - } - else if (cat.equalsIgnoreCase("RAW")) - { - RAW_LOGGER.setLevel(Level.FINE); - Formatter formatter = new Formatter() - { - @Override - public String format(final LogRecord record) - { - return "[" + record.getMillis() + " RAW]\t" + record.getMessage() + "\n"; - } - }; - for(Handler handler : RAW_LOGGER.getHandlers()) - { - RAW_LOGGER.removeHandler(handler); - } - Handler handler = new ConsoleHandler(); - handler.setLevel(Level.FINE); - handler.setFormatter(formatter); - RAW_LOGGER.addHandler(handler); - } - } - - - _args = cmdLine.getArgs(); - - } - - protected boolean hasFilterOption() - { - return false; - } - - protected boolean hasSubjectOption() - { - return false; - } - - protected boolean hasWindowSizeOption() - { - return false; - } - - protected boolean hasSingleLinkPerConnectionMode() - { - return false; - } - - protected abstract boolean hasLinkDurableOption(); - - protected abstract boolean hasLinkNameOption(); - - protected abstract boolean hasResponseQueueOption(); - - protected abstract boolean hasSizeOption(); - - protected abstract boolean hasBlockOption(); - - protected abstract boolean hasStdInOption(); - - protected abstract boolean hasTxnOption(); - - protected abstract boolean hasModeOption(); - - protected abstract boolean hasCountOption(); - - public String getHost() - { - return _host; - } - - public String getUsername() - { - return _username; - } - - public String getPassword() - { - return _password; - } - - public int getPort() - { - return _port; - } - - public int getCount() - { - return _count; - } - - public boolean useStdIn() - { - return _useStdIn; - } - - public boolean useTran() - { - return _useTran; - } - - public AcknowledgeMode getMode() - { - return _mode; - } - - public boolean isBlock() - { - return _block; - } - - public String[] getArgs() - { - return _args; - } - - public int getMessageSize() - { - return _messageSize; - } - - public String getResponseQueue() - { - return _responseQueue; - } - - public int getBatchSize() - { - return _batchSize; - } - - public double getRollbackRatio() - { - return _rollbackRatio; - } - - public String getLinkName() - { - return _linkName; - } - - public boolean isDurableLink() - { - return _durableLink; - } - - public boolean isUseMultipleConnections() - { - return _useMultipleConnections; - } - - public void setUseMultipleConnections(boolean useMultipleConnections) - { - _useMultipleConnections = useMultipleConnections; - } - - public String getSubject() - { - return _subject; - } - - public void setSubject(String subject) - { - _subject = subject; - } - - protected abstract void printUsage(final Options options); - - protected abstract void run(); - - - public Connection newConnection() throws Connection.ConnectionException - { - Container container = getContainerName() == null ? new Container() : new Container(getContainerName()); - return getUsername() == null ? new Connection(getHost(), getPort(), null, null, _frameSize, container, - _remoteHost == null ? getHost() : _remoteHost, _useSSL) - : new Connection(getHost(), getPort(), getUsername(), getPassword(), _frameSize, - container, _remoteHost == null ? getHost() : _remoteHost, _useSSL); - } - - public String getContainerName() - { - return _containerName; - } - - public int getWindowSize() - { - return _windowSize; - } - - public void setWindowSize(int windowSize) - { - _windowSize = windowSize; - } - - public String getFilter() - { - return _filter; - } -} |