summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/messaging/Message.cpp
blob: 0f03bc8ca3013994501bec3d1c5df068df38c4e5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include "qpid/messaging/Message.h"
#include "qpid/messaging/MessageImpl.h"
#include "qpid/amqp_0_10/Codecs.h"
#include <qpid/Exception.h>
#include <boost/format.hpp>

namespace qpid {
namespace messaging {

using namespace qpid::types;

Message::Message(const std::string& bytes) : impl(new MessageImpl(bytes)) {}
Message::Message(const char* bytes, size_t count) : impl(new MessageImpl(bytes, count)) {}

Message::Message(const Message& m) : impl(new MessageImpl(*m.impl)) {}
Message::~Message() { delete impl; }

Message& Message::operator=(const Message& m) { *impl = *m.impl; return *this; }

void Message::setReplyTo(const Address& d) { impl->setReplyTo(d); }
const Address& Message::getReplyTo() const { return impl->getReplyTo(); }

void Message::setSubject(const std::string& s) { impl->setSubject(s); }
const std::string& Message::getSubject() const { return impl->getSubject(); }

void Message::setContentType(const std::string& s) { impl->setContentType(s); }
const std::string& Message::getContentType() const { return impl->getContentType(); }

void Message::setMessageId(const std::string& id) { impl->setMessageId(id); }
const std::string& Message::getMessageId() const { return impl->getMessageId(); }

void Message::setUserId(const std::string& id) { impl->setUserId(id); }
const std::string& Message::getUserId() const { return impl->getUserId(); }

void Message::setCorrelationId(const std::string& id) { impl->setCorrelationId(id); }
const std::string& Message::getCorrelationId() const { return impl->getCorrelationId(); }

uint8_t Message::getPriority() const { return impl->getPriority(); }
void Message::setPriority(uint8_t priority) { impl->setPriority(priority); }

void Message::setTtl(Duration ttl) { impl->setTtl(ttl.getMilliseconds()); }
Duration Message::getTtl() const { return Duration(impl->getTtl()); }

void Message::setDurable(bool durable) { impl->setDurable(durable); }
bool Message::getDurable() const { return impl->isDurable(); }

bool Message::getRedelivered() const { return impl->isRedelivered(); }
void Message::setRedelivered(bool redelivered) { impl->setRedelivered(redelivered); }

const Variant::Map& Message::getProperties() const { return impl->getHeaders(); }
Variant::Map& Message::getProperties() { return impl->getHeaders(); }
void Message::setProperty(const std::string& k, const qpid::types::Variant& v) { impl->setHeader(k,v); }

void Message::setContent(const std::string& c) { impl->setBytes(c); }
void Message::setContent(const char* chars, size_t count) { impl->setBytes(chars, count); }
std::string Message::getContent() const { return impl->getBytes(); }

const char* Message::getContentPtr() const
{
    return impl->getBytes().data();
}

size_t Message::getContentSize() const
{
    return impl->getBytes().size();
}

EncodingException::EncodingException(const std::string& msg) : qpid::types::Exception(msg) {}

const std::string BAD_ENCODING("Unsupported encoding: %1% (only %2% is supported at present).");

template <class C> struct MessageCodec
{
    static bool checkEncoding(const std::string& requested)
    {
        if (requested.size()) {
            if (requested == C::contentType) return true;
            else throw EncodingException((boost::format(BAD_ENCODING) % requested % C::contentType).str());
        } else {
            return false;
        }
    }
    
    /*
     * Currently only support a single encoding type for both list and
     * map, based on AMQP 0-10, though wider support is anticipated in the
     * future. This method simply checks that the desired encoding (if one
     * is specified, either through the message-content or through an
     * override) is indeed supported.
     */
    static void checkEncoding(const Message& message, const std::string& requested)
    {
        checkEncoding(requested) || checkEncoding(message.getContentType());
    }

    static void decode(const Message& message, typename C::ObjectType& object, const std::string& encoding)
    {
        checkEncoding(message, encoding);
        try {
            C::decode(message.getContent(), object);
        } catch (const qpid::Exception &ex) {
            throw EncodingException(ex.what());
        }
    }

    static void encode(const typename C::ObjectType& map, Message& message, const std::string& encoding)
    {
        checkEncoding(message, encoding);
        std::string content;
        C::encode(map, content);
        message.setContentType(C::contentType);
        message.setContent(content);
    }
};

void decode(const Message& message, Variant::Map& map, const std::string& encoding)
{
    MessageCodec<qpid::amqp_0_10::MapCodec>::decode(message, map, encoding);
}
void decode(const Message& message, Variant::List& list, const std::string& encoding)
{
    MessageCodec<qpid::amqp_0_10::ListCodec>::decode(message, list, encoding);
}
void encode(const Variant::Map& map, Message& message, const std::string& encoding)
{
    MessageCodec<qpid::amqp_0_10::MapCodec>::encode(map, message, encoding);
}
void encode(const Variant::List& list, Message& message, const std::string& encoding)
{
    MessageCodec<qpid::amqp_0_10::ListCodec>::encode(list, message, encoding);
}

}} // namespace qpid::messaging