summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
blob: 893813041715de9794097987fa969602e99ef04f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.client.message;

import java.io.IOException;
import java.nio.charset.Charset;

import javax.jms.JMSException;
import javax.jms.MessageEOFException;

import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;

/**
 * Common base for JMS messages whose body is kept in the inherited MINA {@code ByteBuffer}
 * field {@code _data}.
 * <p>
 * Every constructor stamps the content header with the value returned by
 * {@code getMimeTypeAsShortString()}, which is not defined in this class.
 *
 * @author Apache Software Foundation
 */
public abstract class AbstractBytesMessage extends AbstractJMSMessage
{

    /**
     * The default initial size of the buffer. The buffer expands automatically.
     */
    private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024;

    /**
     * Charset used when rendering the body as text. Looked up once by its canonical name, which
     * every Java platform is required to support.
     */
    private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");

    /**
     * Construct an empty bytes message backed by a freshly allocated, auto-expanding buffer.
     */
    AbstractBytesMessage()
    {
        this(null);
    }

    /**
     * Construct a bytes message with existing data.
     *
     * @param data the data that comprises this message. If {@code _data} is still null after the
     *             superclass constructor has run, {@link #allocateInitialBuffer()} supplies a
     *             1024 byte buffer that is set to auto expand
     */
    AbstractBytesMessage(ByteBuffer data)
    {
        super(data); // per the original note: this instantiates a content header
        getContentHeaderProperties().setContentType(getMimeTypeAsShortString());

        if (_data == null)
        {
            allocateInitialBuffer();
        }
    }

    /**
     * Replace {@code _data} with a new buffer of {@link #DEFAULT_BUFFER_INITIAL_SIZE} bytes that
     * expands automatically. Any previously referenced buffer is simply dropped.
     * <p>
     * NOTE(review): this method is protected and is called from a constructor, so an override
     * would run before the subclass's own constructor body - keep overrides free of subclass state.
     */
    protected void allocateInitialBuffer()
    {
        _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE);
        _data.setAutoExpand(true);
    }

    /**
     * Construct a bytes message from its individual parts. Unlike the single-argument
     * constructor, no buffer is allocated here when {@code data} is null.
     *
     * @param messageNbr    passed through to the superclass constructor
     * @param contentHeader header whose {@code properties} must be a
     *                      {@code BasicContentHeaderProperties}; a ClassCastException results otherwise
     * @param exchange      passed through to the superclass constructor
     * @param routingKey    passed through to the superclass constructor
     * @param data          the message body; passed through to the superclass constructor
     * @throws AMQException declared for the superclass constructor; nothing in this body throws it directly
     */
    AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
                         AMQShortString routingKey, ByteBuffer data) throws AMQException
    {
        // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
        super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
        getContentHeaderProperties().setContentType(getMimeTypeAsShortString());
    }

    /**
     * Discard the current body by swapping in a fresh initial buffer.
     *
     * @throws JMSException declared by the signature; not thrown by this implementation
     */
    public void clearBodyImpl() throws JMSException
    {
        allocateInitialBuffer();
    }

    /**
     * Return the message body decoded as UTF-8 text without disturbing the read position.
     *
     * @return the decoded body, or null if there is no buffer or the buffer has nothing to read
     * @throws JMSException if {@code checkReadable()} rejects the call, or if decoding fails; in the
     *                      latter case the underlying IOException is attached as both the linked
     *                      exception and the cause
     */
    public String toBodyString() throws JMSException
    {
        checkReadable();
        try
        {
            return getText();
        }
        catch (IOException e)
        {
            // Keep the original message text, but do not lose the underlying failure.
            JMSException jmse = new JMSException(e.toString());
            jmse.setLinkedException(e);
            jmse.initCause(e);
            throw jmse;
        }
    }

    /**
     * Decode the buffer contents as UTF-8, starting from position zero.
     * <p>
     * The buffer is rewound before reading and its previous position is restored afterwards, even
     * if decoding fails. This means that the output always starts at the beginning of the message
     * and that the caller can then immediately continue reading as if this method had never been
     * called.
     * <p>
     * NOTE(review): MINA documents {@code ByteBuffer.getString(CharsetDecoder)} as reading a
     * NUL-terminated string, so the result may stop at the first zero byte - confirm this is intended.
     *
     * @return the decoded text, or null if {@code _data} is null or has no bytes remaining after the rewind
     * @throws IOException if the bytes cannot be decoded
     */
    private String getText() throws IOException
    {
        if (_data == null)
        {
            return null;
        }

        final int pos = _data.position();
        _data.rewind();
        try
        {
            if (_data.remaining() == 0)
            {
                return null;
            }

            return _data.getString(UTF8_CHARSET.newDecoder());
        }
        finally
        {
            // Restore the caller's read position on every exit path, including decode failures.
            _data.position(pos);
        }
    }

    /**
     * Check that there is at least a certain number of bytes available to read
     *
     * @param len the number of bytes
     * @throws javax.jms.MessageEOFException if there are less than len bytes available to read
     */
    protected void checkAvailable(int len) throws MessageEOFException
    {
        if (_data.remaining() < len)
        {
            throw new MessageEOFException("Unable to read " + len + " bytes");
        }
    }
}