| 1 |
|
-module(shurbej_ws_stream). |
| 2 |
|
|
| 3 |
|
%% Zotero Streaming API — WebSocket handler. |
| 4 |
|
%% Clients subscribe to library topics and receive topicUpdated events |
| 5 |
|
%% when library versions change. Uses pg (process groups) for pub/sub. |
| 6 |
|
|
| 7 |
|
-include_lib("shurbej_store/include/shurbej_records.hrl"). |
| 8 |
|
|
| 9 |
|
-export([init/2, websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]). |
| 10 |
|
|
| 11 |
|
-define(SCOPE, shurbej_stream). |
| 12 |
|
-define(RETRY_MS, 10000). |
| 13 |
|
-define(PING_INTERVAL, 25000). |
| 14 |
|
|
| 15 |
|
%% Cowboy entry point for the streaming endpoint.
%%
%% Reads the API key from the incoming request (whatever
%% shurbej_http_common:extract_api_key/1 returns is stored as-is under
%% `api_key`) and upgrades the connection to the WebSocket protocol.
%% The handler state must be a map, since the key is added with map syntax.
init(Req, State0) ->
    Key = shurbej_http_common:extract_api_key(Req),
    State = State0#{api_key => Key},
    %% Connection options handed to cowboy_websocket.
    WsOpts = #{
        max_frame_size => 1048576,
        idle_timeout => 120000
    },
    {cowboy_websocket, Req, State, WsOpts}.
| 21 |
|
|
| 22 |
|
%% Runs once the WebSocket upgrade has completed.
%%
%% Always arms the keepalive timer, then decides what kind of connection
%% this is based on the `api_key` stored by init/2:
%%   * `undefined`      -> multi-key connection; nothing is subscribed yet.
%%   * key that verifies -> single-key connection; the process joins every
%%                          topic returned by allowed_topics/1 and the
%%                          topic list is echoed in the "connected" event.
%%   * key that fails    -> the socket is closed with code 4403.
websocket_init(#{api_key := ApiKey} = State) ->
    erlang:send_after(?PING_INTERVAL, self(), keepalive),
    Auth =
        case ApiKey of
            undefined -> no_key;
            _ -> {verified, shurbej_auth:verify(ApiKey)}
        end,
    case Auth of
        no_key ->
            %% Multi-key connection — no auto-subscribe
            Connected = #{
                <<"event">> => <<"connected">>,
                <<"retry">> => ?RETRY_MS
            },
            {reply, {text, simdjson:encode(Connected)},
             State#{subscriptions => #{}, single_key => false}};
        {verified, {ok, UserId}} ->
            %% Single-key connection — auto-subscribe to user's library + groups
            Topics = allowed_topics(UserId),
            lists:foreach(
                fun(Topic) -> pg:join(?SCOPE, Topic, self()) end,
                Topics
            ),
            Connected = #{
                <<"event">> => <<"connected">>,
                <<"retry">> => ?RETRY_MS,
                <<"topics">> => Topics
            },
            {reply, {text, simdjson:encode(Connected)},
             State#{subscriptions => #{ApiKey => Topics}, single_key => true}};
        {verified, {error, _}} ->
            {reply, {close, 4403, <<"Forbidden">>}, State}
    end.
| 46 |
|
|
| 47 |
|
%% Client messages
%%
%% Text frames are expected to carry a JSON object with an "action" member;
%% those are forwarded to handle_action/3. A frame that fails to decode
%% (decoder raises an `error`) or decodes to anything else closes the socket
%% with code 4400. Non-text frames are ignored.
websocket_handle({text, Raw}, State) ->
    %% Only the decode call is protected; the `of` clauses run outside the
    %% try, so failures inside handle_action/3 are not swallowed.
    try simdjson:decode(Raw) of
        #{<<"action">> := Action} = Payload ->
            handle_action(Action, Payload, State);
        _NotAnAction ->
            {reply, {close, 4400, <<"Bad request">>}, State}
    catch
        error:_ ->
            {reply, {close, 4400, <<"Bad request">>}, State}
    end;
websocket_handle(_OtherFrame, State) ->
    {ok, State}.
| 60 |
|
|
| 61 |
|
%% Topic updates from pg
%%
%% Erlang messages delivered to the connection process:
%%   * {topic_updated, Topic, Version} -> forwarded to the client as a
%%     "topicUpdated" JSON event.
%%   * keepalive -> re-arms the timer and sends a WebSocket ping.
%%   * anything else is dropped.
websocket_info({topic_updated, Topic, Version}, State) ->
    Event = #{
        <<"event">> => <<"topicUpdated">>,
        <<"topic">> => Topic,
        <<"version">> => Version
    },
    {reply, {text, simdjson:encode(Event)}, State};
websocket_info(keepalive, State) ->
    %% Schedule the next tick before answering so the cycle keeps running.
    erlang:send_after(?PING_INTERVAL, self(), keepalive),
    {reply, {ping, <<>>}, State};
websocket_info(_Unexpected, State) ->
    {ok, State}.
| 76 |
|
|
| 77 |
|
%% Connection teardown hook. There is nothing to release explicitly:
%% pg auto-removes the pid from all groups on process exit.
terminate(_Reason, _Req, _State) -> ok.
| 80 |
|
|
| 81 |
|
%% =================================================================== |
| 82 |
|
%% Actions |
| 83 |
|
%% =================================================================== |
| 84 |
|
|
| 85 |
|
%% Dispatches a decoded client request by its "action" value.
%%
%% "createSubscriptions" and "deleteSubscriptions" require a list under
%% "subscriptions". Both are refused with close code 4405 on a single-key
%% connection (state flag `single_key`, treated as false when absent).
%% Any other action, or a payload without a subscription list, closes the
%% socket with code 4400.
handle_action(<<"createSubscriptions">>, #{<<"subscriptions">> := Subs}, State)
  when is_list(Subs) ->
    case maps:get(single_key, State, false) of
        false ->
            {Created, Errors, State1} = process_creates(Subs, State),
            Event = #{
                <<"event">> => <<"subscriptionsCreated">>,
                <<"subscriptions">> => Created,
                <<"errors">> => Errors
            },
            {reply, {text, simdjson:encode(Event)}, State1};
        true ->
            {reply, {close, 4405, <<"Single-key connection cannot be modified">>}, State}
    end;
handle_action(<<"deleteSubscriptions">>, #{<<"subscriptions">> := Subs}, State)
  when is_list(Subs) ->
    case maps:get(single_key, State, false) of
        false ->
            State1 = process_deletes(Subs, State),
            Event = #{<<"event">> => <<"subscriptionsDeleted">>},
            {reply, {text, simdjson:encode(Event)}, State1};
        true ->
            {reply, {close, 4405, <<"Single-key connection cannot be modified">>}, State}
    end;
handle_action(_UnknownAction, _Payload, State) ->
    {reply, {close, 4400, <<"Unknown action">>}, State}.
| 115 |
|
|
| 116 |
|
%% =================================================================== |
| 117 |
|
%% Subscription management |
| 118 |
|
%% =================================================================== |
| 119 |
|
|
| 120 |
|
%% Applies a client's "createSubscriptions" list to the connection state.
%%
%% Subs  - list of decoded JSON entries supplied by the client (untrusted).
%% State - handler state map; per-key topic lists live under `subscriptions`.
%%
%% Returns {ResultSubs, Errors, NewState}:
%%   ResultSubs - one #{<<"apiKey">>, <<"topics">>} map per accepted entry,
%%                carrying the key's full (merged, sorted) topic list.
%%   Errors     - one #{<<"apiKey">>, <<"error">>} map per rejected entry.
%%   Both lists are accumulated by prepending, i.e. in reverse request order.
%%
%% Entries that are not maps or carry no "apiKey" are skipped.
process_creates(Subs, State) ->
    lists:foldl(fun create_subscription/2, {[], [], State}, Subs).

%% Handles one subscription entry: verifies the key, then resolves topics.
create_subscription(#{<<"apiKey">> := Key} = Sub, Acc) when Key =/= undefined ->
    case shurbej_auth:verify(Key) of
        {ok, UserId} ->
            Requested = maps:get(<<"topics">>, Sub, undefined),
            subscribe_key(Key, Requested, allowed_topics(UserId), Acc);
        {error, _} ->
            add_create_error(Key, <<"Invalid API key">>, Acc)
    end;
create_subscription(_NoKeyOrNotAMap, Acc) ->
    %% Client input: tolerate malformed entries instead of crashing the socket.
    Acc.

%% Picks the topics to subscribe: everything allowed when none were
%% requested, otherwise the requested topics that the key may access.
subscribe_key(Key, undefined, Allowed, Acc) ->
    join_topics(Key, Allowed, Acc);
subscribe_key(Key, Requested, Allowed, Acc) when is_list(Requested) ->
    Permitted = [Topic || Topic <- Requested, lists:member(Topic, Allowed)],
    join_topics(Key, Permitted, Acc);
subscribe_key(Key, _Malformed, _Allowed, Acc) ->
    add_create_error(Key, <<"Invalid topics">>, Acc).

%% Joins the pg groups for Topics and records them under Key.
join_topics(Key, Topics, {AccSubs, AccErrs, AccState}) ->
    SubsMap = maps:get(subscriptions, AccState, #{}),
    Existing = maps:get(Key, SubsMap, []),
    Wanted = lists:usort(Topics),
    %% pg counts joins: a pid that joins a group twice must leave twice.
    %% process_deletes/2 leaves once per stored topic, so join only the
    %% topics this key does not hold yet (and each of them only once).
    lists:foreach(
        fun(Topic) -> pg:join(?SCOPE, Topic, self()) end,
        Wanted -- Existing
    ),
    Merged = lists:usort(Existing ++ Wanted),
    Entry = #{<<"apiKey">> => Key, <<"topics">> => Merged},
    {[Entry | AccSubs], AccErrs,
     AccState#{subscriptions => SubsMap#{Key => Merged}}}.

%% Records a per-key failure in the error accumulator.
add_create_error(Key, Message, {AccSubs, AccErrs, AccState}) ->
    Err = #{<<"apiKey">> => Key, <<"error">> => Message},
    {AccSubs, [Err | AccErrs], AccState}.
| 153 |
|
|
| 154 |
|
%% Applies a client's "deleteSubscriptions" list to the connection state.
%%
%% Subs  - list of decoded JSON entries supplied by the client (untrusted).
%%         Each entry names an "apiKey" and optionally a single "topic";
%%         without "topic", every topic held by that key is dropped.
%% State - handler state map; per-key topic lists live under `subscriptions`.
%%
%% Returns the updated state. A key whose topic list becomes empty is
%% removed from the subscriptions map. Entries that are not maps or carry
%% no "apiKey" are skipped.
process_deletes(Subs, State) ->
    lists:foldl(fun delete_subscription/2, State, Subs).

%% Handles one delete entry for a single API key.
delete_subscription(#{<<"apiKey">> := Key} = Sub, AccState) when Key =/= undefined ->
    SubsMap = maps:get(subscriptions, AccState, #{}),
    OldTopics = maps:get(Key, SubsMap, []),
    ToRemove =
        case maps:get(<<"topic">>, Sub, undefined) of
            undefined ->
                OldTopics;
            Topic ->
                %% Only leave a group this key actually joined. Leaving a
                %% topic the key never held would cancel a join made on
                %% behalf of another key on the same connection.
                [Held || Held <- OldTopics, Held =:= Topic]
        end,
    lists:foreach(
        fun(Leaving) -> pg:leave(?SCOPE, Leaving, self()) end,
        ToRemove
    ),
    NewSubsMap =
        case OldTopics -- ToRemove of
            [] -> maps:remove(Key, SubsMap);
            Remaining -> SubsMap#{Key => Remaining}
        end,
    AccState#{subscriptions => NewSubsMap};
delete_subscription(_NoKeyOrNotAMap, AccState) ->
    %% Client input: tolerate malformed entries instead of crashing the socket.
    AccState.
| 178 |
|
|
| 179 |
|
%% =================================================================== |
| 180 |
|
%% Internal |
| 181 |
|
%% =================================================================== |
| 182 |
|
|
| 183 |
|
%% Builds the list of topics a user may subscribe to.
%%
%% The first element is the user's own "/users/<id>" topic; it is followed
%% by one "/groups/<id>" topic for each #shurbej_group_member{} record
%% returned by shurbej_db:list_user_groups/1. The group id is taken from
%% the first element of the record's `id` tuple; list elements that do not
%% match that shape are skipped by the comprehension.
allowed_topics(UserId) ->
    UserIdBin = integer_to_binary(UserId),
    OwnTopic = <<"/users/", UserIdBin/binary>>,
    Memberships = shurbej_db:list_user_groups(UserId),
    [OwnTopic
     | [group_topic(GroupId)
        || #shurbej_group_member{id = {GroupId, _}} <- Memberships]].
| 191 |
|
|
| 192 |
|
%% Formats the topic name for a group: "/groups/" followed by the decimal
%% representation of the integer GroupId.
group_topic(GroupId) ->
    IdBin = integer_to_binary(GroupId),
    <<"/groups/", IdBin/binary>>.