/__w/shurbej/shurbej/_build/test/cover/aggregate/shurbej_ws_stream.html

1 -module(shurbej_ws_stream).
2
3 %% Zotero Streaming API — WebSocket handler.
4 %% Clients subscribe to library topics and receive topicUpdated events
5 %% when library versions change. Uses pg (process groups) for pub/sub.
6
7 -include_lib("shurbej_store/include/shurbej_records.hrl").
8
9 -export([init/2, websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]).
10
11 -define(SCOPE, shurbej_stream).
12 -define(RETRY_MS, 10000).
13 -define(PING_INTERVAL, 25000).
14
%% Cowboy handler entry point. Stores the API key extracted from the
%% upgrade request under `api_key` in the handler state (the clauses of
%% websocket_init/1 treat `undefined` as "no key supplied") and switches the
%% connection to the WebSocket protocol.
init(Req, State) ->
    WsOpts = #{
        idle_timeout => 120000,
        max_frame_size => 1048576
    },
    Key = shurbej_http_common:extract_api_key(Req),
    {cowboy_websocket, Req, maps:put(api_key, Key, State), WsOpts}.
21
%% Runs once the WebSocket is established: schedules the first keepalive
%% tick, then sends the "connected" greeting appropriate for the connection.
websocket_init(#{api_key := ApiKey} = State) ->
    erlang:send_after(?PING_INTERVAL, self(), keepalive),
    greet(ApiKey, State).

%% Builds the initial reply.
%%  * No key (multi-key connection): nothing is subscribed automatically.
%%  * Valid key (single-key connection): the process joins the pg group of
%%    every topic returned by allowed_topics/1 and reports them.
%%  * Key rejected by shurbej_auth:verify/1: the socket is closed with 4403.
greet(undefined, State) ->
    Hello = #{<<"event">> => <<"connected">>, <<"retry">> => ?RETRY_MS},
    {reply, {text, simdjson:encode(Hello)},
     State#{subscriptions => #{}, single_key => false}};
greet(Key, State) ->
    case shurbej_auth:verify(Key) of
        {error, _} ->
            {reply, {close, 4403, <<"Forbidden">>}, State};
        {ok, UserId} ->
            Topics = allowed_topics(UserId),
            lists:foreach(fun(Topic) -> pg:join(?SCOPE, Topic, self()) end, Topics),
            Hello = #{
                <<"event">> => <<"connected">>,
                <<"retry">> => ?RETRY_MS,
                <<"topics">> => Topics
            },
            {reply, {text, simdjson:encode(Hello)},
             State#{subscriptions => #{Key => Topics}, single_key => true}}
    end.
46
47 %% Client messages
%% Text frames must decode to a JSON object carrying an "action" field; such
%% requests are forwarded to handle_action/3. A decode error or any other
%% payload closes the socket with 4400. Non-text frames are ignored.
websocket_handle({text, Raw}, State) ->
    %% Only the decode call is protected; handle_action/3 runs in the
    %% unprotected `of` section.
    try simdjson:decode(Raw) of
        #{<<"action">> := Action} = Request ->
            handle_action(Action, Request, State);
        _NotARequest ->
            {reply, {close, 4400, <<"Bad request">>}, State}
    catch
        error:_ ->
            {reply, {close, 4400, <<"Bad request">>}, State}
    end;
websocket_handle(_OtherFrame, State) ->
    {ok, State}.
60
61 %% Topic updates from pg
%% Erlang messages delivered to the connection process.
%%  * {topic_updated, Topic, Version} is relayed to the client as a
%%    "topicUpdated" event.
%%  * keepalive re-arms the timer and sends an empty ping frame.
%%  * Anything else is dropped.
websocket_info({topic_updated, Topic, Version}, State) ->
    Event = #{
        <<"event">> => <<"topicUpdated">>,
        <<"topic">> => Topic,
        <<"version">> => Version
    },
    {reply, {text, simdjson:encode(Event)}, State};
websocket_info(keepalive, State) ->
    erlang:send_after(?PING_INTERVAL, self(), keepalive),
    {reply, {ping, <<>>}, State};
websocket_info(_Unexpected, State) ->
    {ok, State}.
76
%% No explicit cleanup: pg drops a pid from every group it joined once the
%% process exits.
terminate(_Why, _Request, _WsState) ->
    ok.
80
81 %% ===================================================================
82 %% Actions
83 %% ===================================================================
84
%% Executes a decoded client request, selected by its "action" value.
%% createSubscriptions / deleteSubscriptions require a list under
%% "subscriptions" and are refused with close code 4405 on single-key
%% connections. Every other request closes the socket with 4400.
handle_action(<<"createSubscriptions">>, #{<<"subscriptions">> := Requested}, State)
  when is_list(Requested) ->
    case maps:get(single_key, State, false) of
        false ->
            {Created, Failed, State1} = process_creates(Requested, State),
            Event = #{
                <<"event">> => <<"subscriptionsCreated">>,
                <<"subscriptions">> => Created,
                <<"errors">> => Failed
            },
            {reply, {text, simdjson:encode(Event)}, State1};
        true ->
            single_key_locked(State)
    end;
handle_action(<<"deleteSubscriptions">>, #{<<"subscriptions">> := Requested}, State)
  when is_list(Requested) ->
    case maps:get(single_key, State, false) of
        false ->
            State1 = process_deletes(Requested, State),
            Event = #{<<"event">> => <<"subscriptionsDeleted">>},
            {reply, {text, simdjson:encode(Event)}, State1};
        true ->
            single_key_locked(State)
    end;
handle_action(_Action, _Payload, State) ->
    {reply, {close, 4400, <<"Unknown action">>}, State}.

%% Close reply used when a single-key connection tries to change its
%% subscriptions.
single_key_locked(State) ->
    {reply, {close, 4405, <<"Single-key connection cannot be modified">>}, State}.
115
116 %% ===================================================================
117 %% Subscription management
118 %% ===================================================================
119
%% Applies a list of client-supplied subscription requests.
%%
%% Returns {Subscriptions, Errors, NewState}: the JSON-ready entries for
%% every key that verified, the error entries, and the updated handler
%% state. Both lists are in request order.
%%
%% Entries that are not JSON objects, or that carry no "apiKey", are ignored.
process_creates(Subs, State) ->
    {Created, Errors, NewState} =
        lists:foldl(fun create_subscription/2, {[], [], State}, Subs),
    {lists:reverse(Created), lists:reverse(Errors), NewState}.

%% Fold step for one request entry. The input comes straight from the
%% client, so a non-map entry is skipped instead of crashing the connection.
create_subscription(Sub, Acc) when is_map(Sub) ->
    case maps:get(<<"apiKey">>, Sub, undefined) of
        undefined -> Acc;
        Key -> create_for_key(Key, maps:get(<<"topics">>, Sub, undefined), Acc)
    end;
create_subscription(_NotAMap, Acc) ->
    Acc.

%% Verifies Key and subscribes it to the requested topics it may access
%% (all of its allowed topics when none were requested).
create_for_key(Key, ReqTopics, {Created, Errors, State}) ->
    case shurbej_auth:verify(Key) of
        {ok, UserId} ->
            case select_topics(ReqTopics, allowed_topics(UserId)) of
                {ok, Granted, Denied} ->
                    SubsMap = maps:get(subscriptions, State, #{}),
                    Existing = maps:get(Key, SubsMap, []),
                    Merged = lists:usort(Existing ++ Granted),
                    %% pg counts every join, and a later delete leaves each
                    %% tracked topic once. Join only topics this key does not
                    %% already hold so pg and the state stay in step.
                    lists:foreach(fun(Topic) ->
                        pg:join(?SCOPE, Topic, self())
                    end, Merged -- Existing),
                    Entry = #{<<"apiKey">> => Key, <<"topics">> => Merged},
                    %% Message wording follows the Zotero Streaming API
                    %% documentation — TODO confirm against clients in use.
                    TopicErrs = [#{<<"apiKey">> => Key,
                                   <<"topic">> => Topic,
                                   <<"error">> => <<"Topic is not valid for provided API key">>}
                                 || Topic <- Denied],
                    {[Entry | Created],
                     lists:reverse(TopicErrs, Errors),
                     State#{subscriptions => SubsMap#{Key => Merged}}};
                invalid ->
                    Err = #{<<"apiKey">> => Key,
                            <<"error">> => <<"'topics' must be an array">>},
                    {Created, [Err | Errors], State}
            end;
        {error, _} ->
            Err = #{<<"apiKey">> => Key,
                    <<"error">> => <<"Invalid API key">>},
            {Created, [Err | Errors], State}
    end.

%% Splits the requested topics into those the key may access and those it
%% may not. `undefined` (no "topics" field) selects every allowed topic;
%% anything that is not a list is reported as `invalid`.
select_topics(undefined, Allowed) ->
    {ok, Allowed, []};
select_topics(Requested, Allowed) when is_list(Requested) ->
    {Granted, Denied} =
        lists:partition(fun(Topic) -> lists:member(Topic, Allowed) end, Requested),
    {ok, Granted, Denied};
select_topics(_NotAList, _Allowed) ->
    invalid.
153
%% Applies a list of client-supplied unsubscribe requests and returns the
%% updated handler state.
%%
%% Each entry names an "apiKey" and, optionally, a single "topic"; without a
%% topic every subscription held under that key is dropped. Entries that are
%% not JSON objects, or that carry no "apiKey", are ignored.
process_deletes(Subs, State) ->
    lists:foldl(fun delete_subscription/2, State, Subs).

%% Fold step for one request entry. The input comes straight from the
%% client, so a non-map entry is skipped instead of crashing the connection.
delete_subscription(Sub, State) when is_map(Sub) ->
    case maps:get(<<"apiKey">>, Sub, undefined) of
        undefined -> State;
        Key -> drop_topics(Key, maps:get(<<"topic">>, Sub, undefined), State)
    end;
delete_subscription(_NotAMap, State) ->
    State.

%% Removes Topic (or everything, when Topic is `undefined`) from the
%% subscriptions recorded for Key and leaves the matching pg groups.
drop_topics(Key, Topic, State) ->
    SubsMap = maps:get(subscriptions, State, #{}),
    Held = maps:get(Key, SubsMap, []),
    %% Only leave groups this key actually holds. Leaving on an arbitrary
    %% requested topic could cancel a membership that belongs to another key
    %% while the state still lists it as subscribed.
    ToRemove = case Topic of
        undefined -> Held;
        _ -> [T || T <- Held, T =:= Topic]
    end,
    lists:foreach(fun(T) -> pg:leave(?SCOPE, T, self()) end, ToRemove),
    NewSubsMap = case Held -- ToRemove of
        [] -> maps:remove(Key, SubsMap);
        Remaining -> SubsMap#{Key => Remaining}
    end,
    State#{subscriptions => NewSubsMap}.
178
179 %% ===================================================================
180 %% Internal
181 %% ===================================================================
182
%% Returns the topics UserId may subscribe to: the user's own
%% "/users/<id>" topic first, followed by one "/groups/<id>" topic per
%% membership record returned by shurbej_db:list_user_groups/1.
allowed_topics(UserId) ->
    OwnId = integer_to_binary(UserId),
    OwnTopic = <<"/users/", OwnId/binary>>,
    Memberships = shurbej_db:list_user_groups(UserId),
    [OwnTopic |
     [group_topic(Gid) || #shurbej_group_member{id = {Gid, _}} <- Memberships]].
191
%% Topic name for a group library: "/groups/" followed by the decimal id.
group_topic(GroupId) ->
    Id = integer_to_binary(GroupId),
    <<"/groups/", Id/binary>>.
Line Hits Source