/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.amqp_1_0.client; import org.apache.qpid.amqp_1_0.type.*; import org.apache.qpid.amqp_1_0.type.messaging.*; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import java.io.*; import java.nio.ByteBuffer; import java.util.*; public class Filereceiver extends Util { private static final String USAGE_STRING = "filereceiver [options]
\n\nOptions:"; protected Filereceiver(String[] args) { super(args); } @Override protected boolean hasLinkDurableOption() { return true; } @Override protected boolean hasLinkNameOption() { return true; } @Override protected boolean hasResponseQueueOption() { return false; } @Override protected boolean hasSizeOption() { return false; } @Override protected boolean hasBlockOption() { return true; } @Override protected boolean hasStdInOption() { return false; } @Override protected boolean hasTxnOption() { return false; } @Override protected boolean hasModeOption() { return false; } @Override protected boolean hasCountOption() { return false; } @Override protected void printUsage(Options options) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(USAGE_STRING, options ); } @Override protected void run() { final String queue = getArgs()[0]; final String directoryName = getArgs()[1]; try { Connection conn = newConnection(); Session session = conn.createSession(); final File directory = new File(directoryName); if(directory.isDirectory() && directory.canWrite()) { File tmpDirectory = new File(directoryName, ".tmp"); if(!tmpDirectory.exists()) { tmpDirectory.mkdir(); } String[] unsettledFiles = tmpDirectory.list(); Map unsettled = new HashMap(); final Map unsettledFileNames = new HashMap(); Accepted accepted = new Accepted(); for(String fileName : unsettledFiles) { File theFile = new File(tmpDirectory, fileName); if(theFile.isFile()) { if(fileName.startsWith("~") && fileName.endsWith("~")) { theFile.delete(); } else { int splitPoint = fileName.indexOf("."); String deliveryTagStr = fileName.substring(0,splitPoint); String actualFileName = fileName.substring(splitPoint+1); byte[] bytes = new byte[deliveryTagStr.length()/2]; for(int i = 0; i < bytes.length; i++) { char c = deliveryTagStr.charAt(2*i); char d = deliveryTagStr.charAt(1+(2*i)); bytes[i] = (byte) (((c <= '9' ? c - '0' : c - 'W') << 4) | (d <= '9' ? d - '0' : d - 'W')); } Binary deliveryTag = new Binary(bytes); unsettled.put(deliveryTag, accepted); unsettledFileNames.put(deliveryTag, fileName); } } } Receiver r = session.createReceiver(queue, AcknowledgeMode.EO, getLinkName(), isDurableLink(), unsettled); Map remoteUnsettled = r.getRemoteUnsettled(); for(Map.Entry entry : unsettledFileNames.entrySet()) { if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey())) { File tmpFile = new File(tmpDirectory, entry.getValue()); final File dest = new File(directory, entry.getValue().substring(entry.getValue().indexOf(".") + 1)); if(dest.exists()) { System.err.println("Duplicate detected - filename " + dest.getName()); } tmpFile.renameTo(dest); } } int credit = 10; r.setCredit(UnsignedInteger.valueOf(credit), true); int received = 0; Message m = null; do { m = isBlock() && received == 0 ? r.receive() : r.receive(10000); if(m != null) { if(m.isResume() && unsettled.containsKey(m.getDeliveryTag())) { final String tmpFileName = unsettledFileNames.get(m.getDeliveryTag()); final File unsettledFile = new File(tmpDirectory, tmpFileName); r.acknowledge(m, new Receiver.SettledAction() { public void onSettled(final Binary deliveryTag) { int splitPoint = tmpFileName.indexOf("."); String fileName = tmpFileName.substring(splitPoint+1); final File dest = new File(directory, fileName); if(dest.exists()) { System.err.println("Duplicate detected - filename " + dest.getName()); } unsettledFile.renameTo(dest); unsettledFileNames.remove(deliveryTag); } }); } else { received++; List
sections = m.getPayload(); Binary deliveryTag = m.getDeliveryTag(); StringBuilder tagNameBuilder = new StringBuilder(); ByteBuffer dtbuf = deliveryTag.asByteBuffer(); while(dtbuf.hasRemaining()) { tagNameBuilder.append(String.format("%02x", dtbuf.get())); } ApplicationProperties properties = null; List data = new ArrayList(); int totalSize = 0; for(Section section : sections) { if(section instanceof ApplicationProperties) { properties = (ApplicationProperties) section; } else if(section instanceof AmqpValue) { AmqpValue value = (AmqpValue) section; if(value.getValue() instanceof Binary) { Binary binary = (Binary) value.getValue(); data.add(binary); totalSize += binary.getLength(); } else { // TODO exception } } else if(section instanceof Data) { Data value = (Data) section; Binary binary = value.getValue(); data.add(binary); totalSize += binary.getLength(); } } if(properties != null) { final String fileName = (String) properties.getValue().get("filename"); byte[] fileData = new byte[totalSize]; ByteBuffer buf = ByteBuffer.wrap(fileData); int offset = 0; for(Binary bin : data) { buf.put(bin.asByteBuffer()); } File outputFile = new File(tmpDirectory, "~"+fileName+"~"); if(outputFile.exists()) { outputFile.delete(); } FileOutputStream fos = new FileOutputStream(outputFile); fos.write(fileData); fos.flush(); fos.close(); final File unsettledFile = new File(tmpDirectory, tagNameBuilder.toString() + "." + fileName); outputFile.renameTo(unsettledFile); r.acknowledge(m, new Receiver.SettledAction() { public void onSettled(final Binary deliveryTag) { final File dest = new File(directory, fileName); if(dest.exists()) { System.err.println("Duplicate detected - filename " + dest.getName()); } unsettledFile.renameTo(dest); } }); } } } } while(m != null); r.close(); } else { System.err.println("No such directory: " + directoryName); } session.close(); conn.close(); } catch (Connection.ConnectionException e) { e.printStackTrace(); } catch (FileNotFoundException e) { e.printStackTrace(); //TODO. } catch (IOException e) { e.printStackTrace(); //TODO. } catch (AmqpErrorException e) { e.printStackTrace(); //TODO. } } public static void main(String[] args) { new Filereceiver(args).run(); } }