summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_dets.erl
blob: 03c5942d35e4e2502cad831eb74533fd7985383c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates.  All rights reserved.
%%

-module(rabbit_mqtt_retained_msg_store_dets).

-behaviour(rabbit_mqtt_retained_msg_store).
-include("rabbit_mqtt.hrl").

-export([new/2, recover/2, insert/3, lookup/2, delete/2, terminate/1]).

-record(store_state, {
  %% DETS table name
  table
}).


new(Dir, VHost) ->
  Tid = open_table(Dir, VHost),
  #store_state{table = Tid}.

recover(Dir, VHost) ->
  case open_table(Dir, VHost) of
    {error, _} -> {error, uninitialized};
    {ok, Tid}  -> {ok, #store_state{table = Tid}}
  end.

insert(Topic, Msg, #store_state{table = T}) ->
  ok = dets:insert(T, #retained_message{topic = Topic, mqtt_msg = Msg}).

lookup(Topic, #store_state{table = T}) ->
  case dets:lookup(T, Topic) of
    []      -> not_found;
    [Entry] -> Entry
  end.

delete(Topic, #store_state{table = T}) ->
  ok = dets:delete(T, Topic).

terminate(#store_state{table = T}) ->
  ok = dets:close(T).

open_table(Dir, VHost) ->
  dets:open_file(rabbit_mqtt_retained_msg_store:table_name_for(VHost),
    table_options(rabbit_mqtt_util:path_for(Dir, VHost, ".dets"))).

table_options(Path) ->
  [{type, set}, {keypos, #retained_message.topic},
    {file, Path}, {ram_file, true}, {repair, true},
    {auto_save, rabbit_misc:get_env(rabbit_mqtt,
                                    retained_message_store_dets_sync_interval, 2000)}].