| 1 |
|
-module(shurbej_version). |
| 2 |
|
-behaviour(gen_server). |
| 3 |
|
-include("shurbej_records.hrl"). |
| 4 |
|
|
| 5 |
|
-export([start_link/1]). |
| 6 |
|
-export([get/1, write/3]). |
| 7 |
|
-export([init/1, handle_call/3, handle_cast/2]). |
| 8 |
|
|
| 9 |
|
%% Start the version server for LibRef, linked to the calling process.
%% The server is registered under the global name {?MODULE, LibRef}, and
%% LibRef is handed unchanged to init/1.
start_link(LibRef) ->
    GlobalName = {?MODULE, LibRef},
    gen_server:start_link({global, GlobalName}, ?MODULE, LibRef, []).
| 11 |
|
|
| 12 |
|
%% Fetch the current version of the library identified by LibRef.
%% The version server answers this request with {ok, Version}.
get(LibRef) ->
    Request = get_version,
    call(LibRef, Request).
| 15 |
|
|
| 16 |
|
%% Run a version-tracked write against the library identified by LibRef.
%%
%% ExpectedVersion is the version the caller believes is current, or the
%% atom `any` to skip the comparison.
%% WriteFun is invoked as WriteFun(NewVersion) and must return
%% ok | {error, _}.
%%
%% Returns {ok, NewVersion} | {error, precondition, CurrentVersion} | {error, _}.
write(LibRef, ExpectedVersion, WriteFun) ->
    Request = {write, ExpectedVersion, WriteFun},
    call(LibRef, Request).
| 21 |
|
|
| 22 |
|
%% gen_server callbacks |
| 23 |
|
|
| 24 |
|
%% gen_server init callback.
%% Requires shurbej_db:ensure_library/1 to return ok, then reads the library
%% record and seeds the state map with its stored version. When the lookup
%% returns undefined the server stops with {unknown_library, LibRef}.
init(LibRef) ->
    ok = shurbej_db:ensure_library(LibRef),
    Lookup = shurbej_db:get_library(LibRef),
    case Lookup of
        undefined ->
            {stop, {unknown_library, LibRef}};
        {ok, #shurbej_library{version = StoredVersion}} ->
            {ok, #{version => StoredVersion, lib_ref => LibRef}}
    end.
| 32 |
|
|
| 33 |
|
%% gen_server call handler.
%%
%% get_version
%%     Replies {ok, Version} with the version held in the server state.
%%
%% {write, ExpectedVersion, WriteFun}
%%     Hands WriteFun to do_write/4 when the caller's expectation allows it:
%%       * `any`              - no check; write on top of Current.
%%       * exactly Current    - write on top of Current.
%%       * a positive integer while Current is 0 - the library is fresh, so
%%         the counter is fast-forwarded to the client's version (migration).
%%       * anything else      - reply {error, precondition, Current}; the
%%         state is left untouched and WriteFun is never called.
%%
%% Any other request is answered with {error, unknown}.
handle_call(get_version, _From, #{version := V} = State) ->
    {reply, {ok, V}, State};

handle_call({write, ExpectedVersion, WriteFun}, _From,
            #{lib_ref := LibRef, version := Current} = State) ->
    case ExpectedVersion of
        any ->
            do_write(LibRef, Current, WriteFun, State);
        Current ->
            do_write(LibRef, Current, WriteFun, State);
        _ when Current =:= 0,
               is_integer(ExpectedVersion),
               ExpectedVersion > 0 ->
            %% Fresh library — fast-forward to client's version for migration.
            %% Only a positive integer qualifies: do_write/4 adds 1 to the
            %% base version, so any other term would crash this server with
            %% badarith, and a negative integer would push the counter below
            %% zero. Both now fall through to the precondition error.
            do_write(LibRef, ExpectedVersion, WriteFun, State);
        _ ->
            {reply, {error, precondition, Current}, State}
    end;

handle_call(_Msg, _From, State) ->
    {reply, {error, unknown}, State}.
| 52 |
|
|
| 53 |
|
%% gen_server cast handler. No casts are defined for this server, so every
%% message is dropped and the state is returned unchanged.
handle_cast(_Ignored, ServerState) ->
    {noreply, ServerState}.
| 55 |
|
|
| 56 |
|
%% Internal |
| 57 |
|
|
| 58 |
|
%% Perform one versioned write for LibRef on top of version Current.
%%
%% WriteFun(Current + 1) runs inside a Mnesia transaction together with the
%% write of the library's version record, so both commit or abort as a unit.
%%
%% On commit: orphan blobs are reaped, stream subscribers are notified, and
%% the reply is {ok, NewVersion} with the state advanced to NewVersion.
%% On abort: the orphan-blob bookkeeping is reset and the state is left
%% unchanged. The reply is the {error, _} term WriteFun returned, or
%% {error, Reason} for any other abort reason.
do_write(LibRef, Current, WriteFun, State) ->
    NewVersion = Current + 1,
    %% Drop orphan-blob entries left over from a prior run so that aborts
    %% from that run cannot pollute this one.
    shurbej_db:reset_orphan_blobs(),
    %% The write and the version bump share one transaction.
    Txn = fun() ->
        case WriteFun(NewVersion) of
            ok ->
                mnesia:write(#shurbej_library{ref = LibRef, version = NewVersion}),
                ok;
            {error, _} = Failure ->
                mnesia:abort(Failure)
        end
    end,
    case mnesia:transaction(Txn) of
        {atomic, ok} ->
            %% Committed, so it is now safe to unlink the freed blobs.
            shurbej_db:reap_orphan_blobs(),
            notify_subscribers(topic(LibRef), NewVersion),
            {reply, {ok, NewVersion}, State#{version := NewVersion}};
        {aborted, Why} ->
            shurbej_db:reset_orphan_blobs(),
            Reply =
                case Why of
                    {error, _} -> Why;
                    _ -> {error, Why}
                end,
            {reply, Reply, State}
    end.

%% Best-effort fan-out of {topic_updated, Topic, Version} to every member of
%% group Topic in the shurbej_stream pg scope. pg is called dynamically (no
%% compile-time dependency) and any failure here is deliberately ignored so
%% that a committed write is never reported as failed.
notify_subscribers(Topic, Version) ->
    try
        Subscribers = pg:get_members(shurbej_stream, Topic),
        lists:foreach(
            fun(Pid) -> Pid ! {topic_updated, Topic, Version} end,
            Subscribers
        )
    catch
        _:_ -> ok
    end.
| 91 |
|
|
| 92 |
|
%% Build the stream topic for a library reference:
%%   {user, Id}  -> <<"/users/Id">>
%%   {group, Id} -> <<"/groups/Id">>
%% Id must be an integer; any other reference shape fails to match.
topic({user, UserId}) ->
    Suffix = integer_to_binary(UserId),
    <<"/users/", Suffix/binary>>;
topic({group, GroupId}) ->
    Suffix = integer_to_binary(GroupId),
    <<"/groups/", Suffix/binary>>.
| 96 |
|
|
| 97 |
|
%% Send Msg to the version server for LibRef, starting it on demand.
%%
%% Version servers are started lazily, one per library. If the attempt exits
%% with {noproc, _} (for example the server died between being resolved and
%% being called, as can happen during a group wipe), the whole attempt,
%% including ensure_started/1, is repeated exactly once. A second failure
%% propagates to the caller.
call(LibRef, Msg) ->
    Attempt = fun() -> gen_server:call(ensure_started(LibRef), Msg) end,
    try
        Attempt()
    catch
        exit:{noproc, _} ->
            Attempt()
    end.
| 108 |
|
|
| 109 |
|
%% Return the pid of the version server for LibRef, asking
%% shurbej_version_sup to start one. Both a fresh start and an
%% already-running server yield a pid; any other result from the supervisor
%% is unexpected and crashes the caller.
ensure_started(LibRef) ->
    StartResult = shurbej_version_sup:start_child(LibRef),
    case StartResult of
        {error, {already_started, Running}} -> Running;
        {ok, Fresh} -> Fresh
    end.