diff options
Diffstat (limited to 'java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java')
-rw-r--r-- | java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java | 347 |
1 files changed, 347 insertions, 0 deletions
diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java new file mode 100644 index 0000000000..4d98655ad2 --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java @@ -0,0 +1,347 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.*; +import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; + +import java.io.*; +import java.nio.ByteBuffer; +import java.util.*; + +public class Filereceiver extends Util +{ + private static final String USAGE_STRING = "filereceiver [options] <address> <directory>\n\nOptions:"; + + protected Filereceiver(String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return true; + } + + @Override + protected boolean hasLinkNameOption() + { + return true; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return false; + } + + @Override + protected boolean hasBlockOption() + { + return true; + } + + @Override + protected boolean hasStdInOption() + { + return false; + } + + @Override + protected boolean hasTxnOption() + { + return false; + } + + @Override + protected boolean hasModeOption() + { + return false; + } + + @Override + protected boolean hasCountOption() + { + return false; + } + + @Override + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + + } + + @Override + protected void run() + { + final String queue = getArgs()[0]; + final String directoryName = getArgs()[1]; + + try + { + Connection conn = newConnection(); + + Session session = conn.createSession(); + + final File directory = new File(directoryName); + if(directory.isDirectory() && directory.canWrite()) + { + File tmpDirectory = new File(directoryName, ".tmp"); + if(!tmpDirectory.exists()) + { + tmpDirectory.mkdir(); + } + + String[] unsettledFiles = tmpDirectory.list(); + + Map<Binary, Outcome> unsettled = new HashMap<Binary, Outcome>(); + final Map<Binary, String> unsettledFileNames = new HashMap<Binary, String>(); + + Accepted accepted = new Accepted(); + + for(String fileName : unsettledFiles) + { + File theFile = new File(tmpDirectory, fileName); + if(theFile.isFile()) + { + if(fileName.startsWith("~") && fileName.endsWith("~")) + { + theFile.delete(); + } + else + { + int splitPoint = fileName.indexOf("."); + String deliveryTagStr = fileName.substring(0,splitPoint); + String actualFileName = fileName.substring(splitPoint+1); + + byte[] bytes = new byte[deliveryTagStr.length()/2]; + + + for(int i = 0; i < bytes.length; i++) + { + char c = deliveryTagStr.charAt(2*i); + char d = deliveryTagStr.charAt(1+(2*i)); + + bytes[i] = (byte) (((c <= '9' ? c - '0' : c - 'W') << 4) + | (d <= '9' ? d - '0' : d - 'W')); + + } + Binary deliveryTag = new Binary(bytes); + unsettled.put(deliveryTag, accepted); + unsettledFileNames.put(deliveryTag, fileName); + } + } + + } + + Receiver r = session.createReceiver(queue, AcknowledgeMode.EO, getLinkName(), isDurableLink(), + unsettled); + + Map<Binary, Outcome> remoteUnsettled = r.getRemoteUnsettled(); + + for(Map.Entry<Binary, String> entry : unsettledFileNames.entrySet()) + { + if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey())) + { + + File tmpFile = new File(tmpDirectory, entry.getValue()); + final File dest = new File(directory, + entry.getValue().substring(entry.getValue().indexOf(".") + 1)); + if(dest.exists()) + { + System.err.println("Duplicate detected - filename " + dest.getName()); + } + + tmpFile.renameTo(dest); + } + } + + + int credit = 10; + + r.setCredit(UnsignedInteger.valueOf(credit), true); + + + int received = 0; + Message m = null; + do + { + m = isBlock() && received == 0 ? r.receive() : r.receive(10000); + if(m != null) + { + if(m.isResume() && unsettled.containsKey(m.getDeliveryTag())) + { + final String tmpFileName = unsettledFileNames.get(m.getDeliveryTag()); + final File unsettledFile = new File(tmpDirectory, + tmpFileName); + r.acknowledge(m, new Receiver.SettledAction() + { + public void onSettled(final Binary deliveryTag) + { + int splitPoint = tmpFileName.indexOf("."); + + String fileName = tmpFileName.substring(splitPoint+1); + + final File dest = new File(directory, fileName); + if(dest.exists()) + { + System.err.println("Duplicate detected - filename " + dest.getName()); + } + unsettledFile.renameTo(dest); + unsettledFileNames.remove(deliveryTag); + } + }); + } + else + { + received++; + List<Section> sections = m.getPayload(); + Binary deliveryTag = m.getDeliveryTag(); + StringBuilder tagNameBuilder = new StringBuilder(); + + ByteBuffer dtbuf = deliveryTag.asByteBuffer(); + while(dtbuf.hasRemaining()) + { + tagNameBuilder.append(String.format("%02x", dtbuf.get())); + } + + + ApplicationProperties properties = null; + List<Binary> data = new ArrayList<Binary>(); + int totalSize = 0; + for(Section section : sections) + { + if(section instanceof ApplicationProperties) + { + properties = (ApplicationProperties) section; + } + else if(section instanceof AmqpValue) + { + AmqpValue value = (AmqpValue) section; + if(value.getValue() instanceof Binary) + { + Binary binary = (Binary) value.getValue(); + data.add(binary); + totalSize += binary.getLength(); + + } + else + { + // TODO exception + } + } + else if(section instanceof Data) + { + Data value = (Data) section; + Binary binary = value.getValue(); + data.add(binary); + totalSize += binary.getLength(); + + } + } + if(properties != null) + { + final String fileName = (String) properties.getValue().get("filename"); + byte[] fileData = new byte[totalSize]; + ByteBuffer buf = ByteBuffer.wrap(fileData); + int offset = 0; + for(Binary bin : data) + { + buf.put(bin.asByteBuffer()); + } + File outputFile = new File(tmpDirectory, "~"+fileName+"~"); + if(outputFile.exists()) + { + outputFile.delete(); + } + FileOutputStream fos = new FileOutputStream(outputFile); + fos.write(fileData); + fos.flush(); + fos.close(); + + final File unsettledFile = new File(tmpDirectory, tagNameBuilder.toString() + "." + + fileName); + outputFile.renameTo(unsettledFile); + r.acknowledge(m, new Receiver.SettledAction() + { + public void onSettled(final Binary deliveryTag) + { + final File dest = new File(directory, fileName); + if(dest.exists()) + { + System.err.println("Duplicate detected - filename " + dest.getName()); + } + unsettledFile.renameTo(dest); + + } + }); + + } + } + } + } + while(m != null); + + + r.close(); + } + else + { + System.err.println("No such directory: " + directoryName); + } + session.close(); + conn.close(); + } + catch (Connection.ConnectionException e) + { + e.printStackTrace(); + } + catch (FileNotFoundException e) + { + e.printStackTrace(); //TODO. + } + catch (IOException e) + { + e.printStackTrace(); //TODO. + } + catch (AmqpErrorException e) + { + e.printStackTrace(); //TODO. + } + + } + + public static void main(String[] args) + { + new Filereceiver(args).run(); + } +} |