/__w/shurbej/shurbej/_build/test/cover/ct/shurbej_version.html

1 -module(shurbej_version).
2 -behaviour(gen_server).
3 -include("shurbej_records.hrl").
4
5 -export([start_link/1]).
6 -export([get/1, write/3]).
7 -export([init/1, handle_call/3, handle_cast/2]).
8
%% Start the version server for LibRef.
%% The process is registered through `global` under {?MODULE, LibRef}, and
%% LibRef is handed to init/1 as the start argument.
start_link(LibRef) ->
    ServerName = {global, {?MODULE, LibRef}},
    gen_server:start_link(ServerName, ?MODULE, LibRef, []).
11
%% Ask the library's version server for the current version.
%% The server answers this request with {ok, Version}.
get(LibRef) ->
    Request = get_version,
    call(LibRef, Request).
15
%% Run a version-tracked write against the library.
%%
%% ExpectedVersion is either the atom `any` or the version the caller
%% believes is current.
%% WriteFun is invoked as WriteFun(NewVersion) and must return
%% ok | {error, _}.
%%
%% Returns {ok, NewVersion} | {error, precondition, CurrentVersion} | {error, _}.
write(LibRef, ExpectedVersion, WriteFun) ->
    Request = {write, ExpectedVersion, WriteFun},
    call(LibRef, Request).
21
22 %% gen_server callbacks
23
%% Ensure the library exists, then seed the server state with the version
%% read back from the database. If the library cannot be read back the
%% server refuses to start with {unknown_library, LibRef}.
init(LibRef) ->
    ok = shurbej_db:ensure_library(LibRef),
    Lookup = shurbej_db:get_library(LibRef),
    case Lookup of
        undefined ->
            {stop, {unknown_library, LibRef}};
        {ok, #shurbej_library{version = Version}} ->
            InitialState = #{lib_ref => LibRef, version => Version},
            {ok, InitialState}
    end.
32
%% Synchronous requests handled by the version server.
%%
%% get_version
%%     Replies {ok, Version} with the version held in the state.
%%
%% {write, ExpectedVersion, WriteFun}
%%     Performs the write when the precondition holds:
%%       * ExpectedVersion is `any`, or
%%       * ExpectedVersion equals the current version, or
%%       * the library is fresh (current version 0) and ExpectedVersion is a
%%         positive integer, in which case the library is fast-forwarded to
%%         the client's version before writing.
%%     Otherwise replies {error, precondition, CurrentVersion}.
%%
%% Any other request is answered with {error, unknown}.
handle_call(get_version, _From, #{version := V} = State) ->
    {reply, {ok, V}, State};

handle_call({write, ExpectedVersion, WriteFun}, _From,
            #{lib_ref := LibRef, version := Current} = State) ->
    case ExpectedVersion of
        any ->
            do_write(LibRef, Current, WriteFun, State);
        Current ->
            do_write(LibRef, Current, WriteFun, State);
        _ when Current =:= 0,
               is_integer(ExpectedVersion),
               ExpectedVersion > 0 ->
            %% Fresh library — fast-forward to client's version for migration.
            %% Only a positive integer is accepted as a base version: a
            %% negative or non-integer value would otherwise be persisted as
            %% the library version (or crash this server on `Current + 1`).
            do_write(LibRef, ExpectedVersion, WriteFun, State);
        _ ->
            {reply, {error, precondition, Current}, State}
    end;

handle_call(_Msg, _From, State) ->
    {reply, {error, unknown}, State}.
52
%% No casts are part of this server's protocol: whatever arrives is dropped
%% and the state is returned unchanged.
handle_cast(_Request, State) ->
    {noreply, State}.
55
56 %% Internal
57
%% Perform one versioned write on top of the base version Current.
%%
%% WriteFun(Current + 1) runs inside a Mnesia transaction; when it returns ok
%% the library record is written with the new version in the same
%% transaction. On commit the orphan blobs are reaped, stream subscribers are
%% notified, and the new version is stored in the server state. On abort the
%% orphan-blob entries are reset and the state is left unchanged.
%%
%% Returns a gen_server reply tuple carrying
%% {ok, NewVersion} | {error, Reason}.
do_write(LibRef, Current, WriteFun, State) ->
    NewVersion = Current + 1,
    %% Drop orphan-blob entries left over from a prior run so that an abort
    %% from that run cannot pollute this one.
    shurbej_db:reset_orphan_blobs(),
    %% The user write and the version bump commit or abort together.
    %% NOTE(review): the record is built with only ref and version set, so any
    %% other fields take their record defaults — confirm against the header.
    Txn = fun() ->
        case WriteFun(NewVersion) of
            ok ->
                mnesia:write(#shurbej_library{ref = LibRef, version = NewVersion}),
                ok;
            {error, _} = Failure ->
                mnesia:abort(Failure)
        end
    end,
    case mnesia:transaction(Txn) of
        {atomic, ok} ->
            %% Transaction committed — safe to unlink freed blobs now.
            shurbej_db:reap_orphan_blobs(),
            notify_topic_subscribers(topic(LibRef), NewVersion),
            {reply, {ok, NewVersion}, State#{version := NewVersion}};
        {aborted, Reason} ->
            shurbej_db:reset_orphan_blobs(),
            {reply, abort_reply(Reason), State}
    end.

%% Map a transaction abort reason onto the reply sent to the caller:
%% {error, _} terms raised via mnesia:abort/1 pass through untouched,
%% anything else is wrapped as {error, Reason}.
abort_reply({error, _} = Err) -> Err;
abort_reply(Reason) -> {error, Reason}.

%% Best-effort notification of stream subscribers: every member of the pg
%% group Topic in scope shurbej_stream is sent {topic_updated, Topic, Version}.
%% pg is looked up at run time (no compile-time dependency), and any failure
%% here is deliberately ignored so it cannot affect the committed write.
notify_topic_subscribers(Topic, NewVersion) ->
    try
        Message = {topic_updated, Topic, NewVersion},
        Subscribers = pg:get_members(shurbej_stream, Topic),
        lists:foreach(fun(Pid) -> Pid ! Message end, Subscribers)
    catch
        _:_ -> ok
    end.
91
%% Build the stream topic for a library reference:
%%   {user, Id}  -> <<"/users/Id">>
%%   {group, Id} -> <<"/groups/Id">>
%% Id must be an integer (it is rendered with integer_to_binary/1).
topic({user, Id}) ->
    topic_path(<<"/users/">>, Id);
topic({group, Id}) ->
    topic_path(<<"/groups/">>, Id).

%% Append the decimal rendering of Id to Prefix.
topic_path(Prefix, Id) ->
    IdBin = integer_to_binary(Id),
    <<Prefix/binary, IdBin/binary>>.
96
%% Send Msg to the library's version server, starting it first if needed.
%%
%% Version servers are started lazily, one per library. ensure_started/1
%% resolves to a pid whether this caller started the worker or another
%% caller got there first. The server can still die between that lookup and
%% the call itself (e.g. during a group wipe); that shows up as a `noproc`
%% exit and is retried exactly once. The retry goes through ensure_started/1
%% again, so it is expected to reach a freshly started worker. A second
%% failure is not caught and propagates to the caller.
call(LibRef, Msg) ->
    Attempt = fun() -> gen_server:call(ensure_started(LibRef), Msg) end,
    try
        Attempt()
    catch
        exit:{noproc, _} ->
            Attempt()
    end.
108
%% Resolve the pid of the version server for LibRef, asking the supervisor
%% to start it. Both "just started" and "already running" yield the pid;
%% any other supervisor answer is unexpected and crashes the caller.
ensure_started(LibRef) ->
    StartResult = shurbej_version_sup:start_child(LibRef),
    case StartResult of
        {error, {already_started, Existing}} -> Existing;
        {ok, Started} -> Started
    end.
Line Hits Source