summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
blob: 3a0d865876f7ca03fbb94e0806c62c68cf67f444 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.store;

import org.apache.commons.configuration.Configuration;

import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;

/**
 * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages, queues
 * and exchanges in a transactional manner.
 *
 * <p/>All message store, remove, enqueue and dequeue operations are carried out against a {@link StoreContext} which
 * encapsulates the transactional context they are performed in. Many such operations can be carried out in a single
 * transaction.
 *
 * <p/>The storage and removal of queues and exchanges, are not carried out in a transactional context.
 *
 * <p/><table id="crc"><caption>CRC Card</caption>
 * <tr><th> Responsibilities
 * <tr><td> Accept transaction boundary demarcations: Begin, Commit, Abort.
 * <tr><td> Store and remove queues.
 * <tr><td> Store and remove exchanges.
 * <tr><td> Store and remove messages.
 * <tr><td> Bind and unbind queues to exchanges.
 * <tr><td> Enqueue and dequeue messages to queues.
 * <tr><td> Generate message identifiers.
 * </table>
 */
public interface MessageStore
{
    /**
     * Called after instantiation in order to configure the message store. A particular implementation can define
     * whatever parameters it wants.
     *
     * @param virtualHost The virtual host using by this store
     * @param base        The base element identifier from which all configuration items are relative. For example, if
     *                    the base element is "store", the all elements used by concrete classes will be "store.foo" etc.
     * @param config      The apache commons configuration object.
     *
     * @throws Exception If any error occurs that means the store is unable to configure itself.
     */
    void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception;

    /**
     * Called to close and cleanup any resources used by the message store.
     *
     * @throws Exception If the close fails.
     */
    void close() throws Exception;

    /**
     * Removes the specified message from the store in the given transactional store context.
     *
     * @param storeContext The transactional context to remove the message in.
     * @param messageId    Identifies the message to remove.
     *
     * @throws AMQException If the operation fails for any reason.
     */
    void removeMessage(StoreContext storeContext, Long messageId) throws AMQException;

    /**
     * Makes the specified exchange persistent.
     *
     * @param exchange The exchange to persist.
     *
     * @throws AMQException If the operation fails for any reason.
     */
    void createExchange(Exchange exchange) throws AMQException;

    /**
     * Removes the specified persistent exchange.
     *
     * @param exchange The exchange to remove.
     *
     * @throws AMQException If the operation fails for any reason.
     */
    void removeExchange(Exchange exchange) throws AMQException;

    /**
     * Binds the specified queue to an exchange with a routing key.
     *
     * @param exchange   The exchange to bind to.
     * @param routingKey The routing key to bind by.
     * @param queue      The queue to bind.
     * @param args       Additional parameters.
     *
     * @throws AMQException If the operation fails for any reason.
     */
    void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;

    /**
     * Unbinds the specified from an exchange under a particular routing key.
     *
     * @param exchange   The exchange to unbind from.
     * @param routingKey The routing key to unbind.
     * @param queue      The queue to unbind.
     * @param args       Additonal parameters.
     *
     * @throws AMQException If the operation fails for any reason.
     */
    void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;

    /**
     * Makes the specified queue persistent.
     *
     * @param queue The queue to store.
     *
     * @throws AMQException If the operation fails for any reason.
     */
    void createQueue(AMQQueue queue) throws AMQException;

    /**
     * Removes the specified queue from the persistent store.
     *
     * @param name The queue to remove.
     *
     * @throws AMQException If the operation fails for any reason.
     */
    void removeQueue(AMQShortString name) throws AMQException;

    /**
     * Places a message onto a specified queue, in a given transactional context.
     *
     * @param context   The transactional context for the operation.
     * @param name      The name of the queue to place the message on.
     * @param messageId The message to enqueue.
     *
     * @throws AMQException If the operation fails for any reason.
     */
    void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException;

    /**
     * Extracts a message from a specified queue, in a given transactional context.
     *
     * @param context   The transactional context for the operation.
     * @param name      The name of the queue to take the message from.
     * @param messageId The message to dequeue.
     *
     * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
     */
    void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException;

    /**
     * Begins a transactional context.
     *
     * @param context The transactional context to begin.
     *
     * @throws AMQException If the operation fails for any reason.
     */
    void beginTran(StoreContext context) throws AMQException;

    /**
     * Commits all operations performed within a given transactional context.
     *
     * @param context The transactional context to commit all operations for.
     *
     * @throws AMQException If the operation fails for any reason.
     */
    void commitTran(StoreContext context) throws AMQException;

    /**
     * Abandons all operations performed within a given transactional context.
     *
     * @param context The transactional context to abandon.
     *
     * @throws AMQException If the operation fails for any reason.
     */
    void abortTran(StoreContext context) throws AMQException;

    /**
     * Tests a transactional context to see if it has been begun but not yet committed or aborted.
     *
     * @param context The transactional context to test.
     *
     * @return <tt>true</tt> if the transactional context is live, <tt>false</tt> otherwise.
     */
    boolean inTran(StoreContext context);

    /**
     * Return a valid, currently unused message id.
     *
     * @return A fresh message id.
     */
    Long getNewMessageId();

    /**
     * Stores a chunk of message data.
     *
     * @param context         The transactional context for the operation.
     * @param messageId       The message to store the data for.
     * @param index           The index of the data chunk.
     * @param contentBody     The content of the data chunk.
     * @param lastContentBody Flag to indicate that this is the last such chunk for the message.
     *
     * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
     */
    void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody,
        boolean lastContentBody) throws AMQException;

    /**
     * Stores message meta-data.
     *
     * @param context         The transactional context for the operation.
     * @param messageId       The message to store the data for.
     * @param messageMetaData The message meta data to store.
     *
     * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
     */
    void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException;

    /**
     * Retrieves message meta-data.
     *
     * @param context   The transactional context for the operation.
     * @param messageId The message to get the meta-data for.
     *
     * @return The message meta data.
     *
     * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
     */
    MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException;

    /**
     * Retrieves a chunk of message data.
     *
     * @param context   The transactional context for the operation.
     * @param messageId The message to get the data chunk for.
     * @param index     The offset index of the data chunk within the message.
     *
     * @return A chunk of message data.
     *
     * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
     */
    ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
}