/__w/shurbej/shurbej/_build/test/cover/ct/shurbej_files.html

1 -module(shurbej_files).
2 -behaviour(gen_server).
3 -include("shurbej_records.hrl").
4
5 %% Public API
6 -export([
7 start_link/0,
8 blob_path/1,
9 prepare_upload/3,
10 get_pending/1,
11 store/3,
12 register_upload/1,
13 mark_stored/2,
14 cleanup_expired_uploads/0,
15 confirm_existing/2
16 ]).
17
18 %% gen_server callbacks
19 -export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
20
21 -define(TABLE, shurbej_pending_uploads).
22
%% Start the pending-upload registry as a locally registered singleton.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
25
%% Content-addressed blob location: <root>/<first 2 hex chars>/<full sha256>.
%% The two-character shard prefix keeps any single directory from growing
%% unboundedly. Hash is expected to be a lowercase hex SHA-256 binary.
blob_path(Hash) ->
    Shard = binary_to_list(binary:part(Hash, 0, 2)),
    HashName = binary_to_list(Hash),
    Root = to_list(application:get_env(shurbej, file_storage_path, <<"./data/files">>)),
    filename:join([Root, Shard, HashName]).
31
%% Reserve an upload slot; returns the upload key allocated by the server.
prepare_upload(LibRef, ItemKey, Meta) ->
    Request = {prepare, LibRef, ItemKey, Meta},
    gen_server:call(?MODULE, Request).
34
%% Look up a pending upload by key.
%% Runs in the caller's process: lookups on a protected table do not need
%% to go through the gen_server.
get_pending(UploadKey) ->
    Found = ets:lookup(?TABLE, UploadKey),
    case Found of
        [{_Key, Info}] -> {ok, Info};
        [] -> {error, not_found}
    end.
41
%% Persist uploaded file data to disk after integrity checks.
%% Zotero sends files ZIP-compressed, so the payload is unpacked first;
%% the decompressed size is capped so a zip bomb cannot OOM the node.
%% MD5 verification and the SHA-256 write happen in store_unpacked/3.
store(UploadKey, #{meta := Meta} = _Info, Data) ->
    #{md5 := ExpectedMd5} = Meta,
    Unpacked = maybe_unzip(Data, max_file_bytes()),
    case Unpacked of
        {error, _} = Err ->
            Err;
        FileData ->
            store_unpacked(UploadKey, ExpectedMd5, FileData)
    end.
53
%% Verify the client-supplied MD5, then persist FileData under its SHA-256
%% content address. Returns ok, {error, md5_mismatch}, or a file:posix error.
%%
%% The blob is written to a unique temp file and then renamed into place:
%% file:rename/2 is atomic within a filesystem, so a crash mid-write can
%% never leave a truncated blob at the final path. (Previously a partial
%% write would pass the filelib:is_regular/1 dedup check on the next upload
%% of the same content and be served as a valid blob.)
store_unpacked(UploadKey, ExpectedMd5, FileData) ->
    case hex_hash(md5, FileData) of
        ExpectedMd5 ->
            Sha256 = hex_hash(sha256, FileData),
            BlobFile = blob_path(Sha256),
            case filelib:is_regular(BlobFile) of
                true ->
                    %% Content already on disk: dedup, just flag the upload.
                    gen_server:call(?MODULE, {mark_stored, UploadKey, Sha256}),
                    ok;
                false ->
                    ok = filelib:ensure_dir(BlobFile),
                    case write_blob_atomically(BlobFile, FileData) of
                        ok ->
                            gen_server:call(?MODULE, {mark_stored, UploadKey, Sha256}),
                            ok;
                        {error, _} = Err ->
                            Err
                    end
            end;
        _ActualMd5 ->
            {error, md5_mismatch}
    end.

%% Write Data to a randomly named temp file next to Path, then atomically
%% rename it into place. The temp file is cleaned up on any failure.
write_blob_atomically(Path, Data) ->
    Suffix = binary_to_list(binary:encode_hex(crypto:strong_rand_bytes(8), lowercase)),
    TmpPath = Path ++ ".tmp." ++ Suffix,
    case file:write_file(TmpPath, Data) of
        ok ->
            case file:rename(TmpPath, Path) of
                ok ->
                    ok;
                {error, _} = Err ->
                    _ = file:delete(TmpPath),
                    Err
            end;
        {error, _} = Err ->
            _ = file:delete(TmpPath),
            Err
    end.
77
%% Mark a pending upload as already stored (dedup path: the blob bytes are
%% already on disk, so no write is needed).
mark_stored(UploadKey, Info) ->
    Request = {mark_stored_raw, UploadKey, Info},
    gen_server:call(?MODULE, Request).
81
%% Sweep pending uploads whose TTL has elapsed (see the cleanup_expired
%% clause of handle_call/3 for the TTL and the selection criteria).
cleanup_expired_uploads() ->
    gen_server:call(?MODULE, cleanup_expired).
85
%% Finalize a completed upload. Routed through the gen_server so the
%% lookup-then-write sequence is atomic (no TOCTOU race between callers).
register_upload(UploadKey) ->
    Request = {register_upload, UploadKey},
    gen_server:call(?MODULE, Request).
89
90 %% gen_server callbacks
91
%% Create the pending-uploads table. Protected: any process may read, but
%% only this server writes.
init([]) ->
    TableOpts = [named_table, protected, set],
    Table = ets:new(?TABLE, TableOpts),
    State = #{table => Table},
    {ok, State}.
95
%% All ETS mutations are funneled through these clauses so writes are
%% serialized by the gen_server; readers use the protected table directly.

%% Allocate a fresh random upload key and record the pending upload.
%% `created` feeds the cleanup_expired sweep below.
handle_call({prepare, LibRef, ItemKey, Meta}, _From, State) ->
    UploadKey = generate_upload_key(),
    ets:insert(?TABLE, {UploadKey, #{
        lib_ref => LibRef,
        item_key => ItemKey,
        meta => Meta,
        stored => false,
        created => erlang:system_time(millisecond)
    }}),
    {reply, UploadKey, State};

%% Flag an existing pending upload as stored and remember its content hash.
handle_call({mark_stored, UploadKey, Sha256}, _From, State) ->
    case ets:lookup(?TABLE, UploadKey) of
        [{_, Info}] ->
            ets:insert(?TABLE, {UploadKey, Info#{stored => true, sha256 => Sha256}}),
            {reply, ok, State};
        [] ->
            {reply, {error, not_found}, State}
    end;

%% Insert (or overwrite) a pending-upload record already marked stored.
%% Used by mark_stored/2 on the dedup path; takes the full Info map as-is.
handle_call({mark_stored_raw, UploadKey, Info}, _From, State) ->
    ets:insert(?TABLE, {UploadKey, Info#{stored => true}}),
    {reply, ok, State};

%% Finalize an upload: bump the library version, unref/delete any blob the
%% item previously pointed at, write the new file metadata, and bump the
%% item version so incremental sync picks up the change.
handle_call({register_upload, UploadKey}, _From, State) ->
    Result = case ets:lookup(?TABLE, UploadKey) of
        [{_, #{stored := true, lib_ref := {LT, LI} = LibRef, item_key := ItemKey, meta := Meta,
               sha256 := Sha256}}] ->
            #{md5 := Md5, filename := Filename, filesize := Filesize,
              mtime := Mtime} = Meta,
            %% Use `any` — concurrent uploads must not fail with precondition
            %% errors. The gen_server serializes writes, so versions are
            %% monotonically increasing and responses stay ordered.
            WriteResult = shurbej_version:write(LibRef, any, fun(NewVersion) ->
                %% If the item previously referenced a different blob, drop
                %% one reference; delete the file once the refcount hits 0.
                case shurbej_db:get_file_meta(LibRef, ItemKey) of
                    {ok, #shurbej_file_meta{sha256 = OldHash}} when OldHash =/= Sha256 ->
                        case shurbej_db:blob_unref(OldHash) of
                            {ok, 0} -> delete_blob_file(OldHash);
                            {ok, _} -> ok
                        end;
                    _ -> ok
                end,
                shurbej_db:blob_ref(Sha256),
                shurbej_db:write_file_meta(#shurbej_file_meta{
                    id = {LT, LI, ItemKey},
                    md5 = Md5, sha256 = Sha256,
                    filename = Filename, filesize = Filesize,
                    mtime = Mtime
                }),
                %% Bump item version so incremental sync detects the file change
                bump_item_version(LibRef, ItemKey, NewVersion),
                ok
            end),
            %% Registered (or failed inside the write fun) — either way the
            %% pending entry is consumed.
            ets:delete(?TABLE, UploadKey),
            WriteResult;
        [{_, #{stored := false}}] ->
            {error, not_stored};
        [] ->
            {error, not_found}
    end,
    {reply, Result, State};

%% Drop pending entries older than one hour. Entries without a positive
%% `created` timestamp are never expired (Created > 0 guard).
handle_call(cleanup_expired, _From, State) ->
    Now = erlang:system_time(millisecond),
    TTL = 3600000, %% 1 hour
    Expired = ets:foldl(fun({Key, Info}, Acc) ->
        Created = maps:get(created, Info, 0),
        case Created > 0 andalso Now - Created > TTL of
            true -> [Key | Acc];
            false -> Acc
        end
    end, [], ?TABLE),
    lists:foreach(fun(Key) -> ets:delete(?TABLE, Key) end, Expired),
    {reply, ok, State};

%% Catch-all for unknown requests.
handle_call(_Msg, _From, State) ->
    {reply, {error, unknown}, State}.
173
%% Casts are not part of this server's protocol; ignore anything received.
handle_cast(_Request, State) ->
    {noreply, State}.
176
%% Drain stray messages so they never accumulate in the mailbox.
handle_info(_Info, State) ->
    {noreply, State}.
179
%% Confirm an existing file: bumps the library + item version so concurrent
%% "exists" responses are serialized with registrations through the version
%% gen_server, keeping Last-Modified-Version monotonic for the client.
confirm_existing(LibRef, ItemKey) ->
    Bump = fun(NewVersion) ->
        bump_item_version(LibRef, ItemKey, NewVersion),
        ok
    end,
    shurbej_version:write(LibRef, any, Bump).
188
%% Rewrite the item with NewVersion so sync clients see it as changed.
%% Silently a no-op when the item cannot be fetched.
bump_item_version(LibRef, ItemKey, NewVersion) ->
    case shurbej_db:get_item(LibRef, ItemKey) of
        {ok, Item} ->
            Bumped = Item#shurbej_item{version = NewVersion},
            shurbej_db:write_item(Bumped);
        _NotFound ->
            ok
    end.
195
196 %% Internal
197
%% Remove the on-disk blob for Hash; returns file:delete/1's result.
delete_blob_file(Hash) ->
    Path = blob_path(Hash),
    file:delete(Path).
200
%% Unpack the payload if it is a ZIP archive (Zotero ZFS compresses files
%% into ZIP before uploading); otherwise treat it as a raw file. Every path
%% checks the bytes that would reach disk against Max, so neither a zip
%% bomb nor an oversized raw body can slip past the cap. We can't refuse to
%% allocate inside zip:unzip/2, but we reject the result before it is
%% written anywhere.
%% Returns the unpacked binary or {error, zip_too_large}.
maybe_unzip(<<80, 75, 3, 4, _/binary>> = ZipData, Max) ->
    case zip:unzip(ZipData, [memory]) of
        {ok, [{_Filename, Content} | _Rest]} when byte_size(Content) > Max ->
            {error, zip_too_large};
        {ok, [{_Filename, Content} | _Rest]} ->
            Content;
        _ ->
            %% Starts with the ZIP magic but isn't a usable archive: fall
            %% back to raw bytes — still capped. (Previously this fallback
            %% skipped the size check entirely, so any oversized payload
            %% prefixed with "PK\3\4" bypassed the limit.)
            cap_size(ZipData, Max)
    end;
maybe_unzip(Data, Max) ->
    cap_size(Data, Max).

%% Reject payloads larger than Max; pass smaller ones through unchanged.
cap_size(Data, Max) when byte_size(Data) > Max ->
    {error, zip_too_large};
cap_size(Data, _Max) ->
    Data.
218
%% Upper bound, in bytes, for a decompressed upload (default 100 MiB).
max_file_bytes() ->
    DefaultBytes = 100 * 1024 * 1024,
    application:get_env(shurbej, max_upload_bytes, DefaultBytes).
221
%% Hash Data with Algorithm (md5 | sha256 | ...) and render the digest as a
%% lowercase hex binary.
hex_hash(Algorithm, Data) ->
    Digest = crypto:hash(Algorithm, Data),
    binary:encode_hex(Digest, lowercase).
224
%% 128 bits of CSPRNG randomness rendered as 32 lowercase hex characters.
generate_upload_key() ->
    Random = crypto:strong_rand_bytes(16),
    binary:encode_hex(Random, lowercase).
227
%% Coerce a path from config to a charlist; accepts binaries and charlists.
to_list(Value) when is_list(Value) -> Value;
to_list(Value) when is_binary(Value) -> binary_to_list(Value).
Line Hits Source