summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/FileMessage.java
blob: 179c91c2e96d4c61d5975035f0c1bc9fd6dbc3f8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package org.apache.qpid.nclient.util;

import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.api.Message;

/**
 * FileMessage provides pull style semantics for
 * large messages backed by a file on disk.
 * Instead of loading all data into memory it uses a
 * FileChannel to map one region of the file into memory
 * at a time.
 *
 * The write methods are not supported.
 *
 * From the standpoint of performance it is generally
 * only worth mapping relatively large files into memory.
 *
 * <pre>
 * FileMessage msg = new FileMessage(in,chunkSize,delProps,msgProps);
 * session.messageTransfer(dest,msg,0,0);
 * </pre>
 *
 * The messageTransfer method will read the file in chunks
 * and stream it.
 *
 * The read position is advanced without any synchronization.
 */
public class FileMessage extends ReadOnlyMessage implements Message
{
    private final FileChannel _fileChannel;
    /** Size of every chunk except possibly the last one; never larger than the file. */
    private final int _chunkSize;
    /** File size in bytes, captured once at construction time. */
    private final long _fileSize;
    /** Offset in the file of the next chunk to hand out. */
    private long _pos = 0;

    /**
     * Creates a message whose body is the content of the given file.
     *
     * @param in                 stream whose channel is used to map the file; it is not
     *                           closed by this class
     * @param chunkSize          maximum number of bytes returned by one call to
     *                           {@link #readData()}; must be positive
     * @param deliveryProperties delivery properties to attach to the message
     * @param messageProperties  message properties to attach to the message
     * @throws IllegalArgumentException if chunkSize is zero or negative
     * @throws IOException              if the size of the file cannot be determined
     */
    public FileMessage(FileInputStream in,int chunkSize,DeliveryProperties deliveryProperties,MessageProperties messageProperties)throws IOException
    {
        // A zero chunk size would make readData() return empty buffers forever
        // without ever reaching the end of the file; a negative one would only
        // fail later inside FileChannel.map. Reject both up front.
        if (chunkSize <= 0)
        {
            throw new IllegalArgumentException("chunkSize must be positive: " + chunkSize);
        }

        _messageProperties = messageProperties;
        _deliveryProperties = deliveryProperties;

        _fileChannel = in.getChannel();
        _fileSize = _fileChannel.size();

        // Never map more than the file holds. The cast is safe because the
        // file size is bounded by chunkSize (an int) on that branch.
        _chunkSize = (_fileSize <= chunkSize) ? (int)_fileSize : chunkSize;
    }

    /**
     * Not implemented: the supplied header is ignored.
     *
     * @param header ignored
     */
    public void setHeader(Header header)
    {
        // Intentionally empty - this implementation does not store a header.
    }

    /**
     * Not implemented.
     *
     * @return always null
     */
    public Header getHeader()
    {
        return null;
    }

    /**
     * Copying into a caller supplied array is not supported,
     * use {@link #readData()} instead.
     *
     * @param target ignored
     * @throws UnsupportedOperationException always
     */
    public void readData(byte[] target) throws IOException
    {
        throw new UnsupportedOperationException();
    }

    /**
     * Maps the next chunk of the file into memory and advances the
     * read position past it. Every chunk is chunkSize bytes long except
     * the last one, which holds whatever is left.
     *
     * @return a read-only buffer mapped over the next region of the file
     * @throws EOFException if the whole file has already been returned
     *                      (this includes the first call on an empty file)
     * @throws IOException  if the region cannot be mapped
     */
    public ByteBuffer readData() throws IOException
    {
        if (_pos >= _fileSize)
        {
            throw new EOFException();
        }

        // The last chunk may be shorter than the configured chunk size.
        long length = Math.min(_chunkSize, _fileSize - _pos);
        MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, length);
        _pos += length;
        return bb;
    }

    /**
     * This message is used by an application user to
     * provide data to the client library using pull style
     * semantics. Since the message is not transferred yet, it
     * does not have a transfer id. Hence this method is not
     * applicable to this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    public int getMessageTransferId()
    {
        throw new UnsupportedOperationException();
    }
}