summaryrefslogtreecommitdiff
path: root/java/amqp-1-0-client/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'java/amqp-1-0-client/src/main')
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java43
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java115
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java407
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java136
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java347
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java296
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java77
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java246
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java35
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java249
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java347
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java244
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SendBytes.java331
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java529
14 files changed, 45 insertions, 3357 deletions
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java
deleted file mode 100644
index 3bb26744c4..0000000000
--- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import java.lang.reflect.InvocationTargetException;
-
-public class Command
-{
- public static void main(String[] args) throws
- ClassNotFoundException,
- NoSuchMethodException,
- InvocationTargetException,
- IllegalAccessException,
- InstantiationException
- {
- String name = args[0];
- String[] cmdArgs = new String[args.length-1];
- System.arraycopy(args,1,cmdArgs,0,args.length-1);
- name = "org.apache.qpid.amqp_1_0.client." + String.valueOf(name.charAt(0)).toUpperCase() + name.substring(1).toLowerCase();
- Class<Util> clazz = (Class<Util>) Class.forName(name);
- Util util = clazz.getDeclaredConstructor(String[].class).newInstance((Object)cmdArgs);
- util.run();
-
- }
-}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
index e3d56fae09..e501662dbb 100644
--- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
+++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
@@ -20,6 +20,15 @@
*/
package org.apache.qpid.amqp_1_0.client;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.net.ssl.SSLSocketFactory;
import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
import org.apache.qpid.amqp_1_0.transport.AMQPTransport;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
@@ -30,17 +39,6 @@ import org.apache.qpid.amqp_1_0.type.FrameBody;
import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import javax.net.ssl.SSLSocket;
-import javax.net.ssl.SSLSocketFactory;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
public class Connection
{
private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
@@ -224,7 +222,6 @@ public class Connection
}
- //ConnectionHandler.OutputHandler outputHandler = new ConnectionHandler.OutputHandler(outputStream, out, _conn.getDescribedTypeRegistry());
ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn);
Thread outputThread = new Thread(outputHandler);
outputThread.setDaemon(true);
@@ -236,8 +233,6 @@ public class Connection
final ConnectionHandler handler = new ConnectionHandler(_conn);
final InputStream inputStream = s.getInputStream();
- //final AMQPTransport transport = new AMQPTransport(new AMQPFrameTransport(_conn));
-
Thread inputThread = new Thread(new Runnable()
{
@@ -246,7 +241,6 @@ public class Connection
try
{
doRead(handler, inputStream);
-// doRead(transport, inputStream);
}
finally
{
@@ -268,85 +262,6 @@ public class Connection
inputThread.setDaemon(true);
inputThread.start();
-/*
- Thread outputThread = new Thread(new Runnable()
- {
-
- private int _lastWrite;
-
- public void run()
- {
- try
- {
-// doRead(handler, inputStream);
- final Object lock = new Object();
- transport.setOutputStateChangeListener(new StateChangeListener()
- {
-
- public void onStateChange(final boolean active)
- {
- synchronized (lock)
- {
- lock.notifyAll();
- }
- }
- });
-
- synchronized(lock)
- {
- while(transport.isOpenForOutput())
- {
- _lastWrite = 0;
- transport.getNextBytes(new BytesProcessor()
- {
-
- public void processBytes(final ByteBuffer buf)
- {
- _lastWrite = buf.remaining();
- try
- {
- outputStream.write(buf.array(),
- buf.arrayOffset() + buf.position(),
- buf.limit() - buf.position());
- }
- catch (IOException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- });
- if(_lastWrite == 0 && transport.isOpenForOutput())
- {
- try
- {
- lock.wait(1000);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
- }
- }
- finally
- {
- if(_conn.closedForInput() && _conn.closedForOutput())
- {
- try
- {
- s.close();
- }
- catch (IOException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
- }
- });
-*/
-
_conn.open();
}
@@ -394,7 +309,7 @@ public class Connection
}
catch (IOException e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ e.printStackTrace();
}
}
@@ -419,7 +334,7 @@ public class Connection
{
int read;
boolean done = false;
- while(!done && (read = inputStream.read(buf)) != -1)
+ while(!handler.isDone() && (read = inputStream.read(buf)) != -1)
{
ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read);
Binary b = new Binary(buf,0,read);
@@ -428,12 +343,6 @@ public class Connection
{
RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString());
}
- /*System.err.println(b);
- System.err.println("XXX: " + bbuf.hasRemaining() + "; " + handler.isDone());
- if(handler.isDone())
- {
- System.err.println(handler.getClass().getName() + "IS DONE!");
- } */
while(bbuf.hasRemaining() && !handler.isDone())
{
handler.parse(bbuf);
@@ -444,7 +353,7 @@ public class Connection
}
catch (IOException e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ e.printStackTrace();
}
}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
deleted file mode 100644
index b58ce6bfe5..0000000000
--- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.UnsignedLong;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.amqp_1_0.type.messaging.Header;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class Demo extends Util
-{
- private static final String USAGE_STRING = "demo [options] <vendor> [<content> ...]\n\nOptions:";
- private static final String OPCODE = "opcode";
- private static final String ACTION = "action";
- private static final String MESSAGE_ID = "message-id";
- private static final String VENDOR = "vendor";
- private static final String LOG = "log";
- private static final String RECEIVED = "received";
- private static final String TEST = "test";
- private static final String APACHE = "apache";
- private static final String SENT = "sent";
- private static final String LINK_REF = "link-ref";
- private static final String HOST = "host";
- private static final String PORT = "port";
- private static final String SASL_USER = "sasl-user";
- private static final String SASL_PASSWORD = "sasl-password";
- private static final String ROLE = "role";
- private static final String ADDRESS = "address";
- private static final String SENDER = "sender";
- private static final String SEND_MESSAGE = "send-message";
- private static final String ANNOUNCE = "announce";
- private static final String MESSAGE_VENDOR = "message-vendor";
- private static final String CREATE_LINK = "create-link";
-
- public static void main(String[] args)
- {
- new Demo(args).run();
- }
-
- public Demo(String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasWindowSizeOption()
- {
- return false;
- }
-
- public void run()
- {
-
- try
- {
-
- final String vendor = getArgs()[0];
- final String queue = "control";
-
- String message = "";
-
- Connection conn = newConnection();
- Session session = conn.createSession();
-
-
- Receiver responseReceiver;
-
- responseReceiver = session.createTemporaryQueueReceiver();
-
-
-
-
- responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
-
-
-
- Sender s = session.createSender(queue, getWindowSize(), getMode());
-
-
- Properties properties = new Properties();
- properties.setMessageId(java.util.UUID.randomUUID());
- properties.setReplyTo(responseReceiver.getAddress());
-
- HashMap appPropMap = new HashMap();
- ApplicationProperties appProperties = new ApplicationProperties(appPropMap);
-
- appPropMap.put(OPCODE, ANNOUNCE);
- appPropMap.put(VENDOR, vendor);
- appPropMap.put(ADDRESS,responseReceiver.getAddress());
-
- AmqpValue amqpValue = new AmqpValue(message);
- Section[] sections = { properties, appProperties, amqpValue};
- final Message message1 = new Message(Arrays.asList(sections));
-
- s.send(message1);
-
- Map<Object, Sender> sendingLinks = new HashMap<Object, Sender>();
- Map<Object, Receiver> receivingLinks = new HashMap<Object, Receiver>();
-
-
- boolean done = false;
-
- while(!done)
- {
- boolean wait = true;
- Message m = responseReceiver.receive(false);
- if(m != null)
- {
- List<Section> payload = m.getPayload();
- wait = false;
- ApplicationProperties props = m.getApplicationProperties();
- Map map = props.getValue();
- String op = (String) map.get(OPCODE);
- if("reset".equals(op))
- {
- for(Sender sender : sendingLinks.values())
- {
- try
- {
- sender.close();
- Session session1 = sender.getSession();
- session1.close();
- session1.getConnection().close();
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- for(Receiver receiver : receivingLinks.values())
- {
- try
- {
- receiver.close();
- receiver.getSession().close();
- receiver.getSession().getConnection().close();
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- sendingLinks.clear();
- receivingLinks.clear();
- }
- else if(CREATE_LINK.equals(op))
- {
- Object linkRef = map.get(LINK_REF);
- String host = (String) map.get(HOST);
- Object o = map.get(PORT);
- int port = Integer.parseInt(String.valueOf(o));
- String user = (String) map.get(SASL_USER);
- String password = (String) map.get(SASL_PASSWORD);
- String role = (String) map.get(ROLE);
- String address = (String) map.get(ADDRESS);
- System.err.println("Host: " + host + "\tPort: " + port + "\t user: " + user +"\t password: " + password);
- try{
-
-
- Connection conn2 = new Connection(host, port, user, password, host);
- Session session2 = conn2.createSession();
- if(sendingLinks.containsKey(linkRef))
- {
- try
- {
- sendingLinks.remove(linkRef).close();
- }
- catch (Exception e)
- {
-
- }
- }
- if(receivingLinks.containsKey(linkRef))
- {
- try
- {
- receivingLinks.remove(linkRef).close();
- }
- catch (Exception e)
- {
-
- }
- }
- if(SENDER.equals(role))
- {
-
- System.err.println("%%% Creating sender (" + linkRef + ")");
- Sender sender = session2.createSender(address);
- sendingLinks.put(linkRef, sender);
- }
- else
- {
-
- System.err.println("%%% Creating receiver (" + linkRef + ")");
- Receiver receiver2 = session2.createReceiver(address);
- receiver2.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
-
- receivingLinks.put(linkRef, receiver2);
- }
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- else if(SEND_MESSAGE.equals(op))
- {
- Sender sender = sendingLinks.get(map.get(LINK_REF));
- Properties m2props = new Properties();
- Object messageId = map.get(MESSAGE_ID);
- m2props.setMessageId(messageId);
- Map m2propmap = new HashMap();
- m2propmap.put(OPCODE, TEST);
- m2propmap.put(VENDOR, vendor);
- ApplicationProperties m2appProps = new ApplicationProperties(m2propmap);
- Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId)));
- sender.send(m2);
-
- Map m3propmap = new HashMap();
- m3propmap.put(OPCODE, LOG);
- m3propmap.put(ACTION, SENT);
- m3propmap.put(MESSAGE_ID, messageId);
- m3propmap.put(VENDOR, vendor);
- m3propmap.put(MESSAGE_VENDOR, vendor);
-
-
- Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
- new AmqpValue("AMQP-"+messageId)));
- s.send(m3);
-
- }
-
- responseReceiver.acknowledge(m);
- }
- else
- {
- for(Map.Entry<Object, Receiver> entry : receivingLinks.entrySet())
- {
- m = entry.getValue().receive(false);
- if(m != null)
- {
- wait = false;
-
- System.err.println("%%% Received message from " + entry.getKey());
-
- Properties mp = m.getProperties();
- ApplicationProperties ap = m.getApplicationProperties();
-
- Map m3propmap = new HashMap();
- m3propmap.put(OPCODE, LOG);
- m3propmap.put(ACTION, RECEIVED);
- m3propmap.put(MESSAGE_ID, mp.getMessageId());
- m3propmap.put(VENDOR, vendor);
- m3propmap.put(MESSAGE_VENDOR, ap.getValue().get(VENDOR));
-
- Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
- new AmqpValue("AMQP-"+mp.getMessageId())));
- s.send(m3);
-
- entry.getValue().acknowledge(m);
- }
-
- }
- }
-
- if(wait)
- {
- try
- {
- Thread.sleep(500l);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //TODO.
- }
- }
-
- }
-
-
-
-
-
-
-
-
-
- s.close();
- session.close();
- conn.close();
-
- }
- catch (Connection.ConnectionException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (AmqpErrorException e)
- {
- e.printStackTrace(); //TODO.
- }
-
- }
-
- protected boolean hasSingleLinkPerConnectionMode()
- {
- return false;
- }
-
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
- }
-
-}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java
deleted file mode 100644
index 65d27b21f8..0000000000
--- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.commons.cli.Options;
-
-public class Dump extends Util
-{
- private static final String USAGE_STRING = "dump [options] <address>\n\nOptions:";
-
-
- protected Dump(String[] args)
- {
- super(args);
- }
-
- public static void main(String[] args)
- {
- new Dump(args).run();
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- protected void printUsage(Options options)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- protected void run()
- {
- final String queue = getArgs()[0];
-
- try
- {
- Connection conn = newConnection();
-
- Session session = conn.createSession();
-
-
- Sender s = session.createSender(queue, 10);
-
- Message message = new Message("dump me");
- message.setDeliveryTag(new Binary("dump".getBytes()));
-
- s.send(message);
-
- s.close();
- session.close();
- conn.close();
-
- } catch (Connection.ConnectionException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- } catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
-}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java
deleted file mode 100644
index 4d98655ad2..0000000000
--- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-public class Filereceiver extends Util
-{
- private static final String USAGE_STRING = "filereceiver [options] <address> <directory>\n\nOptions:";
-
- protected Filereceiver(String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return false;
- }
-
- @Override
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
-
- }
-
- @Override
- protected void run()
- {
- final String queue = getArgs()[0];
- final String directoryName = getArgs()[1];
-
- try
- {
- Connection conn = newConnection();
-
- Session session = conn.createSession();
-
- final File directory = new File(directoryName);
- if(directory.isDirectory() && directory.canWrite())
- {
- File tmpDirectory = new File(directoryName, ".tmp");
- if(!tmpDirectory.exists())
- {
- tmpDirectory.mkdir();
- }
-
- String[] unsettledFiles = tmpDirectory.list();
-
- Map<Binary, Outcome> unsettled = new HashMap<Binary, Outcome>();
- final Map<Binary, String> unsettledFileNames = new HashMap<Binary, String>();
-
- Accepted accepted = new Accepted();
-
- for(String fileName : unsettledFiles)
- {
- File theFile = new File(tmpDirectory, fileName);
- if(theFile.isFile())
- {
- if(fileName.startsWith("~") && fileName.endsWith("~"))
- {
- theFile.delete();
- }
- else
- {
- int splitPoint = fileName.indexOf(".");
- String deliveryTagStr = fileName.substring(0,splitPoint);
- String actualFileName = fileName.substring(splitPoint+1);
-
- byte[] bytes = new byte[deliveryTagStr.length()/2];
-
-
- for(int i = 0; i < bytes.length; i++)
- {
- char c = deliveryTagStr.charAt(2*i);
- char d = deliveryTagStr.charAt(1+(2*i));
-
- bytes[i] = (byte) (((c <= '9' ? c - '0' : c - 'W') << 4)
- | (d <= '9' ? d - '0' : d - 'W'));
-
- }
- Binary deliveryTag = new Binary(bytes);
- unsettled.put(deliveryTag, accepted);
- unsettledFileNames.put(deliveryTag, fileName);
- }
- }
-
- }
-
- Receiver r = session.createReceiver(queue, AcknowledgeMode.EO, getLinkName(), isDurableLink(),
- unsettled);
-
- Map<Binary, Outcome> remoteUnsettled = r.getRemoteUnsettled();
-
- for(Map.Entry<Binary, String> entry : unsettledFileNames.entrySet())
- {
- if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey()))
- {
-
- File tmpFile = new File(tmpDirectory, entry.getValue());
- final File dest = new File(directory,
- entry.getValue().substring(entry.getValue().indexOf(".") + 1));
- if(dest.exists())
- {
- System.err.println("Duplicate detected - filename " + dest.getName());
- }
-
- tmpFile.renameTo(dest);
- }
- }
-
-
- int credit = 10;
-
- r.setCredit(UnsignedInteger.valueOf(credit), true);
-
-
- int received = 0;
- Message m = null;
- do
- {
- m = isBlock() && received == 0 ? r.receive() : r.receive(10000);
- if(m != null)
- {
- if(m.isResume() && unsettled.containsKey(m.getDeliveryTag()))
- {
- final String tmpFileName = unsettledFileNames.get(m.getDeliveryTag());
- final File unsettledFile = new File(tmpDirectory,
- tmpFileName);
- r.acknowledge(m, new Receiver.SettledAction()
- {
- public void onSettled(final Binary deliveryTag)
- {
- int splitPoint = tmpFileName.indexOf(".");
-
- String fileName = tmpFileName.substring(splitPoint+1);
-
- final File dest = new File(directory, fileName);
- if(dest.exists())
- {
- System.err.println("Duplicate detected - filename " + dest.getName());
- }
- unsettledFile.renameTo(dest);
- unsettledFileNames.remove(deliveryTag);
- }
- });
- }
- else
- {
- received++;
- List<Section> sections = m.getPayload();
- Binary deliveryTag = m.getDeliveryTag();
- StringBuilder tagNameBuilder = new StringBuilder();
-
- ByteBuffer dtbuf = deliveryTag.asByteBuffer();
- while(dtbuf.hasRemaining())
- {
- tagNameBuilder.append(String.format("%02x", dtbuf.get()));
- }
-
-
- ApplicationProperties properties = null;
- List<Binary> data = new ArrayList<Binary>();
- int totalSize = 0;
- for(Section section : sections)
- {
- if(section instanceof ApplicationProperties)
- {
- properties = (ApplicationProperties) section;
- }
- else if(section instanceof AmqpValue)
- {
- AmqpValue value = (AmqpValue) section;
- if(value.getValue() instanceof Binary)
- {
- Binary binary = (Binary) value.getValue();
- data.add(binary);
- totalSize += binary.getLength();
-
- }
- else
- {
- // TODO exception
- }
- }
- else if(section instanceof Data)
- {
- Data value = (Data) section;
- Binary binary = value.getValue();
- data.add(binary);
- totalSize += binary.getLength();
-
- }
- }
- if(properties != null)
- {
- final String fileName = (String) properties.getValue().get("filename");
- byte[] fileData = new byte[totalSize];
- ByteBuffer buf = ByteBuffer.wrap(fileData);
- int offset = 0;
- for(Binary bin : data)
- {
- buf.put(bin.asByteBuffer());
- }
- File outputFile = new File(tmpDirectory, "~"+fileName+"~");
- if(outputFile.exists())
- {
- outputFile.delete();
- }
- FileOutputStream fos = new FileOutputStream(outputFile);
- fos.write(fileData);
- fos.flush();
- fos.close();
-
- final File unsettledFile = new File(tmpDirectory, tagNameBuilder.toString() + "." +
- fileName);
- outputFile.renameTo(unsettledFile);
- r.acknowledge(m, new Receiver.SettledAction()
- {
- public void onSettled(final Binary deliveryTag)
- {
- final File dest = new File(directory, fileName);
- if(dest.exists())
- {
- System.err.println("Duplicate detected - filename " + dest.getName());
- }
- unsettledFile.renameTo(dest);
-
- }
- });
-
- }
- }
- }
- }
- while(m != null);
-
-
- r.close();
- }
- else
- {
- System.err.println("No such directory: " + directoryName);
- }
- session.close();
- conn.close();
- }
- catch (Connection.ConnectionException e)
- {
- e.printStackTrace();
- }
- catch (FileNotFoundException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (IOException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (AmqpErrorException e)
- {
- e.printStackTrace(); //TODO.
- }
-
- }
-
- public static void main(String[] args)
- {
- new Filereceiver(args).run();
- }
-}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
deleted file mode 100644
index 46e6ba537f..0000000000
--- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.Outcome;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.*;
-
-public class Filesender extends Util
-{
- private static final String USAGE_STRING = "filesender [options] <address> <directory>\n\nOptions:";
-
- protected Filesender(String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return false;
- }
-
- @Override
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
-
- }
-
- @Override
- protected void run()
- {
- final String queue = getArgs()[0];
- final String directoryName = getArgs()[1];
-
- try
- {
- MessageDigest md5 = MessageDigest.getInstance("MD5");
- Connection conn = newConnection();
-
- Session session = conn.createSession();
-
- File directory = new File(directoryName);
- if(directory.isDirectory() && directory.canWrite())
- {
-
- File tmpDirectory = new File(directoryName, ".tmp");
- if(!tmpDirectory.exists())
- {
- tmpDirectory.mkdir();
- }
-
- String[] unsettledFiles = tmpDirectory.list();
-
-
-
- Map<Binary, Outcome> unsettled = new HashMap<Binary, Outcome>();
- Map<Binary, String> unsettledFileNames = new HashMap<Binary, String>();
- for(String fileName : unsettledFiles)
- {
- File aFile = new File(tmpDirectory, fileName);
- if(aFile.canRead() && aFile.canWrite())
- {
- Binary deliveryTag = new Binary(md5.digest(fileName.getBytes()));
- unsettled.put(deliveryTag, null);
- unsettledFileNames.put(deliveryTag, fileName);
- }
- }
-
-
- Sender s = session.createSender(queue, 10, AcknowledgeMode.EO, getLinkName(), isDurableLink(),
- unsettled);
-
- Map<Binary, DeliveryState> remoteUnsettled = s.getRemoteUnsettled();
-
- for(Map.Entry<Binary, String> entry: unsettledFileNames.entrySet())
- {
- if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey()))
- {
- (new File(tmpDirectory, entry.getValue())).renameTo(new File(directory, entry.getValue()));
- }
- }
-
- if(remoteUnsettled != null)
- {
- for(Map.Entry<Binary, DeliveryState> entry : remoteUnsettled.entrySet())
- {
- if(entry.getValue() instanceof Accepted)
- {
- final String fileName = unsettledFileNames.get(entry.getKey());
- if(fileName != null)
- {
-
- Message resumed = new Message();
- resumed.setDeliveryTag(entry.getKey());
- resumed.setDeliveryState(entry.getValue());
- resumed.setResume(Boolean.TRUE);
- resumed.setSettled(Boolean.TRUE);
-
-
-
- final File unsettledFile = new File(tmpDirectory, fileName);
- unsettledFile.delete();
-
- s.send(resumed);
-
- }
-
- }
- else if(entry.getValue() instanceof Received || entry.getValue() == null)
- {
- final File unsettledFile = new File(tmpDirectory, unsettledFileNames.get(entry.getKey()));
- Message resumed = createMessageFromFile(md5, unsettledFileNames.get(entry.getKey()), unsettledFile);
- resumed.setResume(Boolean.TRUE);
- Sender.OutcomeAction action = new Sender.OutcomeAction()
- {
- public void onOutcome(Binary deliveryTag, Outcome outcome)
- {
- if(outcome instanceof Accepted)
- {
- unsettledFile.delete();
- }
- }
- };
- s.send(resumed, action);
-
- }
- }
- }
-
-
-
- String[] files = directory.list();
-
- for(String fileName : files)
- {
- final File file = new File(directory, fileName);
-
- if(file.canRead() && file.canWrite() && !file.isDirectory())
- {
- Message message = createMessageFromFile(md5, fileName, file);
-
- final File unsettledFile = new File(tmpDirectory, fileName);
-
- Sender.OutcomeAction action = new Sender.OutcomeAction()
- {
- public void onOutcome(Binary deliveryTag, Outcome outcome)
- {
- if(outcome instanceof Accepted)
- {
- unsettledFile.delete();
- }
- }
- };
-
- file.renameTo(unsettledFile);
-
- s.send(message, action);
- }
- }
-
- s.close();
- }
- else
- {
- System.err.println("No such directory: " + directory);
- }
- session.close();
- conn.close();
- }
- catch (Connection.ConnectionException e)
- {
- e.printStackTrace();
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace();
- } catch (FileNotFoundException e)
- {
- e.printStackTrace();
- } catch (IOException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- } catch (NoSuchAlgorithmException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- } catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
-
- }
-
- private Message createMessageFromFile(MessageDigest md5, String fileName, File file) throws IOException
- {
- FileInputStream fis = new FileInputStream(file);
- byte[] data = new byte[(int) file.length()];
-
- int read = fis.read(data);
-
- fis.close();
-
- Section applicationProperties = new ApplicationProperties(Collections.singletonMap("filename", fileName));
- Section amqpValue = new Data(new Binary(data));
- Message message = new Message(Arrays.asList(applicationProperties, amqpValue));
- Binary deliveryTag = new Binary(md5.digest(fileName.getBytes()));
- message.setDeliveryTag(deliveryTag);
- md5.reset();
- return message;
- }
-
- public static void main(String[] args)
- {
- new Filesender(args).run();
- }
-}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java
deleted file mode 100644
index 07ae54b54f..0000000000
--- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.codec.ValueHandler;
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-public class ReadBytes
-{
-
- public static void main(String[] args) throws IOException, AmqpErrorException
- {
-
- if(args.length == 0)
- {
- readBytes(System.in);
- }
- else
- {
- for(String fileName : args)
- {
- System.out.println("=========================== " + fileName + " ===========================");
- final FileInputStream fis = new FileInputStream(fileName);
- readBytes(fis);
- fis.close();
- }
- }
-
- }
-
- private static void readBytes(final InputStream inputStream) throws IOException, AmqpErrorException
- {
- byte[] bytes = new byte[4096];
-
- ValueHandler valueHandler = new ValueHandler(AMQPDescribedTypeRegistry.newInstance());
-
- int count;
-
- while((count = inputStream.read(bytes))!=-1)
- {
- ByteBuffer buf = ByteBuffer.wrap(bytes);
- buf.limit(count);
- while(buf.hasRemaining())
- {
-
- final Object value = valueHandler.parse(buf);
- System.out.print((value == null ? "" : value.getClass().getName() + ":") +value +"\n");
-
- }
- }
-
- }
-
-
-}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java
deleted file mode 100644
index 0da9dc3fb7..0000000000
--- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.UnsignedLong;
-import org.apache.commons.cli.*;
-import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.Filter;
-import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter;
-
-import java.util.Collections;
-
-public class Receive extends Util
-{
- private static final String USAGE_STRING = "receive [options] <address> \n\nOptions:";
- private static final UnsignedLong UNSIGNED_LONG_ONE = UnsignedLong.valueOf(1L);
- private UnsignedLong _lastCorrelationId;
-
- public static void main(String[] args)
- {
- new Receive(args).run();
- }
-
-
- public Receive(final String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasWindowSizeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasFilterOption()
- {
- return true;
- }
-
- protected void run()
- {
-
- try
- {
- final String queue = getArgs()[0];
-
- String message = "";
-
- Connection conn = newConnection();
-
-
- Session session = conn.createSession();
-
- Filter filter = null;
- if(getFilter() != null)
- {
- String[] filterParts = getFilter().split("=",2);
- if("exact-subject".equals(filterParts[0]))
- {
- filter = new ExactSubjectFilter(filterParts[1]);
- }
- else if("matching-subject".equals(filterParts[0]))
- {
- filter = new MatchingSubjectFilter(filterParts[1]);
- }
- else
- {
- System.err.println("Unknown filter type: " + filterParts[0]);
- }
- }
-
- Receiver r =
- filter == null
- ? session.createReceiver(queue, getMode(), getLinkName(), isDurableLink())
- : session.createReceiver(queue, getMode(), getLinkName(), isDurableLink(), Collections.singletonMap(Symbol.valueOf("filter"), filter), null);
- Transaction txn = null;
-
- int credit = 0;
- int receivedCount = 0;
-
- if(!useStdIn())
- {
- if(getArgs().length <= 2)
- {
-
- Transaction txn2 = null;
- if(useTran())
- {
- txn = session.createSessionLocalTransaction();
- txn2 = session.createSessionLocalTransaction();
- }
-
- for(int i = 0; i < getCount(); i++)
- {
-
- if(credit == 0)
- {
- if(getCount() - i <= getWindowSize())
- {
- credit = getCount() - i;
-
- }
- else
- {
- credit = getWindowSize();
-
- }
-
- {
- r.setCredit(UnsignedInteger.valueOf(credit), false);
- }
- if(!isBlock())
- r.drain();
- }
-
- Message m = isBlock() ? r.receive() : r.receive(1000L);
- credit--;
- if(m==null)
- {
- break;
- }
-
-
-
- r.acknowledge(m.getDeliveryTag(),txn);
-
- receivedCount++;
-
- System.out.println("Received Message : " + m.getPayload());
- }
-
- if(useTran())
- {
- txn.commit();
- }
- }
- else
- {
- // TODO
- }
- }
- else
- {
- // TODO
- }
- r.close();
- session.close();
- conn.close();
- System.out.println("Total Messages Received: " + receivedCount);
- }
- catch (Connection.ConnectionException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (AmqpErrorException e)
- {
- e.printStackTrace(); //TODO.
- }
-
- }
-
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
- }
-
-}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
index ad390fd498..8b792db1f1 100644
--- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
+++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
@@ -241,7 +241,7 @@ public class Receiver implements DeliveryStateHandler
}
if(hasMore)
{
- xfr = receiveFromPrefetch(0L);
+ xfr = receiveFromPrefetch(-1l);
if(xfr== null)
{
// TODO - this is wrong!!!!
@@ -503,6 +503,37 @@ public class Receiver implements DeliveryStateHandler
_endpoint.drain();
}
+ /**
+ * Waits for the receiver to drain or a message to be available to be received.
+ * @return true if the receiver has been drained.
+ */
+ public boolean drainWait()
+ {
+ final Object lock = _endpoint.getLock();
+ synchronized(lock)
+ {
+ try
+ {
+ while( _prefetchQueue.peek()==null && !_endpoint.isDrained() && !_endpoint.isDetached() )
+ {
+ lock.wait();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ return _prefetchQueue.peek()==null && _endpoint.isDrained();
+ }
+
+ /**
+ * Clears the receiver drain so that message delivery can resume.
+ */
+ public void clearDrain()
+ {
+ _endpoint.clearDrain();
+ }
+
public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn)
{
_endpoint.setLinkCredit(credit);
@@ -558,4 +589,4 @@ public class Receiver implements DeliveryStateHandler
void messageArrived(Receiver receiver);
}
-} \ No newline at end of file
+}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
deleted file mode 100644
index 6e1d15376c..0000000000
--- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.UnsignedLong;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.Header;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-import org.apache.commons.cli.*;
-
-import java.util.Arrays;
-
-public class Request extends Util
-{
- private static final String USAGE_STRING = "request [options] <address> [<content> ...]\n\nOptions:";
-
- public static void main(String[] args)
- {
- new Request(args).run();
- }
-
- public Request(String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasWindowSizeOption()
- {
- return true;
- }
-
- public void run()
- {
-
- try
- {
-
-
- final String queue = getArgs()[0];
-
- String message = "";
-
- Connection conn = newConnection();
- Session session = conn.createSession();
-
- Connection conn2;
- Session session2;
- Receiver responseReceiver;
-
- if(isUseMultipleConnections())
- {
- conn2 = newConnection();
- session2 = conn2.createSession();
- responseReceiver = session2.createTemporaryQueueReceiver();
- }
- else
- {
- conn2 = null;
- session2 = null;
- responseReceiver = session.createTemporaryQueueReceiver();
- }
-
-
-
-
- responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
-
-
-
- Sender s = session.createSender(queue, getWindowSize(), getMode());
-
- Transaction txn = null;
-
- if(useTran())
- {
- txn = session.createSessionLocalTransaction();
- }
-
- int received = 0;
-
- if(getArgs().length >= 2)
- {
- message = getArgs()[1];
- if(message.length() < getMessageSize())
- {
- StringBuilder builder = new StringBuilder(getMessageSize());
- builder.append(message);
- for(int x = message.length(); x < getMessageSize(); x++)
- {
- builder.append('.');
- }
- message = builder.toString();
- }
-
- for(int i = 0; i < getCount(); i++)
- {
- Properties properties = new Properties();
- properties.setMessageId(UnsignedLong.valueOf(i));
- properties.setReplyTo(responseReceiver.getAddress());
-
- AmqpValue amqpValue = new AmqpValue(message);
- Section[] sections = { new Header() , properties, amqpValue};
- final Message message1 = new Message(Arrays.asList(sections));
-
- s.send(message1, txn);
-
- Message responseMessage = responseReceiver.receive(false);
- if(responseMessage != null)
- {
- responseReceiver.acknowledge(responseMessage.getDeliveryTag(),txn);
- received++;
- }
- }
- }
-
- if(txn != null)
- {
- txn.commit();
- }
-
-
- while(received < getCount())
- {
- Message responseMessage = responseReceiver.receive();
- responseReceiver.acknowledge(responseMessage.getDeliveryTag());
- received++;
- }
-
-
-
-
- s.close();
- session.close();
- conn.close();
-
- if(session2 != null)
- {
- session2.close();
- conn2.close();
- }
- }
- catch (Connection.ConnectionException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (AmqpErrorException e)
- {
- e.printStackTrace(); //TODO.
- }
-
- }
-
- protected boolean hasSingleLinkPerConnectionMode()
- {
- return true;
- }
-
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
- }
-
-}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
deleted file mode 100644
index 8d9de4893f..0000000000
--- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.UnsignedLong;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-import org.apache.commons.cli.*;
-
-import java.util.*;
-
-public class Respond extends Util
-{
- private static final String USAGE_STRING = "respond [options] <address>\n\nOptions:";
- private Connection _conn;
- private Session _session;
- private Receiver _receiver;
- private Transaction _txn;
- private Map<String,Sender> _senders;
- private UnsignedLong _responseMsgId = UnsignedLong.ZERO;
- private Connection _conn2;
- private Session _session2;
-
- public Respond(final String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasSingleLinkPerConnectionMode()
- {
- return true;
- }
-
-
- @Override
- protected boolean hasWindowSizeOption()
- {
- return true;
- }
-
- public static void main(String[] args)
- {
- new Respond(args).run();
- }
-
- public void run()
- {
- try
- {
-
- _senders = new HashMap<String, Sender>();
-
- final String queue = getArgs()[0];
-
- String message = "";
-
- _conn = newConnection();
-
-
-
- if(isUseMultipleConnections())
- {
- _conn2 = newConnection();
- _session2 = _conn2.createSession();
- }
-
-
- _session = _conn.createSession();
-
-
- _receiver = _session.createReceiver(queue, getMode());
- _txn = null;
-
- int credit = 0;
- int receivedCount = 0;
- _responseMsgId = UnsignedLong.ZERO;
-
- Random random = null;
- int batch = 0;
- List<Message> txnMessages = null;
- if(useTran())
- {
- if(getRollbackRatio() != 0)
- {
- random = new Random();
- }
- batch = getBatchSize();
- _txn = _session.createSessionLocalTransaction();
- txnMessages = new ArrayList<Message>(batch);
- }
-
-
- for(int i = 0; receivedCount < getCount(); i++)
- {
-
- if(credit == 0)
- {
- if(getCount() - i <= getWindowSize())
- {
- credit = getCount() - i;
-
- }
- else
- {
- credit = getWindowSize();
-
- }
-
- _receiver.setCredit(UnsignedInteger.valueOf(credit), false);
-
- if(!isBlock())
- _receiver.drain();
- }
-
- Message m = isBlock() ? (receivedCount == 0 ? _receiver.receive() : _receiver.receive(10000L)) : _receiver.receive(1000L);
- credit--;
- if(m==null)
- {
- if(useTran() && batch != getBatchSize())
- {
- _txn.commit();
- }
- break;
- }
-
- System.out.println("Received Message: " + m.getPayload());
-
- respond(m);
-
-
-
- if(useTran())
- {
-
- txnMessages.add(m);
-
- if(--batch == 0)
- {
-
- if(getRollbackRatio() == 0 || random.nextDouble() >= getRollbackRatio())
- {
- _txn.commit();
- txnMessages.clear();
- receivedCount += getBatchSize();
- }
- else
- {
- System.out.println("Random Rollback");
- _txn.rollback();
- double result;
- do
- {
- _txn = _session.createSessionLocalTransaction();
-
- for(Message msg : txnMessages)
- {
- respond(msg);
- }
-
- result = random.nextDouble();
- if(result<getRollbackRatio())
- {
- _txn.rollback();
- }
- else
- {
- _txn.commit();
- txnMessages.clear();
- receivedCount += getBatchSize();
- }
- }
- while(result < getRollbackRatio());
- }
- _txn = _session.createSessionLocalTransaction();
-
- batch = getBatchSize();
- }
- }
- else
- {
- receivedCount++;
- }
-
- }
-
-
- for(Sender s : _senders.values())
- {
- s.close();
- }
-
- _receiver.close();
- _session.close();
- _conn.close();
- System.out.println("Received: " + receivedCount);
- }
- catch (Connection.ConnectionException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (AmqpErrorException e)
- {
- e.printStackTrace(); //TODO.
- }
- }
-
- private void respond(Message m) throws Sender.SenderCreationException
- {
- List<Section> sections = m.getPayload();
- String replyTo = null;
- Object correlationId = null;
- for(Section section : sections)
- {
- if(section instanceof Properties)
- {
- replyTo = getResponseQueue() == null ? ((Properties)section).getReplyTo() : getResponseQueue();
- correlationId = ((Properties) section).getMessageId();
- break;
- }
- }
-
- if(replyTo != null)
- {
- Sender s = _senders.get(replyTo);
- if(s == null)
- {
- s = (isUseMultipleConnections() ? _session2 : _session).createSender(replyTo,getWindowSize());
- _senders.put(replyTo, s);
- }
-
- List<Section> replySections = new ArrayList<Section>(sections);
-
- ListIterator<Section> sectionIterator = replySections.listIterator();
-
- while(sectionIterator.hasNext())
- {
- Section section = sectionIterator.next();
- if(section instanceof Properties)
- {
- Properties newProps = new Properties();
- newProps.setTo(replyTo);
- newProps.setCorrelationId(correlationId);
- newProps.setMessageId(_responseMsgId);
- _responseMsgId = _responseMsgId.add(UnsignedLong.ONE);
- sectionIterator.set(newProps);
- }
- }
-
- Message replyMessage = new Message(replySections);
- System.out.println("Sent Message: " + replySections);
- s.send(replyMessage, _txn);
-
- }
- _receiver.acknowledge(m.getDeliveryTag(), _txn);
- }
-
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
- }
-
-}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
deleted file mode 100644
index 6f6575e083..0000000000
--- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.LineNumberReader;
-import java.util.Arrays;
-
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.UnsignedLong;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-import org.apache.commons.cli.*;
-
-public class Send extends Util
-{
- private static final String USAGE_STRING = "send [options] <address> [<content> ...]\n\nOptions:";
- private static final char[] HEX = {'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'};
-
-
- public static void main(String[] args) throws Sender.SenderCreationException, Sender.SenderClosingException, Connection.ConnectionException
- {
- new Send(args).run();
- }
-
-
- public Send(final String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasWindowSizeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasSubjectOption()
- {
- return true;
- }
-
- public void run()
- {
-
- final String queue = getArgs()[0];
-
- String message = "";
-
- try
- {
- Connection conn = newConnection();
-
- Session session = conn.createSession();
-
-
- Sender s = session.createSender(queue, getWindowSize(), getMode(), getLinkName());
-
- Transaction txn = null;
-
- if(useTran())
- {
- txn = session.createSessionLocalTransaction();
- }
-
- if(!useStdIn())
- {
- if(getArgs().length <= 2)
- {
- if(getArgs().length == 2)
- {
- message = getArgs()[1];
- }
- for(int i = 0; i < getCount(); i++)
- {
-
- Properties properties = new Properties();
- properties.setMessageId(UnsignedLong.valueOf(i));
- if(getSubject() != null)
- {
- properties.setSubject(getSubject());
- }
- Section bodySection;
- byte[] bytes = (message + " " + i).getBytes();
- if(bytes.length < getMessageSize())
- {
- byte[] origBytes = bytes;
- bytes = new byte[getMessageSize()];
- System.arraycopy(origBytes,0,bytes,0,origBytes.length);
- for(int x = origBytes.length; x < bytes.length; x++)
- {
- bytes[x] = (byte) '.';
- }
- bodySection = new Data(new Binary(bytes));
- }
- else
- {
- bodySection = new AmqpValue(message + " " + i);
- }
-
- Section[] sections = {properties, bodySection};
- final Message message1 = new Message(Arrays.asList(sections));
-
- s.send(message1, txn);
- }
- }
- else
- {
- for(int i = 1; i < getArgs().length; i++)
- {
- s.send(new Message(getArgs()[i]), txn);
- }
-
- }
- }
- else
- {
- LineNumberReader buf = new LineNumberReader(new InputStreamReader(System.in));
-
-
- try
- {
- while((message = buf.readLine()) != null)
- {
- s.send(new Message(message), txn);
- }
- }
- catch (IOException e)
- {
- // TODO
- e.printStackTrace();
- }
- }
-
- if(txn != null)
- {
- txn.commit();
- }
-
- s.close();
-
- session.close();
- conn.close();
- }
- catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Connection.ConnectionException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //TODO.
- }
-
-
- }
-
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
- }
-
-}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SendBytes.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SendBytes.java
deleted file mode 100644
index 6f97ecd810..0000000000
--- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SendBytes.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.codec.FrameWriter;
-import org.apache.qpid.amqp_1_0.codec.ValueWriter;
-import org.apache.qpid.amqp_1_0.framing.AMQFrame;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.FrameBody;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.UnsignedByte;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.UnsignedLong;
-import org.apache.qpid.amqp_1_0.type.UnsignedShort;
-import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.amqp_1_0.type.messaging.Footer;
-import org.apache.qpid.amqp_1_0.type.messaging.Header;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-import org.apache.qpid.amqp_1_0.type.transport.Flow;
-
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-
-public class SendBytes
-{
-
- public static void main(String[] args) throws
- Sender.SenderCreationException,
- Sender.SenderClosingException,
- Connection.ConnectionException,
- IOException, ParseException
- {
- Transfer xfr = new Transfer();
- Flow fs = new Flow();
- fs.setIncomingWindow(UnsignedInteger.valueOf(1024));
- fs.setDeliveryCount(UnsignedInteger.valueOf(2));
- fs.setLinkCredit(UnsignedInteger.valueOf(18));
- fs.setAvailable(UnsignedInteger.valueOf(0));
- fs.setDrain(false);
-
- xfr.setHandle(UnsignedInteger.valueOf(0));
- xfr.setDeliveryTag(new Binary("\"queue\"<-6ec024a7-d98e-4196-9348-15f6026c32ca:0".getBytes()));
- //xfr.setDeliveryTag(new Binary(new byte[] {0}));
- xfr.setDeliveryId(UnsignedInteger.valueOf(0));
- xfr.setSettled(true);
-
-
- Header h = new Header();
- Properties p = new Properties();
- p.setTo("queue");
- //p.setMessageId(new Binary(UUID.randomUUID().toString().getBytes()));
-
- Footer f = new Footer(Collections.EMPTY_MAP);
-
- Section[] sections = new Section[] { h,p,f};
- //Section[] sections = new Section[] { b };
- //Section[] sections = { h,p, b};
-/*
- Fragment[] fragments = new Fragment[5];
-
- final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance().registerTransportLayer().registerMessagingLayer();
-
- SectionEncoderImpl encoder = new SectionEncoderImpl(typeRegistry);
-
- int num = 0;
- int i = 0;
- for(Section s : sections)
- {
- Fragment frag = new Fragment();
-
- frag.setPayload(s.encode(encoder));
- frag.setFirst(true);
- frag.setLast(true);
- frag.setSectionCode(s.getSectionCode());
- frag.setSectionNumber(UnsignedInteger.valueOf(num++));
- frag.setSectionOffset(UnsignedLong.valueOf(0L));
- fragments[i++] =frag;
- }
-
- xfr.setFragments(fragments);
-*/
-
- encodeTypes("xfr",xfr);
-
- final byte[] result;
- final Object input = xfr;
-/*
- result = encode(1024, input);
-
- boolean ok = true;
-
- for(int j = 10; ok && j < 400; j++)
- {
-
- byte[] result2 = encode(j,input);
-
- for(int i = 0; i <400; i++)
- {
- if(result[i] != result2[i])
- {
- System.out.println("result differs at " + i + " Splitting at " + j+ " [" + result[i] + " - " + result2[i] + "]");
- //break;
- //ok = false;
-
- }
- }
- }*/
- //System.out.println(Arrays.equals(result, result2));
-
- //doEncodes();
- /*OutputStream out = System.out;
- if(args.length > 0)
- {
- out = new FileOutputStream(args[0]);
- }
-
- Transfer xfr = new Transfer();
- fs.setSessionCredit(UnsignedInteger.valueOf(1024));
- fs.setTransferCount(UnsignedInteger.valueOf(2));
- fs.setLinkCredit(UnsignedInteger.valueOf(18));
- fs.setAvailable(UnsignedInteger.valueOf(0));
- fs.setDrain(false);
-
- xfr.setHandle(UnsignedInteger.valueOf(0));
- //xfr.setDeliveryTag(new Binary("\"queue\"<-6ec024a7-d98e-4196-9348-15f6026c32ca:0".getBytes()));
- xfr.setDeliveryTag(new Binary(new byte[] {0}));
- xfr.setTransferId(UnsignedInteger.valueOf(0));
- xfr.setSettled(true);
- xfr.setFlowState(fs);
-
- Header h = new Header();
- h.setTransmitTime(new Date(System.currentTimeMillis()));
- Properties p = new Properties();
- p.setTo(new Address("queue"));
- //p.setMessageId(new Binary(UUID.randomUUID().toString().getBytes()));
- AmqpMapSection m = new AmqpMapSection();
- DataSection b = new DataSection("Hello World!".getBytes());
-
- Footer f = new Footer();
-
- Section[] sections = new Section[] { h,p,m,b,f};
- //Section[] sections = new Section[] { b };
- //Section[] sections = { h,p, b};
- List<Fragment> fragments = new ArrayList<Fragment>(5);
-
- final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance();
-
- SectionEncoderImpl encoder = new SectionEncoderImpl(typeRegistry);
-
- for(Section s : sections)
- {
- Fragment frag = new Fragment();
-
- frag.setPayload(s.encode(encoder));
- frag.setFirst(true);
- frag.setLast(true);
- frag.setFormatCode(s.getSectionCode());
- frag.setFragmentOffset(null);
- fragments.add(frag);
- }
-
- xfr.setFragments(fragments);
-
-
- Object[] objectsToWrite = new Object[] { xfr };
- ByteBuffer buf = ByteBuffer.allocate(4096);
-
-
- for(Object obj : objectsToWrite)
- {
- ValueWriter writer = typeRegistry.getValueWriter(obj);
-
- int count;
-
-
- do
- {
- count = writer.writeToBuffer(buf);
- out.write(buf.array(), buf.arrayOffset(), count);
- buf.clear();
- } while (!writer.isComplete());
-
- }
-
- out.flush();
- out.close();*/
-
- }
-
- public static void doEncodes() throws IOException, ParseException
- {
- encodeTypes("boolean", Boolean.TRUE, Boolean.FALSE);
- encodeTypes("ubyte", UnsignedByte.valueOf((byte)0), UnsignedByte.valueOf((byte)1 ),UnsignedByte.valueOf((byte)3), UnsignedByte.valueOf((byte)42), UnsignedByte.valueOf("255"));
- encodeTypes("byte", Byte.valueOf((byte)0), Byte.valueOf( (byte)1), Byte.valueOf((byte) 3), Byte.valueOf((byte) 42), Byte.valueOf((byte) 127), Byte.valueOf((byte) -1), Byte.valueOf((byte) -3), Byte.valueOf((byte) -42), Byte.valueOf( (byte)-128));
- encodeTypes("ushort", UnsignedShort.valueOf((short)0), UnsignedShort.valueOf((short)1), UnsignedShort.valueOf((short)3), UnsignedShort.valueOf((short)42), UnsignedShort.valueOf("65535"));
- encodeTypes("short", Short.valueOf((short)0), Short.valueOf((short)1), Short.valueOf((short)3), Short.valueOf((short)42), Short.valueOf((short)32767), Short.valueOf((short)-1), Short.valueOf((short)-3), Short.valueOf((short)-42), Short.valueOf((short)-32768));
- encodeTypes("uint",UnsignedInteger.valueOf(0), UnsignedInteger.valueOf(1), UnsignedInteger.valueOf(3), UnsignedInteger.valueOf(42), UnsignedInteger.valueOf("4294967295"));
- encodeTypes("int", 0, 1, 3, 42, 2147483647, -1, -3, -42, -2147483648);
- encodeTypes("ulong", UnsignedLong.valueOf(0), UnsignedLong.valueOf(1), UnsignedLong.valueOf(3), UnsignedLong.valueOf(42), UnsignedLong.valueOf("18446744073709551615"));
- encodeTypes("long", 0l, 1l, 3l, 42l, 9223372036854775807l, -1l, -3l, -42l, -9223372036854775808l);
- encodeTypes("float", 3.14159);
- encodeTypes("double", Double.valueOf(3.14159265359));
- encodeTypes("char", '?');
-
- SimpleDateFormat df = new SimpleDateFormat("HHa z MMM d yyyy");
-
- encodeTypes("timestamp", df.parse("9AM PST Dec 6 2010"), df.parse("9AM PST Dec 6 1910"));
- encodeTypes("uuid", UUID.fromString("f275ea5e-0c57-4ad7-b11a-b20c563d3b71"));
- encodeTypes("binary", new Binary( new byte[] {(byte)0xDE, (byte)0xAD, (byte)0xBE, (byte)0xEF}), new Binary(new byte[] { (byte)0xCA,(byte)0xFE, (byte)0xBA, (byte)0xBE}));
- encodeTypes("string", "The quick brown fox jumped over the lazy cow.");
- encodeTypes("symbol", Symbol.valueOf("connectathon"));
- encodeTypes("list", Arrays.asList(new Object[] {Long.valueOf(1), "two", Double.valueOf(3.14159265359), null, Boolean.FALSE}));
- Map map = new HashMap();
- map.put("one", Long.valueOf(1));
- map.put("two", Long.valueOf(2));
- map.put("pi", Double.valueOf(3.14159265359));
- map.put("list:", Arrays.asList(new Object[] {Long.valueOf(1), "two", Double.valueOf(3.14159265359), null, Boolean.FALSE}));
- map.put(null, Boolean.TRUE);
- encodeTypes("map", map);
- encodeTypes("null", null);
-
- }
-
- static void encodeTypes(String name, Object... vals ) throws IOException
- {
- FileOutputStream out = new FileOutputStream("/home/rob/"+name+".out");
- ByteBuffer buf = ByteBuffer.allocate(4096);
- final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance();
-
- if(vals != null)
- {
- for(Object obj : vals)
- {
- ValueWriter writer = typeRegistry.getValueWriter(obj);
-
- int count;
-
-
- do
- {
- count = writer.writeToBuffer(buf);
- out.write(buf.array(), buf.arrayOffset(), count);
- buf.clear();
- } while (!writer.isComplete());
-
- }
- }
- else
- {
- ValueWriter writer = typeRegistry.getValueWriter(null);
-
- int count;
-
-
- do
- {
- count = writer.writeToBuffer(buf);
- out.write(buf.array(), buf.arrayOffset(), count);
- buf.clear();
- } while (!writer.isComplete());
-
- }
- out.flush();
- out.close();
-
- }
-
- static byte[] encode(int size, Object... vals)
- {
- byte[] result = new byte[10000];
- int pos = 0;
-
- final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance();
- AMQFrame frame = AMQFrame.createAMQFrame((short) 0, (FrameBody) vals[0]);
- FrameWriter writer = new FrameWriter(typeRegistry);
- /*for(Object obj : vals)
- {
- final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance();
- ValueWriter writer = typeRegistry.getValueWriter(obj);
-*/
- int count;
-
- ByteBuffer buf = ByteBuffer.wrap(result, pos, size);
-
- do
- {
-
- writer.writeToBuffer(buf);
- pos = buf.position();
- buf = ByteBuffer.wrap(result, pos, size);
- if(!writer.isComplete())
- {
- count = 3;
- }
-
- } while (!writer.isComplete());
-/*
-
- }
-*/
-
- return result;
-
- }
-
-
-}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java
deleted file mode 100644
index 6fe2a6d510..0000000000
--- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java
+++ /dev/null
@@ -1,529 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.transport.Container;
-import org.apache.commons.cli.*;
-
-import java.util.logging.*;
-
-public abstract class Util
-{
-
- private static final Logger FRAME_LOGGER = Logger.getLogger("FRM");
- private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
- private String _host;
- private String _username;
- private String _password;
- private int _port;
- private int _count;
- private boolean _useStdIn;
- private boolean _useTran;
- private String[] _args;
- private AcknowledgeMode _mode;
- private boolean _block;
- private int _frameSize;
- private int _messageSize;
- private String _responseQueue;
- private int _batchSize;
- private double _rollbackRatio;
- private String _linkName;
- private String _containerName;
- private boolean _durableLink;
- private boolean _useMultipleConnections;
- private int _windowSize = 100;
- private String _subject;
- private String _filter;
- private String _remoteHost;
- private boolean _useSSL;
-
- protected Util(String[] args)
- {
- CommandLineParser cmdLineParse = new PosixParser();
-
- Options options = new Options();
- options.addOption("h","help",false,"show this help message and exit");
- options.addOption(OptionBuilder.withLongOpt("host")
- .withDescription( "host to connect to (default 0.0.0.0)" )
- .hasArg(true)
- .withArgName("HOST")
- .create('H'));
- options.addOption(OptionBuilder.withLongOpt("username")
- .withDescription( "username to use for authentication" )
- .hasArg(true)
- .withArgName("USERNAME")
- .create('u'));
- options.addOption(OptionBuilder.withLongOpt("password")
- .withDescription( "password to use for authentication" )
- .hasArg(true)
- .withArgName("PASSWORD")
- .create('w'));
- options.addOption(OptionBuilder.withLongOpt("port")
- .withDescription( "port to connect to (default 5672)" )
- .hasArg(true)
- .withArgName("PORT")
- .create('p'));
- options.addOption(OptionBuilder.withLongOpt("frame-size")
- .withDescription( "specify the maximum frame size" )
- .hasArg(true)
- .withArgName("FRAME_SIZE")
- .create('f'));
- options.addOption(OptionBuilder.withLongOpt("container-name")
- .withDescription( "Container name" )
- .hasArg(true)
- .withArgName("CONTAINER_NAME")
- .create('C'));
-
- options.addOption(OptionBuilder.withLongOpt("ssl")
- .withDescription("Use SSL")
- .create('S'));
-
- options.addOption(OptionBuilder.withLongOpt("remote-hostname")
- .withDescription( "hostname to supply in the open frame" )
- .hasArg(true)
- .withArgName("HOST")
- .create('O'));
-
- if(hasBlockOption())
- options.addOption(OptionBuilder.withLongOpt("block")
- .withDescription("block until messages arrive")
- .create('b'));
-
- if(hasCountOption())
- options.addOption(OptionBuilder.withLongOpt("count")
- .withDescription( "number of messages to send (default 1)" )
- .hasArg(true)
- .withArgName("COUNT")
- .create('c'));
- if(hasModeOption())
- options.addOption(OptionBuilder.withLongOpt("acknowledge-mode")
- .withDescription( "acknowledgement mode: AMO|ALO|EO (At Least Once, At Most Once, Exactly Once" )
- .hasArg(true)
- .withArgName("MODE")
- .create('k'));
-
- if(hasSubjectOption())
- options.addOption(OptionBuilder.withLongOpt("subject")
- .withDescription( "subject message property" )
- .hasArg(true)
- .withArgName("SUBJECT")
- .create('s'));
-
-
- if(hasSingleLinkPerConnectionMode())
- options.addOption(OptionBuilder.withLongOpt("single-link-per-connection")
- .withDescription("acknowledgement mode: AMO|ALO|EO (At Least Once, At Most Once, Exactly Once")
- .hasArg(false)
- .create('Z'));
-
- if(hasFilterOption())
- options.addOption(OptionBuilder.withLongOpt("filter")
- .withDescription("filter, e.g. exact-subject=hello; matching-subject=%.a.#")
- .hasArg(true)
- .withArgName("<TYPE>=<VALUE>")
- .create('F'));
-
-
- if(hasTxnOption())
- {
- options.addOption("x","txn",false,"use transactions");
- options.addOption(OptionBuilder.withLongOpt("batch-size")
- .withDescription( "transaction batch size (default: 1)" )
- .hasArg(true)
- .withArgName("BATCH-SIZE")
- .create('B'));
- options.addOption(OptionBuilder.withLongOpt("rollback-ratio")
- .withDescription( "rollback ratio - must be between 0 and 1 (default: 0)" )
- .hasArg(true)
- .withArgName("RATIO")
- .create('R'));
- }
-
- if(hasLinkDurableOption())
- {
- options.addOption("d","durable-link",false,"use a durable link");
- }
-
- if(hasStdInOption())
- options.addOption("i","stdin",false,"read messages from stdin (one message per line)");
-
- options.addOption(OptionBuilder.withLongOpt("trace")
- .withDescription("trace logging specified categories: RAW, FRM")
- .hasArg(true)
- .withArgName("TRACE")
- .create('t'));
- if(hasSizeOption())
- options.addOption(OptionBuilder.withLongOpt("message-size")
- .withDescription( "size to pad outgoing messages to" )
- .hasArg(true)
- .withArgName("SIZE")
- .create('z'));
-
- if(hasResponseQueueOption())
- options.addOption(OptionBuilder.withLongOpt("response-queue")
- .withDescription( "response queue to reply to" )
- .hasArg(true)
- .withArgName("RESPONSE_QUEUE")
- .create('r'));
-
- if(hasLinkNameOption())
- {
- options.addOption(OptionBuilder.withLongOpt("link")
- .withDescription( "link name" )
- .hasArg(true)
- .withArgName("LINK")
- .create('l'));
- }
-
- if(hasWindowSizeOption())
- {
- options.addOption(OptionBuilder.withLongOpt("window-size")
- .withDescription("credit window size")
- .hasArg(true)
- .withArgName("WINDOW-SIZE")
- .create('W'));
- }
-
- CommandLine cmdLine = null;
- try
- {
- cmdLine = cmdLineParse.parse(options, args);
-
- }
- catch (ParseException e)
- {
- printUsage(options);
- System.exit(-1);
- }
-
- if(cmdLine.hasOption('h') || cmdLine.getArgList().isEmpty())
- {
- printUsage(options);
- System.exit(0);
- }
- _host = cmdLine.getOptionValue('H',"0.0.0.0");
- _remoteHost = cmdLine.getOptionValue('O',null);
- String portStr = cmdLine.getOptionValue('p',"5672");
- String countStr = cmdLine.getOptionValue('c',"1");
-
- _useSSL = cmdLine.hasOption('S');
-
- if(hasWindowSizeOption())
- {
- String windowSizeStr = cmdLine.getOptionValue('W',"100");
- _windowSize = Integer.parseInt(windowSizeStr);
- }
-
- if(hasSubjectOption())
- {
- _subject = cmdLine.getOptionValue('s');
- }
-
- if(cmdLine.hasOption('u'))
- {
- _username = cmdLine.getOptionValue('u');
- }
-
- if(cmdLine.hasOption('w'))
- {
- _password = cmdLine.getOptionValue('w');
- }
-
- if(cmdLine.hasOption('F'))
- {
- _filter = cmdLine.getOptionValue('F');
- }
-
- _port = Integer.parseInt(portStr);
-
- _containerName = cmdLine.getOptionValue('C');
-
- if(hasBlockOption())
- _block = cmdLine.hasOption('b');
-
- if(hasLinkNameOption())
- _linkName = cmdLine.getOptionValue('l');
-
-
- if(hasLinkDurableOption())
- _durableLink = cmdLine.hasOption('d');
-
- if(hasCountOption())
- _count = Integer.parseInt(countStr);
-
- if(hasStdInOption())
- _useStdIn = cmdLine.hasOption('i');
-
- if(hasSingleLinkPerConnectionMode())
- _useMultipleConnections = cmdLine.hasOption('Z');
-
- if(hasTxnOption())
- {
- _useTran = cmdLine.hasOption('x');
- _batchSize = Integer.parseInt(cmdLine.getOptionValue('B',"1"));
- _rollbackRatio = Double.parseDouble(cmdLine.getOptionValue('R',"0"));
- }
-
- if(hasModeOption())
- {
- _mode = AcknowledgeMode.ALO;
-
- if(cmdLine.hasOption('k'))
- {
- _mode = AcknowledgeMode.valueOf(cmdLine.getOptionValue('k'));
- }
- }
-
- if(hasResponseQueueOption())
- {
- _responseQueue = cmdLine.getOptionValue('r');
- }
-
- _frameSize = Integer.parseInt(cmdLine.getOptionValue('f',"65536"));
-
- if(hasSizeOption())
- {
- _messageSize = Integer.parseInt(cmdLine.getOptionValue('z',"-1"));
- }
-
- String categoriesList = cmdLine.getOptionValue('t');
- String[]categories = categoriesList == null ? new String[0] : categoriesList.split("[, ]");
- for(String cat : categories)
- {
- if(cat.equalsIgnoreCase("FRM"))
- {
- FRAME_LOGGER.setLevel(Level.FINE);
- Formatter formatter = new Formatter()
- {
- @Override
- public String format(final LogRecord record)
- {
- return "[" + record.getMillis() + " FRM]\t" + record.getMessage() + "\n";
- }
- };
- for(Handler handler : FRAME_LOGGER.getHandlers())
- {
- FRAME_LOGGER.removeHandler(handler);
- }
- Handler handler = new ConsoleHandler();
- handler.setLevel(Level.FINE);
- handler.setFormatter(formatter);
- FRAME_LOGGER.addHandler(handler);
- }
- else if (cat.equalsIgnoreCase("RAW"))
- {
- RAW_LOGGER.setLevel(Level.FINE);
- Formatter formatter = new Formatter()
- {
- @Override
- public String format(final LogRecord record)
- {
- return "[" + record.getMillis() + " RAW]\t" + record.getMessage() + "\n";
- }
- };
- for(Handler handler : RAW_LOGGER.getHandlers())
- {
- RAW_LOGGER.removeHandler(handler);
- }
- Handler handler = new ConsoleHandler();
- handler.setLevel(Level.FINE);
- handler.setFormatter(formatter);
- RAW_LOGGER.addHandler(handler);
- }
- }
-
-
- _args = cmdLine.getArgs();
-
- }
-
- protected boolean hasFilterOption()
- {
- return false;
- }
-
- protected boolean hasSubjectOption()
- {
- return false;
- }
-
- protected boolean hasWindowSizeOption()
- {
- return false;
- }
-
- protected boolean hasSingleLinkPerConnectionMode()
- {
- return false;
- }
-
- protected abstract boolean hasLinkDurableOption();
-
- protected abstract boolean hasLinkNameOption();
-
- protected abstract boolean hasResponseQueueOption();
-
- protected abstract boolean hasSizeOption();
-
- protected abstract boolean hasBlockOption();
-
- protected abstract boolean hasStdInOption();
-
- protected abstract boolean hasTxnOption();
-
- protected abstract boolean hasModeOption();
-
- protected abstract boolean hasCountOption();
-
- public String getHost()
- {
- return _host;
- }
-
- public String getUsername()
- {
- return _username;
- }
-
- public String getPassword()
- {
- return _password;
- }
-
- public int getPort()
- {
- return _port;
- }
-
- public int getCount()
- {
- return _count;
- }
-
- public boolean useStdIn()
- {
- return _useStdIn;
- }
-
- public boolean useTran()
- {
- return _useTran;
- }
-
- public AcknowledgeMode getMode()
- {
- return _mode;
- }
-
- public boolean isBlock()
- {
- return _block;
- }
-
- public String[] getArgs()
- {
- return _args;
- }
-
- public int getMessageSize()
- {
- return _messageSize;
- }
-
- public String getResponseQueue()
- {
- return _responseQueue;
- }
-
- public int getBatchSize()
- {
- return _batchSize;
- }
-
- public double getRollbackRatio()
- {
- return _rollbackRatio;
- }
-
- public String getLinkName()
- {
- return _linkName;
- }
-
- public boolean isDurableLink()
- {
- return _durableLink;
- }
-
- public boolean isUseMultipleConnections()
- {
- return _useMultipleConnections;
- }
-
- public void setUseMultipleConnections(boolean useMultipleConnections)
- {
- _useMultipleConnections = useMultipleConnections;
- }
-
- public String getSubject()
- {
- return _subject;
- }
-
- public void setSubject(String subject)
- {
- _subject = subject;
- }
-
- protected abstract void printUsage(final Options options);
-
- protected abstract void run();
-
-
- public Connection newConnection() throws Connection.ConnectionException
- {
- Container container = getContainerName() == null ? new Container() : new Container(getContainerName());
- return getUsername() == null ? new Connection(getHost(), getPort(), null, null, _frameSize, container,
- _remoteHost == null ? getHost() : _remoteHost, _useSSL)
- : new Connection(getHost(), getPort(), getUsername(), getPassword(), _frameSize,
- container, _remoteHost == null ? getHost() : _remoteHost, _useSSL);
- }
-
- public String getContainerName()
- {
- return _containerName;
- }
-
- public int getWindowSize()
- {
- return _windowSize;
- }
-
- public void setWindowSize(int windowSize)
- {
- _windowSize = windowSize;
- }
-
- public String getFilter()
- {
- return _filter;
- }
-}