summaryrefslogtreecommitdiff
path: root/client/test/src/org/apache/qpid/example/publisher/FileMessageDispatcher.java
blob: 4dd58cf4a1207b9956b20f3debfee0f787cbafa6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package org.apache.qpid.example.publisher;

import org.apache.log4j.Logger;
import java.util.Properties;
import java.io.File;

import org.apache.qpid.example.shared.FileUtils;
import org.apache.qpid.example.shared.Statics;

import javax.jms.JMSException;

/**
 * Command-line entry point that publishes a file, or every entry of a
 * directory, as messages through a shared {@link Publisher}.
 * <p>
 * Connection details are read from JVM system properties (set with
 * <code>-D</code>) whose names are defined in {@link Statics}: host, client,
 * queue, user, password, virtual path and archive path.
 * <p>
 * The publisher is held in a static field and created lazily without any
 * synchronisation, so this class is intended for single-threaded use.
 * <p>
 * Author: Marnie McCormack
 * Date: 20-Jul-2006
 * Time: 09:56:56
 * Copyright JPMorgan Chase 2006
 */
public class FileMessageDispatcher {

    private static final Logger _logger = Logger.getLogger(FileMessageDispatcher.class);

    /** Lazily created by {@link #getPublisher()}; null until the first message is published. */
    private static Publisher _publisher = null;

    private static final String DEFAULT_PUB_NAME = "Publisher";

    /**
     * Publishes the file or directory named by the first argument, then moves
     * it to the archive location given by the {@link Statics#ARCHIVE_PATH}
     * system property.
     * <p>
     * Always terminates the JVM: exit status 0 on success, 1 if no argument
     * was supplied or if publishing, archiving or cleanup failed.
     *
     * @param args command line; args[0] is the file or directory to dispatch
     */
    public static void main(String[] args)
    {
        int exitCode = 0;

        //Check command line args ok - must provide a path or file for us to run
        if (args.length == 0)
        {
            System.err.println("Usage: FileMessageDispatcher <filesToDispatch>");
            exitCode = 1;
        }
        else
        {
            try
            {
                //publish message(s) from file(s) and send message to monitor queue
                publish(args[0]);

                //Move payload file(s) to archive location as no error
                FileUtils.moveFileToNewDir(args[0], System.getProperties().getProperty(Statics.ARCHIVE_PATH));

                if (_logger.isDebugEnabled())
                {
                    _logger.debug("Finished dispatching message");
                }
            }
            catch (Exception e)
            {
                //Only record the failure here. Calling System.exit inside this
                //handler would halt the JVM before the finally block could run.
                _logger.error("Error trying to dispatch message", e);
                System.err.println("Error trying to dispatch message: " + e);
                exitCode = 1;
            }
            finally
            {
                //clean up before exiting; a cleanup failure must not prevent
                //the explicit exit below from being reached
                try
                {
                    cleanup();
                }
                catch (RuntimeException e)
                {
                    _logger.error("Error cleaning up publisher", e);
                    exitCode = 1;
                }
            }
        }

        System.exit(exitCode);
    }


    /**
     * Publishes the given path as one or more messages.
     * <p>
     * If the path is a directory, one message is sent per directory entry;
     * an empty or unlistable directory is logged at info level and nothing is
     * sent. Any other path is treated as a single payload file.
     *
     * @param path file or directory to publish
     * @throws JMSException declared for the session lookup and send calls
     * @throws MessageFactoryException declared for message creation from the payload
     */
    public static void publish(String path) throws JMSException, MessageFactoryException
    {
        File source = new File(path);

        if (!source.isDirectory())
        {
            //handle as single file
            publishFile(source);
            return;
        }

        //listFiles returns null if the directory cannot be read
        File[] files = source.listFiles();
        if (files == null || files.length == 0)
        {
            _logger.info("FileMessageDispatcher - No files to publish in input directory: " + source);
            return;
        }

        for (File file : files)
        {
            publishFile(file);
        }
    }

    /*
     * Builds an event message from a single payload file and sends it
     * using the shared _publisher
     */
    private static void publishFile(File file) throws JMSException, MessageFactoryException
    {
        Publisher publisher = getPublisher();

        //Create message factory passing in payload path
        MessageFactory factory = new MessageFactory(publisher.getSession(), file.toString());

        //Send the message generated from the payload using the _publisher
        publisher.sendMessage(factory.createEventMessage());
    }

    /**
     * Cleans up the shared publisher if one has been created.
     * <p>
     * Does nothing when no publisher exists; in particular it never creates
     * a publisher merely in order to clean it up.
     */
    public static void cleanup()
    {
        //Read the field directly: getPublisher() would lazily construct one
        if (_publisher != null)
        {
            _publisher.cleanup();
        }
    }

    /*
     * Returns the shared _publisher, creating it on first use
     * Using system properties to get connection info for now
     * Must set using -D the host, client, queue, user, pwd, virtual path, archive path
     */
    private static Publisher getPublisher()
    {
        if (_publisher == null)
        {
            //Create _publisher using system properties
            Properties props = System.getProperties();

            Publisher created = new Publisher(props.getProperty(Statics.HOST_PROPERTY),
                                              props.getProperty(Statics.CLIENT_PROPERTY),
                                              props.getProperty(Statics.QUEUE_PROPERTY),
                                              props.getProperty(Statics.USER_PROPERTY),
                                              props.getProperty(Statics.PWD_PROPERTY),
                                              props.getProperty(Statics.VIRTUAL_PATH_PROPERTY),
                                              props.getProperty(Statics.ARCHIVE_PATH));
            created.setName(DEFAULT_PUB_NAME);

            //Publish to the static field only once fully configured
            _publisher = created;
        }

        return _publisher;
    }

}