summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
blob: 9e432235e6261083a7ee5dedf055d3427cc8c140 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#ifndef QPID_BROKER_AMQP_0_10_MESSAGETRANSFER_H
#define QPID_BROKER_AMQP_0_10_MESSAGETRANSFER_H

/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/framing/FrameSet.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/PersistableMessage.h"
#include "qpid/types/Variant.h"

namespace qpid {
namespace broker {
class Queue;
namespace amqp_0_10 {

/**
 * AMQP 0-10 message representation for the broker.
 *
 * Implements qpid::broker::Message::Encoding and PersistableMessage on top
 * of a qpid::framing::FrameSet (the 'frames' member). Most members are
 * defined out of line; only the small template helpers and the static get()
 * accessors are implemented in this header.
 */
class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::broker::PersistableMessage
{
  public:
    QPID_BROKER_EXTERN MessageTransfer();
    QPID_BROKER_EXTERN MessageTransfer(const qpid::framing::SequenceNumber&);

    // Read-only accessors, all defined out of line.
    // NOTE(review): several of these presumably implement the
    // Message::Encoding interface -- confirm against qpid/broker/Message.h.
    std::string getRoutingKey() const;
    bool isPersistent() const;
    uint8_t getPriority() const;
    uint64_t getContentSize() const;
    std::string getPropertyAsString(const std::string& key) const;
    std::string getAnnotationAsString(const std::string& key) const;
    // NOTE(review): presumably the bool result reports whether a TTL was
    // written to the out-parameter -- confirm against the definition.
    bool getTtl(uint64_t&) const;
    bool hasExpiration() const;
    std::string getExchangeName() const;
    void processProperties(MapHandler&) const;
    std::string getUserId() const;

    bool requiresAccept() const;
    const qpid::framing::SequenceNumber& getCommandId() const;
    QPID_BROKER_EXTERN qpid::framing::FrameSet& getFrames();
    QPID_BROKER_EXTERN const qpid::framing::FrameSet& getFrames() const;

    /**
     * Looks up the properties struct of type T in the header body.
     *
     * @returns a null pointer if the frame set has no header body;
     * otherwise whatever AMQHeaderBody::get<T>() yields.
     */
    template <class T> const T* getProperties() const {
        const qpid::framing::AMQHeaderBody* headers = frames.getHeaders();
        // getHeaders() hands back a pointer; do not dereference it when it
        // is null (no header body available).
        return headers ? headers->get<T>() : 0;
    }

    /**
     * Same lookup as getProperties<T>(); callers use the result as a
     * presence check.
     *
     * @returns a null pointer if the frame set has no header body;
     * otherwise whatever AMQHeaderBody::get<T>() yields.
     */
    template <class T> const T* hasProperties() const {
        return getProperties<T>();
    }

    /** @returns the result of FrameSet::as<T>() on the underlying frames. */
    template <class T> const T* getMethod() const {
        return frames.as<T>();
    }

    /** Non-const overload of getMethod<T>(). */
    template <class T> T* getMethod() {
        return frames.as<T>();
    }

    /** @returns the result of FrameSet::isA<T>() on the underlying frames. */
    template <class T> bool isA() const {
        return frames.isA<T>();
    }

    /**
     * Erases the properties struct of type T from the header body.
     * Does nothing if the frame set has no header body.
     */
    template <class T> void eraseProperties() {
        qpid::framing::AMQHeaderBody* headers = frames.getHeaders();
        // Nothing to erase without a header body; avoid a null dereference.
        if (headers) headers->erase<T>();
    }
    std::string getContent() const;
    uint32_t getRequiredCredit() const;
    void computeRequiredCredit();

    void clearApplicationHeadersFlag();
    void sendContent(framing::FrameHandler& out, uint16_t maxFrameSize) const;
    void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize, bool redelivered, uint64_t ttl, uint64_t timestamp, const qpid::types::Variant::Map& annotations) const;

    void decodeHeader(framing::Buffer& buffer);
    void decodeContent(framing::Buffer& buffer);

    void encode(framing::Buffer& buffer) const;
    uint32_t encodedSize() const;

    /**
     * @returns the size of the buffer needed to encode the
     * 'header' of this message (not just the header frame,
     * but other meta data e.g. routing key and exchange)
     */
    uint32_t encodedHeaderSize() const;
    boost::intrusive_ptr<PersistableMessage> merge(const std::map<std::string, qpid::types::Variant>& annotations) const;

    QPID_BROKER_EXTERN bool isQMFv2() const;
    QPID_BROKER_EXTERN bool isLastQMFResponse(const std::string correlation) const;

    static bool isImmediateDeliveryRequired(const qpid::broker::Message& message);

    /**
     * @returns the MessageTransfer encoding of the given message.
     * @throws std::bad_cast if the message's encoding is not a
     * MessageTransfer (a reference dynamic_cast is used so that a mismatch
     * fails in a defined way instead of dereferencing a null pointer).
     */
    static MessageTransfer& get(qpid::broker::Message& message) {
        return dynamic_cast<MessageTransfer&>(message.getEncoding());
    }

    /** Const overload of get(); same failure behaviour. */
    static const MessageTransfer& get(const qpid::broker::Message& message) {
        return dynamic_cast<const MessageTransfer&>(message.getEncoding());
    }
    QPID_BROKER_EXTERN static bool isQMFv2(const qpid::broker::Message& message);
    QPID_BROKER_EXTERN static bool isLastQMFResponse(const qpid::broker::Message& message, const std::string correlation);
  private:
    // The frames that make up this message.
    qpid::framing::FrameSet frames;
    // NOTE(review): presumably the value reported by getRequiredCredit(),
    // with cachedRequiredCredit flagging whether it has been computed --
    // confirm against the definitions.
    uint32_t requiredCredit;
    bool cachedRequiredCredit;

    MessageTransfer(const qpid::framing::FrameSet&);
    void encodeHeader(framing::Buffer& buffer) const;
    uint32_t encodedContentSize() const;
    void encodeContent(framing::Buffer& buffer) const;
};
}}} // namespace qpid::broker::amqp_0_10

#endif  /*!QPID_BROKER_AMQP_0_10_MESSAGETRANSFER_H*/