"Fossies" - the Fresh Open Source Software Archive

Member "jitsi-meet-4422/resources/prosody-plugins/mod_jibri_queue_component.lua" (21 Sep 2020, 20049 Bytes) of package /linux/misc/jitsi-meet-4422.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Lua source code syntax highlighting (style: standard) with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here.

    1 local st = require "util.stanza";
    2 local jid = require "util.jid";
    3 local http = require "net.http";
    4 local json = require "cjson";
    5 local inspect = require('inspect');
    6 local socket = require "socket";
    7 local uuid_gen = require "util.uuid".generate;
    8 local jwt = require "luajwtjitsi";
    9 local it = require "util.iterators";
   10 local neturl = require "net.url";
   11 local parse = neturl.parseQuery;
   12 
   13 local get_room_from_jid = module:require "util".get_room_from_jid;
   14 local room_jid_match_rewrite = module:require "util".room_jid_match_rewrite;
   15 local is_healthcheck_room = module:require "util".is_healthcheck_room;
   16 local room_jid_split_subdomain = module:require "util".room_jid_split_subdomain;
   17 local internal_room_jid_match_rewrite = module:require "util".internal_room_jid_match_rewrite;
   18 local async_handler_wrapper = module:require "util".async_handler_wrapper;
   19 
   20 -- this basically strips the domain from the conference.domain address
   21 local parentHostName = string.gmatch(tostring(module.host), "%w+.(%w.+)")();
   22 if parentHostName == nil then
   23     log("error", "Failed to start - unable to get parent hostname");
   24     return;
   25 end
   26 
   27 local parentCtx = module:context(parentHostName);
   28 if parentCtx == nil then
   29     log("error",
   30         "Failed to start - unable to get parent context for host: %s",
   31         tostring(parentHostName));
   32     return;
   33 end
   34 local token_util = module:require "token/util".new(parentCtx);
   35 
   36 local ASAPKeyServer;
   37 local ASAPKeyPath;
   38 local ASAPKeyId;
   39 local ASAPIssuer;
   40 local ASAPAudience;
   41 local ASAPAcceptedIssuers;
   42 local ASAPAcceptedAudiences;
   43 local ASAPTTL;
   44 local ASAPTTL_THRESHOLD;
   45 local ASAPKey;
   46 local JibriRegion;
   47 local disableTokenVerification;
   48 local muc_component_host;
   49 local external_api_url;
   50 local jwtKeyCacheSize;
   51 local jwtKeyCache;
   52 
   53 local function load_config()
   54     ASAPKeyServer = module:get_option_string("asap_key_server");
   55 
   56     if ASAPKeyServer then
   57         module:log("debug", "ASAP Public Key URL %s", ASAPKeyServer);
   58         token_util:set_asap_key_server(ASAPKeyServer);
   59     end
   60 
   61     ASAPKeyPath
   62         = module:get_option_string("asap_key_path", '/etc/prosody/certs/asap.key');
   63 
   64     ASAPKeyId
   65         = module:get_option_string("asap_key_id", 'jitsi');        
   66 
   67     ASAPIssuer
   68         = module:get_option_string("asap_issuer", 'jitsi');
   69 
   70     ASAPAudience
   71         = module:get_option_string("asap_audience", 'jibri-queue');
   72 
   73     ASAPAcceptedIssuers
   74         = module:get_option_array('asap_accepted_issuers',{'jibri-queue'});
   75     module:log("debug", "ASAP Accepted Issuers %s", ASAPAcceptedIssuers);
   76     token_util:set_asap_accepted_issuers(ASAPAcceptedIssuers);
   77 
   78     ASAPAcceptedAudiences 
   79         = module:get_option_array('asap_accepted_audiences',{'*'});
   80     module:log("debug", "ASAP Accepted Audiences %s", ASAPAcceptedAudiences);
   81     token_util:set_asap_accepted_audiences(ASAPAcceptedAudiences);
   82 
   83     -- do not require room to be set on tokens for jibri queue
   84     token_util:set_asap_require_room_claim(false);
   85 
   86     ASAPTTL
   87         = module:get_option_number("asap_ttl", 3600);
   88  
   89     ASAPTTL_THRESHOLD
   90         = module:get_option_number("asap_ttl_threshold", 600);
   91 
   92     queueServiceURL
   93         = module:get_option_string("jibri_queue_url");
   94 
   95     JibriRegion
   96         = module:get_option_string("jibri_region", 'default');
   97 
   98     -- option to enable/disable token verifications
   99     disableTokenVerification
  100         = module:get_option_boolean("disable_jibri_queue_token_verification", false);
  101 
  102     muc_component_host 
  103         = module:get_option_string("muc_component");
  104 
  105     external_api_url = module:get_option_string("external_api_url",tostring(parentHostName));
  106     module:log("debug", "External advertised API URL", external_api_url);
  107 
  108 
  109     -- TODO: Figure out a less arbitrary default cache size.
  110     jwtKeyCacheSize 
  111         = module:get_option_number("jwt_pubkey_cache_size", 128);
  112     jwtKeyCache = require"util.cache".new(jwtKeyCacheSize);
  113 
  114     if queueServiceURL == nil then
  115         log("error", "No jibri_queue_url specified. No service to contact!");
  116         return;
  117     end
  118 
  119     if muc_component_host == nil then
  120         log("error", "No muc_component specified. No muc to operate on for jibri queue!");
  121         return;
  122     end
  123 
  124     -- Read ASAP key once on module startup
  125     local f = io.open(ASAPKeyPath, "r");
  126     if f then
  127         ASAPKey = f:read("*all");
  128         f:close();
  129         if not ASAPKey then
  130             module:log("warn", "No ASAP Key read from %s, disabling jibri queue component plugin", ASAPKeyPath);
  131             return
  132         end
  133     else
  134         module:log("warn", "Error reading ASAP Key %s, disabling jibri queue component plugin", ASAPKeyPath);
  135         return
  136     end
  137 
  138     return true;
  139 end
  140 
  141 local function reload_config()
  142     module:log("info", "Reloading configuration for jibri queue component");
  143     local config_success = load_config();
  144 
  145     -- clear ASAP public key cache on config reload
  146     token_util:clear_asap_cache();
  147 
  148     if not config_success then
  149         log("error", "Unsuccessful reconfiguration, jibri queue component may misbehave");
  150     end
  151 end
  152 
  153 local config_success = load_config();
  154 
  155 if not config_success then
  156     log("error", "Unsuccessful configuration step, jibri queue component disabled")
  157     return;
  158 end
  159 
  160 
  161 local http_headers = {
  162     ["User-Agent"] = "Prosody ("..prosody.version.."; "..prosody.platform..")",
  163     ["Content-Type"] = "application/json"
  164 };
  165 
  166 -- we use async to detect Prosody 0.10 and earlier
  167 local have_async = pcall(require, "util.async");
  168 if not have_async then
  169     module:log("warn", "conference duration will not work with Prosody version 0.10 or less.");
  170     return;
  171 end
  172 
  173 
  174 log("info", "Starting jibri queue handling for %s", muc_component_host);
  175 
  176 local function round(num, numDecimalPlaces)
  177     local mult = 10^(numDecimalPlaces or 0)
  178     return math.floor(num * mult + 0.5) / mult
  179 end
  180       
  181 local function generateToken(audience)
  182     audience = audience or ASAPAudience
  183     local t = os.time()
  184     local err
  185     local exp_key = 'asap_exp.'..audience
  186     local token_key = 'asap_token.'..audience
  187     local exp = jwtKeyCache:get(exp_key)
  188     local token = jwtKeyCache:get(token_key)
  189 
  190     --if we find a token and it isn't too far from expiry, then use it
  191     if token ~= nil and exp ~= nil then
  192         exp = tonumber(exp)
  193         if (exp - t) > ASAPTTL_THRESHOLD then
  194             return token
  195         end
  196     end
  197 
  198     --expiry is the current time plus TTL
  199     exp = t + ASAPTTL
  200     local payload = {
  201         iss = ASAPIssuer,
  202         aud = audience,
  203         nbf = t,
  204         exp = exp,
  205     }
  206 
  207     -- encode
  208     local alg = "RS256"
  209     token, err = jwt.encode(payload, ASAPKey, alg, {kid = ASAPKeyId})
  210     if not err then
  211         token = 'Bearer '..token
  212         jwtKeyCache:set(exp_key,exp)
  213         jwtKeyCache:set(token_key,token)
  214         return token
  215     else
  216         return ''
  217     end
  218 end
  219 
  220 local function sendIq(participant,action,requestId,time,position,token)
  221     local iqId = uuid_gen();
  222     local from = module:get_host();
  223     local outStanza = st.iq({type = 'set', from = from, to = participant, id = iqId}):tag("jibri-queue", 
  224        { xmlns = 'http://jitsi.org/protocol/jibri-queue', requestId = requestId, action = action });
  225 
  226     if token then
  227         outStanza:tag("token"):text(token):up()
  228     end
  229     if time then
  230         outStanza:tag("time"):text(tostring(time)):up()
  231     end
  232     if position then
  233         outStanza:tag("position"):text(tostring(position)):up()
  234     end
  235 
  236     module:send(outStanza);
  237 end
  238 
  239 local function cb(content_, code_, response_, request_)
  240     if code_ == 200 or code_ == 204 then
  241         module:log("debug", "URL Callback: Code %s, Content %s, Request (host %s, path %s, body %s), Response: %s",
  242                 code_, content_, request_.host, request_.path, inspect(request_.body), inspect(response_));
  243     else
  244         module:log("warn", "URL Callback non successful: Code %s, Content %s, Request (%s), Response: %s",
  245                 code_, content_, inspect(request_), inspect(response_));
  246     end
  247 end
  248 
  249 local function sendEvent(type,room_address,participant,requestId,replyIq,replyError)
  250     local event_ts = round(socket.gettime()*1000);
  251     local node, host, resource, target_subdomain = room_jid_split_subdomain(room_address);
  252     local room_param = '';
  253     if target_subdomain then
  254         room_param = target_subdomain..'/'..node;
  255     else
  256         room_param = node;
  257     end
  258 
  259     local out_event = {
  260         ["conference"] = room_address,
  261         ["roomParam"] = room_param,
  262         ["eventType"] = type,
  263         ["participant"] = participant,
  264         ["externalApiUrl"] = external_api_url.."/jibriqueue/update",
  265         ["requestId"] = requestId,
  266         ["region"] = JibriRegion,
  267     }
  268     module:log("debug","Sending event %s",inspect(out_event));
  269 
  270     local headers = http_headers or {}
  271     headers['Authorization'] = generateToken()
  272 
  273     module:log("debug","Sending headers %s",inspect(headers));
  274     local requestURL = queueServiceURL.."/job/recording"
  275     if type=="LeaveQueue" then
  276         requestURL = requestURL .."/cancel"
  277     end
  278     local request = http.request(requestURL, {
  279         headers = headers,
  280         method = "POST",
  281         body = json.encode(out_event)
  282     }, function (content_, code_, response_, request_)
  283         if code_ == 200 or code_ == 204 then
  284             module:log("debug", "URL Callback: Code %s, Content %s, Request (host %s, path %s, body %s), Response: %s",
  285                     code_, content_, request_.host, request_.path, inspect(request_.body), inspect(response_));
  286             if (replyIq) then
  287                 module:log("debug", "sending reply IQ %s",inspect(replyIq));
  288                 module:send(replyIq);
  289             end
  290         else
  291             module:log("warn", "URL Callback non successful: Code %s, Content %s, Request (%s), Response: %s",
  292                     code_, content_, inspect(request_), inspect(response_));
  293             if (replyError) then
  294                 module:log("warn", "sending reply error IQ %s",inspect(replyError));
  295                 module:send(replyError);
  296             end
  297         end
  298     end);
  299 end
  300 
  301 function clearRoomQueueByOccupant(room, occupant)
  302     room.jibriQueue[occupant.jid] = nil;
  303 end
  304 
  305 function addRoomQueueByOccupant(room, occupant, requestId)
  306     room.jibriQueue[occupant.jid] = requestId;
  307 end
  308 
  309 -- receives iq from client currently connected to the room
  310 function on_iq(event)
  311     local requestId;
  312     -- Check the type of the incoming stanza to avoid loops:
  313     if event.stanza.attr.type == "error" then
  314         return; -- We do not want to reply to these, so leave.
  315     end
  316     if event.stanza.attr.to == module:get_host() then
  317         if event.stanza.attr.type == "set" then
  318             local reply = st.reply(event.stanza);
  319             local replyError = st.error_reply(event.stanza,'cancel','internal-server-error',"Queue Server Error");
  320 
  321             local jibriQueue
  322                 = event.stanza:get_child('jibri-queue', 'http://jitsi.org/protocol/jibri-queue');
  323             if jibriQueue then
  324                 module:log("debug", "Received Jibri Queue Request: %s ",inspect(jibriQueue));
  325 
  326                 local roomAddress = jibriQueue.attr.room;
  327                 local room = get_room_from_jid(room_jid_match_rewrite(roomAddress));
  328 
  329                 if not room then
  330                     module:log("warn", "No room found %s", roomAddress);
  331                     return false;
  332                 end
  333 
  334                 local from = event.stanza.attr.from;
  335 
  336                 local occupant = room:get_occupant_by_real_jid(from);
  337                 if not occupant then
  338                     module:log("warn", "No occupant %s found for %s", from, roomAddress);
  339                     return false;
  340                 end
  341 
  342                 local action = jibriQueue.attr.action;
  343                 if action == 'join' then
  344                     -- join action, so send event out
  345                     requestId = uuid_gen();
  346                     module:log("debug","Received join queue request for jid %s occupant %s requestId %s",roomAddress,occupant.jid,requestId);
  347 
  348                     -- now handle new jibri queue message
  349                     addRoomQueueByOccupant(room, occupant, requestId);
  350                     reply:add_child(st.stanza("jibri-queue", { xmlns = 'http://jitsi.org/protocol/jibri-queue', requestId = requestId})):up()
  351                     replyError:add_child(st.stanza("jibri-queue", { xmlns = 'http://jitsi.org/protocol/jibri-queue', requestId = requestId})):up()
  352 
  353                     module:log("debug","Sending JoinQueue event for jid %s occupant %s reply %s",roomAddress,occupant.jid,inspect(reply));
  354                     sendEvent('JoinQueue',roomAddress,occupant.jid,requestId,reply,replyError);
  355                 end
  356                 if action == 'leave' then
  357                     requestId = jibriQueue.attr.requestId;
  358                     module:log("debug","Received leave queue request for jid %s occupant %s requestId %s",roomAddress,occupant.jid,requestId);
  359 
  360                     -- TODO: check that requestId is the same as cached value
  361                     clearRoomQueueByOccupant(room, occupant);
  362                     reply:add_child(st.stanza("jibri-queue", { xmlns = 'http://jitsi.org/protocol/jibri-queue', requestId = requestId})):up()
  363                     replyError:add_child(st.stanza("jibri-queue", { xmlns = 'http://jitsi.org/protocol/jibri-queue', requestId = requestId})):up()
  364 
  365                     module:log("debug","Sending LeaveQueue event for jid %s occupant %s reply %s",roomAddress,occupant.jid,inspect(reply));
  366                     sendEvent('LeaveQueue',roomAddress,occupant.jid,requestId,reply,replyError);
  367                 end
  368             else
  369                 module:log("warn","Jibri Queue Stanza missing child %s",inspect(event.stanza))
  370             end
  371         end
  372     end
  373     return true
  374 end
  375 
  376 -- create recorder queue cache for the room
  377 function room_created(event)
  378     local room = event.room;
  379 
  380     if is_healthcheck_room(room.jid) then
  381         return;
  382     end
  383 
  384     room.jibriQueue = {};
  385 end
  386 
  387 -- Conference ended, clear all queue cache jids
  388 function room_destroyed(event)
  389     local room = event.room;
  390 
  391     if is_healthcheck_room(room.jid) then
  392         return;
  393     end
  394     for jid, x in pairs(room.jibriQueue) do
  395         if x then
  396             sendEvent('LeaveQueue',internal_room_jid_match_rewrite(room.jid),jid,x);
  397         end
  398     end
  399 end
  400 
  401 -- Occupant left remove it from the queue if it joined the queue
  402 function occupant_leaving(event)
  403     local room = event.room;
  404 
  405     if is_healthcheck_room(room.jid) then
  406         return;
  407     end
  408 
  409     local occupant = event.occupant;
  410     local requestId = room.jibriQueue[occupant.jid];
  411     -- check if user has cached queue request
  412     if requestId then
  413         -- remove occupant from queue cache, signal backend
  414         room.jibriQueue[occupant.jid] = nil;
  415         sendEvent('LeaveQueue',internal_room_jid_match_rewrite(room.jid),occupant.jid,requestId);
  416     end
  417 end
  418 
  419 module:hook("iq/host", on_iq);
  420 
  421 -- executed on every host added internally in prosody, including components
  422 function process_host(host)
  423     if host == muc_component_host then -- the conference muc component
  424         module:log("debug","Hook to muc events on %s", host);
  425 
  426         local muc_module = module:context(host);
  427         muc_module:hook("muc-room-created", room_created, -1);
  428         -- muc_module:hook("muc-occupant-joined", occupant_joined, -1);
  429         muc_module:hook("muc-occupant-pre-leave", occupant_leaving, -1);
  430         muc_module:hook("muc-room-destroyed", room_destroyed, -1);
  431     end
  432 end
  433 
  434 if prosody.hosts[muc_component_host] == nil then
  435     module:log("debug","No muc component found, will listen for it: %s", muc_component_host)
  436 
  437     -- when a host or component is added
  438     prosody.events.add_handler("host-activated", process_host);
  439 else
  440     process_host(muc_component_host);
  441 end
  442 
  443 module:log("info", "Loading jibri_queue_component");
  444 
  445 --- Verifies room name, domain name with the values in the token
  446 -- @param token the token we received
  447 -- @param room_name the room name
  448 -- @param group name of the group (optional)
  449 -- @param session the session to use for storing token specific fields
  450 -- @return true if values are ok or false otherwise
  451 function verify_token(token, room_jid, session)
  452     if disableTokenVerification then
  453         return true;
  454     end
  455 
  456     -- if not disableTokenVerification and we do not have token
  457     -- stop here, cause the main virtual host can have guest access enabled
  458     -- (allowEmptyToken = true) and we will allow access to rooms info without
  459     -- a token
  460     if token == nil then
  461         log("warn", "no token provided");
  462         return false;
  463     end
  464 
  465     session.auth_token = token;
  466     local verified, reason, message = token_util:process_and_verify_token(session);
  467     if not verified then
  468         log("warn", "not a valid token %s: %s", tostring(reason), tostring(message));
  469         log("debug", "invalid token %s", token);
  470         return false;
  471     end
  472 
  473     return true;
  474 end
  475 
  476 --- Handles request for updating jibri queue status
  477 -- @param event the http event, holds the request query
  478 -- @return GET response, containing a json with response details
  479 function handle_update_jibri_queue(event)
  480     local body = json.decode(event.request.body);
  481 
  482     module:log("debug","Update Jibri Queue Event Received: %s",inspect(body));
  483 
  484     local token = event.request.headers["authorization"];
  485     if not token then
  486         token = ''
  487     else
  488         local prefixStart, prefixEnd = token:find("Bearer ");
  489         if prefixStart ~= 1 then
  490             module:log("error", "REST event: Invalid authorization header format. The header must start with the string 'Bearer '");
  491             return { status_code = 403; };
  492         end
  493         token = token:sub(prefixEnd + 1);
  494     end
  495 
  496     local user_jid = body["participant"];
  497     local roomAddress = body["conference"];
  498     local userJWT = body["token"];
  499     local action = body["action"];
  500     local time = body["time"];
  501     local position = body["position"];
  502     local requestId = body["requestId"];
  503 
  504     if not action then
  505         if userJWT then
  506             action = 'token';
  507         else
  508             action = 'info';
  509         end
  510     end
  511 
  512     local room_jid = room_jid_match_rewrite(roomAddress);
  513 
  514     if not verify_token(token, room_jid, {}) then
  515         log("error", "REST event: Invalid token for room %s to route action %s for requestId %s", roomAddress, action, requestId);
  516         return { status_code = 403; };
  517     end
  518 
  519     local room = get_room_from_jid(room_jid);
  520     if (not room) then
  521         log("error", "REST event: no room found %s to route action %s for requestId %s", roomAddress, action, requestId);
  522         return { status_code = 404; };
  523     end
  524 
  525     local occupant = room:get_occupant_by_real_jid(user_jid);
  526     if not occupant then
  527         log("warn", "REST event: No occupant %s found for %s to route action %s for requestId %s", user_jid, roomAddress, action, requestId);
  528         return { status_code = 404; };
  529     end
  530 
  531     if not room.jibriQueue[occupant.jid] then
  532         log("warn", "REST event: No queue request found for occupant %s in conference %s to route action %s for requestId %s",occupant.jid,room.jid, action, requestId)
  533         return { status_code = 404; };
  534     end
  535 
  536     if not requestId then
  537         requestId = room.jibriQueue[occupant.jid];
  538     end
  539 
  540     if action == 'token' and userJWT then
  541         log("debug", "REST event: Token received for occupant %s in conference %s requestId %s, clearing room queue");
  542         clearRoomQueueByOccupant(room, occupant);
  543     end
  544 
  545     log("debug", "REST event: Sending update for occupant %s in conference %s to route action %s for requestId %s",occupant.jid,room.jid, action, requestId);
  546     sendIq(occupant.jid,action,requestId,time,position,userJWT);
  547     return { status_code = 200; };
  548 end
  549 
  550 module:depends("http");
  551 module:provides("http", {
  552     default_path = "/";
  553     name = "jibriqueue";
  554     route = {
  555         ["POST /jibriqueue/update"] = function (event) return async_handler_wrapper(event,handle_update_jibri_queue) end;
  556     };
  557 });
  558 
  559 module:hook_global('config-reloaded', reload_config);