"Fossies" - the Fresh Open Source Software Archive 
Member "jitsi-meet-7307/resources/prosody-plugins/mod_muc_rate_limit.lua" (30 May 2023, 5570 Bytes) of package /linux/misc/jitsi-meet-7307.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Lua source code syntax highlighting (style:
standard) with prefixed line numbers and
code folding option.
Alternatively you can here
view or
download the uninterpreted source code file.
See also the last
Fossies "Diffs" side-by-side code changes report for "mod_muc_rate_limit.lua":
jitsi-meet_8319_vs_jitsi-meet_8615.
1 -- enable under the main muc component
2
3 local queue = require "util.queue";
4 local new_throttle = require "util.throttle".create;
5 local timer = require "util.timer";
6
7 -- we max to 500 participants per meeting so this should be enough, we are not suppose to handle all
8 -- participants in one meeting
9 local PRESENCE_QUEUE_MAX_SIZE = 1000;
10
11 -- default to 5 participants per second
12 local join_rate_per_conference = module:get_option_number("muc_rate_joins", 5);
13
14 -- Measure/monitor the room rate limiting queue
15 local measure = require "core.statsmanager".measure;
16 local measure_longest_queue = measure("distribution",
17 "/mod_" .. module.name .. "/longest_queue");
18 local measure_rooms_with_queue = measure("rate",
19 "/mod_" .. module.name .. "/rooms_with_queue");
20
21 -- throws a stat that the queue was full, counts the total number of times we hit it
22 local measure_full_queue = measure("rate",
23 "/mod_" .. module.name .. "/full_queue");
24
25 -- keeps track of the total times we had an error processing the queue
26 local measure_errors_processing_queue = measure("rate",
27 "/mod_" .. module.name .. "/errors_processing_queue");
28
29 -- we keep track here what was the longest queue we have seen
30 local stat_longest_queue = 0;
31
32 -- Adds item to the queue
33 -- @returns false if queue is full and item was not added, true otherwise
34 local function add_item_to_queue(joining_queue, item, room, from)
35 if not joining_queue:push(item) then
36 module:log('error', 'Error pushing presence in queue for %s in %s', from, room.jid);
37
38 measure_full_queue();
39 return false;
40 else
41 -- check is this the longest queue and if so throws a stat
42 if joining_queue:count() > stat_longest_queue then
43 stat_longest_queue = joining_queue:count();
44 measure_longest_queue(stat_longest_queue);
45 end
46
47 return true;
48 end
49 end
50
51 -- process join_rate_presence_queue in the room and pops element passing them to handle_normal_presence
52 -- returns 1 if we want to reschedule it after 1 second
53 local function timer_process_queue_elements (room)
54 local presence_queue = room.join_rate_presence_queue;
55
56 if not presence_queue or presence_queue:count() == 0 then
57 return;
58 end
59
60 for _ = 1, join_rate_per_conference do
61 local ev = presence_queue:pop();
62 if ev then
63 -- we mark what we pass here so we can skip it on the next muc-occupant-pre-join event
64 ev.stanza.delayed_join_skip = true;
65 room:handle_normal_presence(ev.origin, ev.stanza);
66 end
67 end
68
69 -- if there are elements left, schedule an execution in a second
70 if presence_queue:count() > 0 then
71 return 1;
72 else
73 room.join_rate_queue_timer = false;
74 end
75 end
76
77 -- we check join rate before occupant joins. If rate is exceeded we queue the events and start a timer
78 -- that will run every second processing the events passing them to the room handling function handle_normal_presence
79 -- from where those arrived, this way we keep a maximum rate of joining
80 module:hook("muc-occupant-pre-join", function (event)
81 local room, stanza = event.room, event.stanza;
82
83 -- skipping events we had produced and clear our flag
84 if stanza.delayed_join_skip == true then
85 event.stanza.delayed_join_skip = nil;
86 return nil;
87 end
88
89 local throttle = room.join_rate_throttle;
90 if not room.join_rate_throttle then
91 throttle = new_throttle(join_rate_per_conference, 1); -- rate per one second
92 room.join_rate_throttle = throttle;
93 end
94
95 if not throttle:poll(1) then
96 if not room.join_rate_presence_queue then
97 -- if this is the first item for a room we increment the stat for rooms with queues
98 measure_rooms_with_queue();
99 room.join_rate_presence_queue = queue.new(PRESENCE_QUEUE_MAX_SIZE);
100 end
101
102 if not add_item_to_queue(room.join_rate_presence_queue, event, room, stanza.attr.from) then
103 -- let's not stop processing the event
104 return nil;
105 end
106
107 if not room.join_rate_queue_timer then
108 timer.add_task(1, function ()
109 local status, result = pcall(timer_process_queue_elements, room);
110 if not status then
111 -- there was an error in the timer function
112 module:log('error', 'Error processing queue: %s', result);
113
114 measure_errors_processing_queue();
115
116 -- let's re-schedule timer so we do not lose the queue
117 return 1;
118 end
119
120 return result;
121 end);
122 room.join_rate_queue_timer = true;
123 end
124
125 return true; -- we stop execution, so we do not process this join at the moment
126 end
127
128 if room.join_rate_queue_timer then
129 -- there is timer so we need to order the presences, put it in the queue
130
131 -- if add fails as queue is full we return false and the event will continue processing, we risk re-order
132 -- but not losing it
133 return add_item_to_queue(room.join_rate_presence_queue, event, room, stanza.attr.from);
134 end
135
136 end, 9); -- as we will rate limit joins we need to be the first to execute
137 -- we ran it after muc_max_occupants which is with priority 10, there is nothing to rate limit
138 -- if max number of occupants is reached
139
140 -- clear queue on room destroy so timer will skip next run if any
141 module:hook('muc-room-destroyed',function(event)
142 event.room.join_rate_presence_queue = nil;
143 end);