"Fossies" - the Fresh Open Source Software Archive

Member "jitsi-meet-7307/resources/prosody-plugins/mod_muc_rate_limit.lua" (30 May 2023, 5570 Bytes) of package /linux/misc/jitsi-meet-7307.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Lua source code syntax highlighting (style: standard) with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. See also the last Fossies "Diffs" side-by-side code changes report for "mod_muc_rate_limit.lua": jitsi-meet_8319_vs_jitsi-meet_8615.

    1 -- enable under the main muc component
    2 
    3 local queue = require "util.queue";
    4 local new_throttle = require "util.throttle".create;
    5 local timer = require "util.timer";
    6 
    7 -- we max to 500 participants per meeting so this should be enough, we are not suppose to handle all
    8 -- participants in one meeting
    9 local PRESENCE_QUEUE_MAX_SIZE = 1000;
   10 
   11 -- default to 5 participants per second
   12 local join_rate_per_conference = module:get_option_number("muc_rate_joins", 5);
   13 
   14 -- Measure/monitor the room rate limiting queue
   15 local measure = require "core.statsmanager".measure;
   16 local measure_longest_queue = measure("distribution",
   17         "/mod_" .. module.name .. "/longest_queue");
   18 local measure_rooms_with_queue = measure("rate",
   19         "/mod_" .. module.name .. "/rooms_with_queue");
   20 
   21 -- throws a stat that the queue was full, counts the total number of times we hit it
   22 local measure_full_queue = measure("rate",
   23         "/mod_" .. module.name .. "/full_queue");
   24 
   25 -- keeps track of the total times we had an error processing the queue
   26 local measure_errors_processing_queue = measure("rate",
   27         "/mod_" .. module.name .. "/errors_processing_queue");
   28 
   29 -- we keep track here what was the longest queue we have seen
   30 local stat_longest_queue = 0;
   31 
   32 -- Adds item to the queue
   33 -- @returns false if queue is full and item was not added, true otherwise
   34 local function add_item_to_queue(joining_queue, item, room, from)
   35     if not joining_queue:push(item) then
   36         module:log('error', 'Error pushing presence in queue for %s in %s', from, room.jid);
   37 
   38         measure_full_queue();
   39         return false;
   40     else
   41         -- check is this the longest queue and if so throws a stat
   42         if joining_queue:count() > stat_longest_queue then
   43             stat_longest_queue = joining_queue:count();
   44             measure_longest_queue(stat_longest_queue);
   45         end
   46 
   47         return true;
   48     end
   49 end
   50 
   51 -- process join_rate_presence_queue in the room and pops element passing them to handle_normal_presence
   52 -- returns 1 if we want to reschedule it after 1 second
   53 local function timer_process_queue_elements (room)
   54     local presence_queue = room.join_rate_presence_queue;
   55 
   56     if not presence_queue or presence_queue:count() == 0 then
   57         return;
   58     end
   59 
   60     for _ = 1, join_rate_per_conference do
   61         local ev = presence_queue:pop();
   62         if ev then
   63             -- we mark what we pass here so we can skip it on the next muc-occupant-pre-join event
   64             ev.stanza.delayed_join_skip = true;
   65             room:handle_normal_presence(ev.origin, ev.stanza);
   66         end
   67     end
   68 
   69     -- if there are elements left, schedule an execution in a second
   70     if presence_queue:count() > 0 then
   71         return 1;
   72     else
   73         room.join_rate_queue_timer = false;
   74     end
   75 end
   76 
   77 -- we check join rate before occupant joins. If rate is exceeded we queue the events and start a timer
   78 -- that will run every second processing the events passing them to the room handling function handle_normal_presence
   79 -- from where those arrived, this way we keep a maximum rate of joining
   80 module:hook("muc-occupant-pre-join", function (event)
   81     local room, stanza = event.room, event.stanza;
   82 
   83     -- skipping events we had produced and clear our flag
   84     if stanza.delayed_join_skip == true then
   85         event.stanza.delayed_join_skip = nil;
   86         return nil;
   87     end
   88 
   89     local throttle = room.join_rate_throttle;
   90     if not room.join_rate_throttle then
   91         throttle = new_throttle(join_rate_per_conference, 1); -- rate per one second
   92         room.join_rate_throttle = throttle;
   93     end
   94 
   95     if not throttle:poll(1) then
   96         if not room.join_rate_presence_queue then
   97             -- if this is the first item for a room we increment the stat for rooms with queues
   98             measure_rooms_with_queue();
   99             room.join_rate_presence_queue = queue.new(PRESENCE_QUEUE_MAX_SIZE);
  100         end
  101 
  102         if not add_item_to_queue(room.join_rate_presence_queue, event, room, stanza.attr.from) then
  103             -- let's not stop processing the event
  104             return nil;
  105         end
  106 
  107         if not room.join_rate_queue_timer then
  108             timer.add_task(1, function ()
  109                 local status, result = pcall(timer_process_queue_elements, room);
  110                 if not status then
  111                     -- there was an error in the timer function
  112                     module:log('error', 'Error processing queue: %s', result);
  113 
  114                     measure_errors_processing_queue();
  115 
  116                     -- let's re-schedule timer so we do not lose the queue
  117                     return 1;
  118                 end
  119 
  120                 return result;
  121             end);
  122             room.join_rate_queue_timer = true;
  123         end
  124 
  125         return true; -- we stop execution, so we do not process this join at the moment
  126     end
  127 
  128     if room.join_rate_queue_timer then
  129         -- there is timer so we need to order the presences, put it in the queue
  130 
  131         -- if add fails as queue is full we return false and the event will continue processing, we risk re-order
  132         -- but not losing it
  133         return add_item_to_queue(room.join_rate_presence_queue, event, room, stanza.attr.from);
  134     end
  135 
  136 end, 9); -- as we will rate limit joins we need to be the first to execute
  137          -- we ran it after muc_max_occupants which is with priority 10, there is nothing to rate limit
  138          -- if max number of occupants is reached
  139 
  140 -- clear queue on room destroy so timer will skip next run if any
  141 module:hook('muc-room-destroyed',function(event)
  142     event.room.join_rate_presence_queue = nil;
  143 end);