| 1 |
|
-module(shurbej_files). |
| 2 |
|
-behaviour(gen_server). |
| 3 |
|
-include("shurbej_records.hrl"). |
| 4 |
|
|
| 5 |
|
%% Public API |
| 6 |
|
-export([ |
| 7 |
|
start_link/0, |
| 8 |
|
blob_path/1, |
| 9 |
|
prepare_upload/3, |
| 10 |
|
get_pending/1, |
| 11 |
|
store/3, |
| 12 |
|
register_upload/1, |
| 13 |
|
mark_stored/2, |
| 14 |
|
cleanup_expired_uploads/0, |
| 15 |
|
confirm_existing/2 |
| 16 |
|
]). |
| 17 |
|
|
| 18 |
|
%% gen_server callbacks |
| 19 |
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]). |
| 20 |
|
|
| 21 |
|
-define(TABLE, shurbej_pending_uploads). |
| 22 |
|
|
| 23 |
|
%% Start the files server, registered locally under the module name.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
| 25 |
|
|
| 26 |
|
%% Content-addressed path: <root>/<first 2 hex chars of sha256>/<full sha256>.
%% The two-char shard keeps any single directory from accumulating every blob.
blob_path(Hash) ->
    Root = to_list(application:get_env(shurbej, file_storage_path, <<"./data/files">>)),
    Shard = binary_to_list(binary:part(Hash, 0, 2)),
    filename:join([Root, Shard, binary_to_list(Hash)]).
| 31 |
|
|
| 32 |
|
%% Create a pending-upload entry for LibRef/ItemKey with the client-supplied
%% file metadata; the server replies with a fresh upload key.
prepare_upload(LibRef, ItemKey, Meta) ->
    Request = {prepare, LibRef, ItemKey, Meta},
    gen_server:call(?MODULE, Request).
| 34 |
|
|
| 35 |
|
%% Fetch the pending-upload record for UploadKey, bypassing the server:
%% reads are safe from any process on a public/protected table.
get_pending(UploadKey) ->
    case ets:lookup(?TABLE, UploadKey) of
        [] -> {error, not_found};
        [{_Key, Info}] -> {ok, Info}
    end.
| 41 |
|
|
| 42 |
|
%% Store file data to disk. Verifies MD5 and computes SHA-256.
%% Zotero sends files as ZIP-compressed: unpack first, then verify the MD5
%% against the metadata captured at prepare time. The decompressed payload is
%% capped so a zip bomb can't OOM the node.
store(UploadKey, #{meta := Meta} = _Info, Data) ->
    #{md5 := ExpectedMd5} = Meta,
    case maybe_unzip(Data, max_file_bytes()) of
        {error, _} = Failure ->
            Failure;
        Payload ->
            store_unpacked(UploadKey, ExpectedMd5, Payload)
    end.
| 53 |
|
|
| 54 |
|
%% Verify the payload against the expected MD5, then persist it under its
%% SHA-256 content address and flag the pending upload as stored.
store_unpacked(UploadKey, ExpectedMd5, FileData) ->
    case hex_hash(md5, FileData) =:= ExpectedMd5 of
        false ->
            {error, md5_mismatch};
        true ->
            Sha256 = hex_hash(sha256, FileData),
            write_blob_if_missing(UploadKey, Sha256, FileData)
    end.

%% Write FileData at its content-addressed path unless an identical blob is
%% already on disk (dedup), then record the hash on the pending upload.
write_blob_if_missing(UploadKey, Sha256, FileData) ->
    Target = blob_path(Sha256),
    case filelib:is_regular(Target) of
        true ->
            finish_store(UploadKey, Sha256);
        false ->
            ok = filelib:ensure_dir(Target),
            case file:write_file(Target, FileData) of
                ok -> finish_store(UploadKey, Sha256);
                {error, _} = Err -> Err
            end
    end.

%% Mark the pending upload as stored (serialized through the owning server).
finish_store(UploadKey, Sha256) ->
    gen_server:call(?MODULE, {mark_stored, UploadKey, Sha256}),
    ok.
| 77 |
|
|
| 78 |
|
%% Mark a pending upload as pre-stored — used for dedup when the blob
%% already exists on disk, so no new write is needed.
mark_stored(UploadKey, Info) ->
    Request = {mark_stored_raw, UploadKey, Info},
    gen_server:call(?MODULE, Request).
| 81 |
|
|
| 82 |
|
%% Drop pending uploads whose TTL has elapsed (synchronous sweep
%% performed by the owning server).
cleanup_expired_uploads() ->
    gen_server:call(?MODULE, cleanup_expired).
| 85 |
|
|
| 86 |
|
%% Register a completed upload. Routed through the gen_server so the
%% check-then-write sequence is atomic (no TOCTOU races between callers).
register_upload(UploadKey) ->
    Request = {register_upload, UploadKey},
    gen_server:call(?MODULE, Request).
| 89 |
|
|
| 90 |
|
%% gen_server callbacks |
| 91 |
|
|
| 92 |
|
init([]) ->
    %% Table is owned by this server; `protected` lets other processes
    %% read it directly by name (see get_pending/1) while only the owner writes.
    Options = [named_table, protected, set],
    {ok, #{table => ets:new(?TABLE, Options)}}.
| 95 |
|
|
| 96 |
|
%% Create a pending-upload entry under a fresh random key. The entry starts
%% with stored => false; it is flipped to true by a later {mark_stored, ...}
%% call once the payload has been verified and written. Replies with the key.
handle_call({prepare, LibRef, ItemKey, Meta}, _From, State) ->
    UploadKey = generate_upload_key(),
    ets:insert(?TABLE, {UploadKey, #{
        lib_ref => LibRef,
        item_key => ItemKey,
        meta => Meta,
        stored => false,
        %% Creation timestamp drives the TTL sweep in the cleanup_expired clause.
        created => erlang:system_time(millisecond)
    }}),
    {reply, UploadKey, State};

%% Flag an existing pending upload as stored and record its SHA-256.
%% Serialized through the server so store/register calls on the same key
%% cannot interleave. Unknown keys reply {error, not_found}.
handle_call({mark_stored, UploadKey, Sha256}, _From, State) ->
    case ets:lookup(?TABLE, UploadKey) of
        [{_, Info}] ->
            ets:insert(?TABLE, {UploadKey, Info#{stored => true, sha256 => Sha256}}),
            {reply, ok, State};
        [] ->
            {reply, {error, not_found}, State}
    end;

%% Insert (or overwrite) a pending entry already marked stored — the dedup
%% path used by mark_stored/2 when the blob exists on disk beforehand.
handle_call({mark_stored_raw, UploadKey, Info}, _From, State) ->
    ets:insert(?TABLE, {UploadKey, Info#{stored => true}}),
    {reply, ok, State};

%% Publish a completed upload: swap blob reference counts, write the file
%% metadata record, and bump the item version — all inside one version write
%% so the result is atomic with respect to other registrations. The pending
%% entry is deleted once the write call returns. Entries not yet stored reply
%% {error, not_stored}; unknown keys reply {error, not_found}.
handle_call({register_upload, UploadKey}, _From, State) ->
    Result = case ets:lookup(?TABLE, UploadKey) of
        [{_, #{stored := true, lib_ref := {LT, LI} = LibRef, item_key := ItemKey, meta := Meta,
               sha256 := Sha256}}] ->
            #{md5 := Md5, filename := Filename, filesize := Filesize,
              mtime := Mtime} = Meta,
            %% Use `any` — concurrent uploads must not fail with precondition
            %% errors. The gen_server serializes writes, so versions are
            %% monotonically increasing and responses stay ordered.
            WriteResult = shurbej_version:write(LibRef, any, fun(NewVersion) ->
                %% If the item previously pointed at a different blob, drop one
                %% reference to it, and delete the file when the count hits zero.
                case shurbej_db:get_file_meta(LibRef, ItemKey) of
                    {ok, #shurbej_file_meta{sha256 = OldHash}} when OldHash =/= Sha256 ->
                        case shurbej_db:blob_unref(OldHash) of
                            {ok, 0} -> delete_blob_file(OldHash);
                            {ok, _} -> ok
                        end;
                    _ -> ok
                end,
                shurbej_db:blob_ref(Sha256),
                shurbej_db:write_file_meta(#shurbej_file_meta{
                    id = {LT, LI, ItemKey},
                    md5 = Md5, sha256 = Sha256,
                    filename = Filename, filesize = Filesize,
                    mtime = Mtime
                }),
                %% Bump item version so incremental sync detects the file change
                bump_item_version(LibRef, ItemKey, NewVersion),
                ok
            end),
            ets:delete(?TABLE, UploadKey),
            WriteResult;
        [{_, #{stored := false}}] ->
            {error, not_stored};
        [] ->
            {error, not_found}
    end,
    {reply, Result, State};

%% Sweep pending entries older than the TTL. Entries without a positive
%% `created` timestamp are kept (Created > 0 guard).
handle_call(cleanup_expired, _From, State) ->
    Now = erlang:system_time(millisecond),
    TTL = 3600000, %% 1 hour
    Expired = ets:foldl(fun({Key, Info}, Acc) ->
        Created = maps:get(created, Info, 0),
        case Created > 0 andalso Now - Created > TTL of
            true -> [Key | Acc];
            false -> Acc
        end
    end, [], ?TABLE),
    lists:foreach(fun(Key) -> ets:delete(?TABLE, Key) end, Expired),
    {reply, ok, State};

%% Catch-all: reply with a tagged error rather than crashing the server.
handle_call(_Msg, _From, State) ->
    {reply, {error, unknown}, State}.
| 173 |
|
|
| 174 |
|
%% No casts are part of this server's protocol; ignore anything received.
handle_cast(_Request, State) ->
    {noreply, State}.
| 176 |
|
|
| 177 |
|
%% Drain stray mailbox messages without crashing the server.
handle_info(_Message, State) ->
    {noreply, State}.
| 179 |
|
|
| 180 |
|
%% Confirm an existing file — bumps library + item version so concurrent
%% exists responses are serialized with registrations through the version
%% gen_server, keeping Last-Modified-Version monotonic for the client.
confirm_existing(LibRef, ItemKey) ->
    Bump = fun(NewVersion) ->
        bump_item_version(LibRef, ItemKey, NewVersion),
        ok
    end,
    shurbej_version:write(LibRef, any, Bump).
| 188 |
|
|
| 189 |
|
%% Rewrite the item with the new version number; a missing item is a no-op.
bump_item_version(LibRef, ItemKey, NewVersion) ->
    write_bumped_item(shurbej_db:get_item(LibRef, ItemKey), NewVersion).

%% Dispatch on the lookup result: only {ok, Item} triggers a write.
write_bumped_item({ok, Item}, Version) ->
    shurbej_db:write_item(Item#shurbej_item{version = Version});
write_bumped_item(_, _Version) ->
    ok.
| 195 |
|
|
| 196 |
|
%% Internal |
| 197 |
|
|
| 198 |
|
%% Remove a blob from disk by hash (called when its refcount reaches zero).
delete_blob_file(Hash) ->
    Path = blob_path(Hash),
    file:delete(Path).
| 200 |
|
|
| 201 |
|
%% If the data is a ZIP archive, extract the first file from it.
%% Zotero ZFS compresses files into ZIP before uploading. A post-decompression
%% size check catches zip bombs — we can't refuse to allocate in zip:unzip, but
%% we can reject the result before writing it to disk.
%%
%% Fix: a payload carrying the ZIP local-file magic ("PK\3\4") that fails to
%% unzip previously fell through as raw data WITHOUT any size check, bypassing
%% the cap entirely. The fallback branch now enforces Max as well.
maybe_unzip(<<80, 75, 3, 4, _/binary>> = ZipData, Max) ->
    case zip:unzip(ZipData, [memory]) of
        {ok, [{_Filename, Content} | _Rest]} when byte_size(Content) > Max ->
            {error, zip_too_large};
        {ok, [{_Filename, Content} | _Rest]} ->
            Content;
        %% Unreadable or empty archive: treat as a raw payload, but apply the
        %% same size cap the raw path applies.
        _ when byte_size(ZipData) > Max ->
            {error, zip_too_large};
        _ ->
            ZipData %% fallback: treat as raw
    end;
%% Raw (non-ZIP) payload exceeding the cap.
maybe_unzip(Data, Max) when byte_size(Data) > Max ->
    {error, zip_too_large};
%% Raw payload within limits: pass through unchanged.
maybe_unzip(Data, _Max) ->
    Data.
| 218 |
|
|
| 219 |
|
%% Upper bound (in bytes) for a decompressed upload; defaults to 100 MiB
%% when the application env does not override it.
max_file_bytes() ->
    Default = 100 * 1024 * 1024,
    application:get_env(shurbej, max_upload_bytes, Default).
| 221 |
|
|
| 222 |
|
%% Hash Data with the given crypto algorithm and render the digest as
%% lowercase hex.
hex_hash(Algorithm, Data) ->
    Digest = crypto:hash(Algorithm, Data),
    binary:encode_hex(Digest, lowercase).
| 224 |
|
|
| 225 |
|
%% 128 bits of CSPRNG entropy rendered as 32 lowercase hex characters.
generate_upload_key() ->
    Random = crypto:strong_rand_bytes(16),
    binary:encode_hex(Random, lowercase).
| 227 |
|
|
| 228 |
:-( |
%% Normalize a configured path value (binary or charlist) to a charlist.
to_list(Value) when is_list(Value) -> Value;
to_list(Value) when is_binary(Value) -> binary_to_list(Value).