// path: root/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
// blob: c10c77ae18ff6086f217a31cd3ff01965c6a7297
#ifndef QPID_CLIENT_AMQP0_10_SENDERIMPL_H
#define QPID_CLIENT_AMQP0_10_SENDERIMPL_H

/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/SenderImpl.h"
#include "qpid/client/AsyncSession.h"
#include "qpid/client/amqp0_10/SessionImpl.h"
#include <memory>
#include <boost/intrusive_ptr.hpp>
#include <boost/ptr_container/ptr_deque.hpp>

namespace qpid {
namespace client {
namespace amqp0_10 {

class AddressResolution;
class MessageSink;
struct OutgoingMessage;

/**
 * AMQP 0-10 implementation of qpid::messaging::SenderImpl.
 *
 * The public section declares the application-visible operations; their
 * definitions are out of line. The private section holds the sender's
 * state together with small functor types (Send, UnreliableSend, Close,
 * CheckPendingSends) that wrap the corresponding private methods so that
 * they can be handed to SessionImpl::execute() through the execute() /
 * execute1() helper templates at the end of the class.
 */
class SenderImpl : public qpid::messaging::SenderImpl
{
  public:
    // NOTE(review): the transitions between these states are driven by
    // code outside this header - confirm the exact meaning of each
    // value against the implementation file.
    enum State {UNRESOLVED, ACTIVE, CANCELLED};

    SenderImpl(SessionImpl& parent, const std::string& name,
               const qpid::messaging::Address& address);
    void send(const qpid::messaging::Message&, bool sync);
    void close();
    void setCapacity(uint32_t);
    uint32_t getCapacity();
    uint32_t getUnsettled();
    void init(qpid::client::AsyncSession, AddressResolution&);
    const std::string& getName() const;
    qpid::messaging::Session getSession() const;

  private:
    // NOTE: the declaration order of the data members below determines
    // their initialisation order in the constructor; do not reorder
    // them without checking the constructor's initialiser list.

    // mutable so that it can also be used from const member functions
    mutable sys::Mutex lock;
    // the session this sender was created with; passed the command
    // functors by execute()/execute1()
    boost::intrusive_ptr<SessionImpl> parent;
    const std::string name;
    const qpid::messaging::Address address;
    State state;
    // NOTE(review): std::auto_ptr is deprecated since C++11 and removed
    // in C++17; it is left unchanged here because the code that assigns
    // to 'sink' is not visible from this header.
    std::auto_ptr<MessageSink> sink;

    qpid::client::AsyncSession session;
    std::string destination;
    std::string routingKey;

    typedef boost::ptr_deque<OutgoingMessage> OutgoingMessages;
    OutgoingMessages outgoing;
    uint32_t capacity;
    uint32_t window;
    bool flushed;
    const bool unreliable;

    uint32_t checkPendingSends(bool flush);
    // Dummy ScopedLock parameter means call with lock held
    uint32_t checkPendingSends(bool flush, const sys::Mutex::ScopedLock&);
    void replay(const sys::Mutex::ScopedLock&);
    void waitForCapacity();

    //logic for application visible methods:
    void sendImpl(const qpid::messaging::Message&);
    void sendUnreliable(const qpid::messaging::Message&);
    void closeImpl();


    //functors for application visible methods (allowing locking and
    //retry to be centralised):

    // Common base of the functors: binds each one to the sender it
    // operates on. The sender is held by reference, so a functor must
    // not outlive it.
    struct Command
    {
        SenderImpl& impl;

        // explicit: a SenderImpl must never convert silently to a Command
        explicit Command(SenderImpl& i) : impl(i) {}
    };

    // Waits for capacity and then invokes sendImpl() for one message.
    // The message is stored as a raw pointer; this functor neither
    // copies nor deletes it.
    struct Send : Command
    {
        const qpid::messaging::Message* message;
        // Starts out true and is cleared once waitForCapacity() has
        // returned, i.e. immediately before sendImpl() is invoked.
        bool repeat;

        Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m), repeat(true) {}
        void operator()()
        {
            impl.waitForCapacity();
            //from this point message will be recorded if there is any
            //failure (and replayed) so need not repeat the call
            repeat = false;
            impl.sendImpl(*message);
        }
    };

    // Invokes sendUnreliable() for one message. As with Send, the
    // message is only referenced through a raw pointer.
    struct UnreliableSend : Command
    {
        const qpid::messaging::Message* message;

        UnreliableSend(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {}
        void operator()()
        {
            //TODO: ideally want to put messages on the outbound
            //queue and pull them off in io thread, but the old
            //0-10 client doesn't support that option so for now
            //we simply don't queue unreliable messages
            impl.sendUnreliable(*message);
        }
    };

    // Invokes closeImpl() on the bound sender.
    struct Close : Command
    {
        // explicit: a SenderImpl must never convert silently to a Close
        explicit Close(SenderImpl& i) : Command(i) {}
        void operator()() { impl.closeImpl(); }
    };

    // Invokes checkPendingSends(flush) and keeps the value it returns
    // in 'pending' (0 until the functor has been invoked).
    struct CheckPendingSends : Command
    {
        bool flush;
        uint32_t pending;
        CheckPendingSends(SenderImpl& i, bool f) : Command(i), flush(f), pending(0) {}
        void operator()() { pending = impl.checkPendingSends(flush); }
    };

    //helper templates for some common patterns

    // Builds an F bound to this sender and passes it to
    // parent->execute(); whatever that call returns is discarded.
    template <class F> void execute()
    {
        F f(*this);
        parent->execute(f);
    }

    // Builds an F from this sender plus one extra argument, passes it
    // to parent->execute() and returns that call's result as a bool.
    template <class F, class P> bool execute1(P p)
    {
        F f(*this, p);
        return parent->execute(f);
    }
};
}}} // namespace qpid::client::amqp0_10

#endif  /*!QPID_CLIENT_AMQP0_10_SENDERIMPL_H*/