"Fossies" - the Fresh Open Source Software Archive

Member "jitsi-meet-5079/resources/prosody-plugins/mod_smacks.lua" (17 Jun 2021, 28255 Bytes) of package /linux/misc/jitsi-meet-5079.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Lua source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file.

    1 -- XEP-0198: Stream Management for Prosody IM
    2 --
    3 -- Copyright (C) 2010-2015 Matthew Wild
    4 -- Copyright (C) 2010 Waqas Hussain
    5 -- Copyright (C) 2012-2015 Kim Alvefur
    6 -- Copyright (C) 2012 Thijs Alkemade
    7 -- Copyright (C) 2014 Florian Zeitz
    8 -- Copyright (C) 2016-2020 Thilo Molitor
    9 --
   10 -- This project is MIT/X11 licensed. Please see the
   11 -- COPYING file in the source package for more information.
   12 --
   13 
   14 local st = require "util.stanza";
   15 local dep = require "util.dependencies";
   16 local cache = dep.softreq("util.cache");    -- only available in prosody 0.10+
   17 local uuid_generate = require "util.uuid".generate;
   18 local jid = require "util.jid";
   19 
   20 local t_remove = table.remove;
   21 local math_min = math.min;
   22 local math_max = math.max;
   23 local os_time = os.time;
   24 local tonumber, tostring = tonumber, tostring;
   25 local add_filter = require "util.filters".add_filter;
   26 local timer = require "util.timer";
   27 local datetime = require "util.datetime";
   28 
   29 local xmlns_mam2 = "urn:xmpp:mam:2";
   30 local xmlns_sm2 = "urn:xmpp:sm:2";
   31 local xmlns_sm3 = "urn:xmpp:sm:3";
   32 local xmlns_errors = "urn:ietf:params:xml:ns:xmpp-stanzas";
   33 local xmlns_delay = "urn:xmpp:delay";
   34 
   35 local sm2_attr = { xmlns = xmlns_sm2 };
   36 local sm3_attr = { xmlns = xmlns_sm3 };
   37 
   38 local resume_timeout = module:get_option_number("smacks_hibernation_time", 600);
   39 local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", false);
   40 local s2s_resend = module:get_option_boolean("smacks_s2s_resend", false);
   41 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0);
   42 local delayed_ack_timeout = module:get_option_number("smacks_max_ack_delay", 30);
   43 local max_hibernated_sessions = module:get_option_number("smacks_max_hibernated_sessions", 10);
   44 local max_old_sessions = module:get_option_number("smacks_max_old_sessions", 10);
   45 local core_process_stanza = prosody.core_process_stanza;
   46 local sessionmanager = require"core.sessionmanager";
   47 
   48 assert(max_hibernated_sessions > 0, "smacks_max_hibernated_sessions must be greater than 0");
   49 assert(max_old_sessions > 0, "smacks_max_old_sessions must be greater than 0");
   50 
   51 local c2s_sessions = module:shared("/*/c2s/sessions");
   52 
   53 local function init_session_cache(max_entries, evict_callback)
   54     -- old prosody version < 0.10 (no limiting at all!)
   55     if not cache then
   56         local store = {};
   57         return {
   58             get = function(user, key)
   59                 if not user then return nil; end
   60                 if not key then return nil; end
   61                 return store[key];
   62             end;
   63             set = function(user, key, value)
   64                 if not user then return nil; end
   65                 if not key then return nil; end
   66                 store[key] = value;
   67             end;
   68         };
   69     end
   70 
   71     -- use per user limited cache for prosody >= 0.10
   72     local stores = {};
   73     return {
   74             get = function(user, key)
   75                 if not user then return nil; end
   76                 if not key then return nil; end
   77                 if not stores[user] then
   78                     stores[user] = cache.new(max_entries, evict_callback);
   79                 end
   80                 return stores[user]:get(key);
   81             end;
   82             set = function(user, key, value)
   83                 if not user then return nil; end
   84                 if not key then return nil; end
   85                 if not stores[user] then stores[user] = cache.new(max_entries, evict_callback); end
   86                 stores[user]:set(key, value);
   87                 -- remove empty caches completely
   88                 if not stores[user]:count() then stores[user] = nil; end
   89             end;
   90         };
   91 end
   92 local old_session_registry = init_session_cache(max_old_sessions, nil);
   93 local session_registry = init_session_cache(max_hibernated_sessions, function(resumption_token, session)
   94     if session.destroyed then return true; end      -- destroyed session can always be removed from cache
   95     session.log("warn", "User has too much hibernated sessions, removing oldest session (token: %s)", resumption_token);
   96     -- store old session's h values on force delete
   97     -- save only actual h value and username/host (for security)
   98     old_session_registry.set(session.username, resumption_token, {
   99         h = session.handled_stanza_count,
  100         username = session.username,
  101         host = session.host
  102     });
  103     return true;    -- allow session to be removed from full cache to make room for new one
  104 end);
  105 
  106 local function stoppable_timer(delay, callback)
  107     local stopped = false;
  108     local timer = module:add_timer(delay, function (t)
  109         if stopped then return; end
  110         return callback(t);
  111     end);
  112     if timer and timer.stop then return timer; end      -- new prosody api includes stop() function
  113     return {
  114         stop = function(self) stopped = true end;
  115         timer;
  116     };
  117 end
  118 
  119 local function delayed_ack_function(session)
  120     -- fire event only if configured to do so and our session is not already hibernated or destroyed
  121     if delayed_ack_timeout > 0 and session.awaiting_ack
  122     and not session.hibernating and not session.destroyed then
  123         session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d",
  124             session.outgoing_stanza_queue and #session.outgoing_stanza_queue or 0);
  125         module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue});
  126     end
  127     session.delayed_ack_timer = nil;
  128 end
  129 
  130 local function can_do_smacks(session, advertise_only)
  131     if session.smacks then return false, "unexpected-request", "Stream management is already enabled"; end
  132 
  133     local session_type = session.type;
  134     if session.username then
  135         if not(advertise_only) and not(session.resource) then -- Fail unless we're only advertising sm
  136             return false, "unexpected-request", "Client must bind a resource before enabling stream management";
  137         end
  138         return true;
  139     elseif s2s_smacks and (session_type == "s2sin" or session_type == "s2sout") then
  140         return true;
  141     end
  142     return false, "service-unavailable", "Stream management is not available for this stream";
  143 end
  144 
  145 module:hook("stream-features",
  146         function (event)
  147             if can_do_smacks(event.origin, true) then
  148                 event.features:tag("sm", sm2_attr):tag("optional"):up():up();
  149                 event.features:tag("sm", sm3_attr):tag("optional"):up():up();
  150             end
  151         end);
  152 
  153 module:hook("s2s-stream-features",
  154         function (event)
  155             if can_do_smacks(event.origin, true) then
  156                 event.features:tag("sm", sm2_attr):tag("optional"):up():up();
  157                 event.features:tag("sm", sm3_attr):tag("optional"):up():up();
  158             end
  159         end);
  160 
  161 local function request_ack_if_needed(session, force, reason)
  162     local queue = session.outgoing_stanza_queue;
  163     local expected_h = session.last_acknowledged_stanza + #queue;
  164     -- session.log("debug", "*** SMACKS(1) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating));
  165     if session.awaiting_ack == nil and not session.hibernating then
  166         -- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong
  167         -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any
  168         -- further requests until a higher h-value would be expected.
  169         -- session.log("debug", "*** SMACKS(2) ***: #queue=%s, max_unacked_stanzas=%s, expected_h=%s, last_requested_h=%s", tostring(#queue), tostring(max_unacked_stanzas), tostring(expected_h), tostring(session.last_requested_h));
  170         if (#queue > max_unacked_stanzas and expected_h ~= session.last_requested_h) or force then
  171             session.log("debug", "Queuing <r> (in a moment) from %s - #queue=%d", reason, #queue);
  172             session.awaiting_ack = false;
  173             session.awaiting_ack_timer = stoppable_timer(1e-06, function ()
  174                 -- session.log("debug", "*** SMACKS(3) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating));
  175                 -- only request ack if needed and our session is not already hibernated or destroyed
  176                 if not session.awaiting_ack and not session.hibernating and not session.destroyed then
  177                     session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue);
  178                     (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
  179                     session.awaiting_ack = true;
  180                     -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile)
  181                     session.last_requested_h = session.last_acknowledged_stanza + #queue;
  182                     session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue);
  183                     if not session.delayed_ack_timer then
  184                         session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function()
  185                             delayed_ack_function(session);
  186                         end);
  187                     end
  188                 end
  189             end);
  190         end
  191     end
  192 
  193     -- Trigger "smacks-ack-delayed"-event if we added new (ackable) stanzas to the outgoing queue
  194     -- and there isn't already a timer for this event running.
  195     -- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event
  196     -- would not trigger this event (again).
  197     if #queue > max_unacked_stanzas and session.awaiting_ack and session.delayed_ack_timer == nil then
  198         session.log("debug", "Calling delayed_ack_function directly (still waiting for ack)");
  199         delayed_ack_function(session);
  200     end
  201 end
  202 
  203 local function outgoing_stanza_filter(stanza, session)
  204     -- XXX: Normally you wouldn't have to check the xmlns for a stanza as it's
  205     -- supposed to be nil.
  206     -- However, when using mod_smacks with mod_websocket, then mod_websocket's
  207     -- stanzas/out filter can get called before this one and adds the xmlns.
  208     local is_stanza = stanza.attr and
  209         (not stanza.attr.xmlns or stanza.attr.xmlns == 'jabber:client')
  210         and not stanza.name:find":";
  211 
  212     if is_stanza and not stanza._cached then
  213         local queue = session.outgoing_stanza_queue;
  214         local cached_stanza = st.clone(stanza);
  215         cached_stanza._cached = true;
  216 
  217         if cached_stanza and cached_stanza.name ~= "iq" and cached_stanza:get_child("delay", xmlns_delay) == nil then
  218             cached_stanza = cached_stanza:tag("delay", {
  219                 xmlns = xmlns_delay,
  220                 from = jid.bare(session.full_jid or session.host),
  221                 stamp = datetime.datetime()
  222             });
  223         end
  224 
  225         queue[#queue+1] = cached_stanza;
  226         if session.hibernating then
  227             session.log("debug", "hibernating, stanza queued");
  228             module:fire_event("smacks-hibernation-stanza-queued", {origin = session, queue = queue, stanza = cached_stanza});
  229             return nil;
  230         end
  231         request_ack_if_needed(session, false, "outgoing_stanza_filter");
  232     end
  233     return stanza;
  234 end
  235 
  236 local function count_incoming_stanzas(stanza, session)
  237     if not stanza.attr.xmlns then
  238         session.handled_stanza_count = session.handled_stanza_count + 1;
  239         session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count);
  240     end
  241     return stanza;
  242 end
  243 
  244 local function wrap_session_out(session, resume)
  245     if not resume then
  246         session.outgoing_stanza_queue = {};
  247         session.last_acknowledged_stanza = 0;
  248     end
  249 
  250     add_filter(session, "stanzas/out", outgoing_stanza_filter, -999);
  251 
  252     local session_close = session.close;
  253     function session.close(...)
  254         if session.resumption_token then
  255             session_registry.set(session.username, session.resumption_token, nil);
  256             old_session_registry.set(session.username, session.resumption_token, nil);
  257             session.resumption_token = nil;
  258         end
  259         -- send out last ack as per revision 1.5.2 of XEP-0198
  260         if session.smacks and session.conn then
  261             (session.sends2s or session.send)(st.stanza("a", { xmlns = session.smacks, h = string.format("%d", session.handled_stanza_count) }));
  262         end
  263         return session_close(...);
  264     end
  265     return session;
  266 end
  267 
  268 local function wrap_session_in(session, resume)
  269     if not resume then
  270         session.handled_stanza_count = 0;
  271     end
  272     add_filter(session, "stanzas/in", count_incoming_stanzas, 999);
  273 
  274     return session;
  275 end
  276 
  277 local function wrap_session(session, resume)
  278     wrap_session_out(session, resume);
  279     wrap_session_in(session, resume);
  280     return session;
  281 end
  282 
  283 function handle_enable(session, stanza, xmlns_sm)
  284     local ok, err, err_text = can_do_smacks(session);
  285     if not ok then
  286         session.log("warn", "Failed to enable smacks: %s", err_text); -- TODO: XEP doesn't say we can send error text, should it?
  287         (session.sends2s or session.send)(st.stanza("failed", { xmlns = xmlns_sm }):tag(err, { xmlns = xmlns_errors}));
  288         return true;
  289     end
  290 
  291     module:log("debug", "Enabling stream management");
  292     session.smacks = xmlns_sm;
  293 
  294     wrap_session(session, false);
  295 
  296     local resume_token;
  297     local resume = stanza.attr.resume;
  298     if resume == "true" or resume == "1" then
  299         resume_token = uuid_generate();
  300         session_registry.set(session.username, resume_token, session);
  301         session.resumption_token = resume_token;
  302     end
  303     (session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume, max = tostring(resume_timeout) }));
  304     return true;
  305 end
  306 module:hook_stanza(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100);
  307 module:hook_stanza(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100);
  308 
  309 module:hook_stanza("http://etherx.jabber.org/streams", "features",
  310         function (session, stanza)
  311             stoppable_timer(1e-6, function ()
  312                 if can_do_smacks(session) then
  313                     if stanza:get_child("sm", xmlns_sm3) then
  314                         session.sends2s(st.stanza("enable", sm3_attr));
  315                         session.smacks = xmlns_sm3;
  316                     elseif stanza:get_child("sm", xmlns_sm2) then
  317                         session.sends2s(st.stanza("enable", sm2_attr));
  318                         session.smacks = xmlns_sm2;
  319                     else
  320                         return;
  321                     end
  322                     wrap_session_out(session, false);
  323                 end
  324             end);
  325         end);
  326 
  327 function handle_enabled(session, stanza, xmlns_sm)
  328     module:log("debug", "Enabling stream management");
  329     session.smacks = xmlns_sm;
  330 
  331     wrap_session_in(session, false);
  332 
  333     -- FIXME Resume?
  334 
  335     return true;
  336 end
  337 module:hook_stanza(xmlns_sm2, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm2); end, 100);
  338 module:hook_stanza(xmlns_sm3, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm3); end, 100);
  339 
  340 function handle_r(origin, stanza, xmlns_sm)
  341     if not origin.smacks then
  342         module:log("debug", "Received ack request from non-smack-enabled session");
  343         return;
  344     end
  345     module:log("debug", "Received ack request, acking for %d", origin.handled_stanza_count);
  346     -- Reply with <a>
  347     (origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = string.format("%d", origin.handled_stanza_count) }));
  348     -- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h)
  349     local expected_h = origin.last_acknowledged_stanza + #origin.outgoing_stanza_queue;
  350     if #origin.outgoing_stanza_queue > 0 and expected_h ~= origin.last_requested_h then
  351         request_ack_if_needed(origin, true, "piggybacked by handle_r");
  352     end
  353     return true;
  354 end
  355 module:hook_stanza(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end);
  356 module:hook_stanza(xmlns_sm3, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm3); end);
  357 
  358 function handle_a(origin, stanza)
  359     if not origin.smacks then return; end
  360     origin.awaiting_ack = nil;
  361     if origin.awaiting_ack_timer then
  362         origin.awaiting_ack_timer:stop();
  363     end
  364     if origin.delayed_ack_timer then
  365         origin.delayed_ack_timer:stop();
  366         origin.delayed_ack_timer = nil;
  367     end
  368     -- Remove handled stanzas from outgoing_stanza_queue
  369     -- origin.log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or "");
  370     local h = tonumber(stanza.attr.h);
  371     if not h then
  372         origin:close{ condition = "invalid-xml"; text = "Missing or invalid 'h' attribute"; };
  373         return;
  374     end
  375     local handled_stanza_count = h-origin.last_acknowledged_stanza;
  376     local queue = origin.outgoing_stanza_queue;
  377     if handled_stanza_count > #queue then
  378         origin.log("warn", "The client says it handled %d new stanzas, but we only sent %d :)",
  379             handled_stanza_count, #queue);
  380         origin.log("debug", "Client h: %d, our h: %d", tonumber(stanza.attr.h), origin.last_acknowledged_stanza);
  381         for i=1,#queue do
  382             origin.log("debug", "Q item %d: %s", i, tostring(queue[i]));
  383         end
  384     end
  385 
  386     for i=1,math_min(handled_stanza_count,#queue) do
  387         local handled_stanza = t_remove(origin.outgoing_stanza_queue, 1);
  388         module:fire_event("delivery/success", { session = origin, stanza = handled_stanza });
  389     end
  390 
  391     origin.log("debug", "#queue = %d", #queue);
  392     origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count;
  393     request_ack_if_needed(origin, false, "handle_a")
  394     return true;
  395 end
  396 module:hook_stanza(xmlns_sm2, "a", handle_a);
  397 module:hook_stanza(xmlns_sm3, "a", handle_a);
  398 
  399 --TODO: Optimise... incoming stanzas should be handled by a per-session
  400 -- function that has a counter as an upvalue (no table indexing for increments,
  401 -- and won't slow non-198 sessions). We can also then remove the .handled flag
  402 -- on stanzas
  403 
  404 local function handle_unacked_stanzas(session)
  405     local queue = session.outgoing_stanza_queue;
  406     local error_attr = { type = "cancel" };
  407     if #queue > 0 then
  408         session.outgoing_stanza_queue = {};
  409         for i=1,#queue do
  410             if not module:fire_event("delivery/failure", { session = session, stanza = queue[i] }) then
  411                 if queue[i].attr.type ~= "error" then
  412                     local reply = st.reply(queue[i]);
  413                     if reply.attr.to ~= session.full_jid then
  414                         reply.attr.type = "error";
  415                         reply:tag("error", error_attr)
  416                             :tag("recipient-unavailable", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"});
  417                         core_process_stanza(session, reply);
  418                     end
  419                 end
  420             end
  421         end
  422     end
  423 end
  424 
  425 -- don't send delivery errors for messages which will be delivered by mam later on
  426 -- check if stanza was archived --> this will allow us to send back errors for stanzas not archived
  427 -- because the user configured the server to do so ("no-archive"-setting for one special contact for example)
  428 local function get_stanza_id(stanza, by_jid)
  429     for tag in stanza:childtags("stanza-id", "urn:xmpp:sid:0") do
  430         if tag.attr.by == by_jid then
  431             return tag.attr.id;
  432         end
  433     end
  434     return nil;
  435 end
  436 module:hook("delivery/failure", function(event)
  437     local session, stanza = event.session, event.stanza;
  438     -- Only deal with authenticated (c2s) sessions
  439     if session.username then
  440         if stanza.name == "message" and stanza.attr.xmlns == nil and
  441                 ( stanza.attr.type == "chat" or ( stanza.attr.type or "normal" ) == "normal" ) then
  442             -- don't store messages in offline store if they are mam results
  443             local mam_result = stanza:get_child("result", xmlns_mam2);
  444             if mam_result ~= nil then
  445                 return true;        -- stanza already "handled", don't send an error and don't add it to offline storage
  446             end
  447             -- do nothing here for normal messages and don't send out "message delivery errors",
  448             -- because messages are already in MAM at this point (no need to frighten users)
  449             local stanza_id = get_stanza_id(stanza, jid.bare(session.full_jid));
  450             if session.mam_requested and stanza_id ~= nil then
  451                 session.log("debug", "mod_smacks delivery/failure returning true for mam-handled stanza: mam-archive-id=%s", tostring(stanza_id));
  452                 return true;        -- stanza handled, don't send an error
  453             end
  454             -- store message in offline store, if this client does not use mam *and* was the last client online
  455             local sessions = prosody.hosts[module.host].sessions[session.username] and
  456                     prosody.hosts[module.host].sessions[session.username].sessions or nil;
  457             if sessions and next(sessions) == session.resource and next(sessions, session.resource) == nil then
  458                 local ok = module:fire_event("message/offline/handle", { origin = session, stanza = stanza } );
  459                 session.log("debug", "mod_smacks delivery/failuere returning %s for offline-handled stanza", tostring(ok));
  460                 return ok;          -- if stanza was handled, don't send an error
  461             end
  462         end
  463     end
  464 end);
  465 
  466 module:hook("pre-resource-unbind", function (event)
  467     local session, err = event.session, event.error;
  468     if session.smacks then
  469         if not session.resumption_token then
  470             local queue = session.outgoing_stanza_queue;
  471             if #queue > 0 then
  472                 session.log("debug", "Destroying session with %d unacked stanzas", #queue);
  473                 handle_unacked_stanzas(session);
  474             end
  475         else
  476             session.log("debug", "mod_smacks hibernating session for up to %d seconds", resume_timeout);
  477             local hibernate_time = os_time(); -- Track the time we went into hibernation
  478             session.hibernating = hibernate_time;
  479             local resumption_token = session.resumption_token;
  480             module:fire_event("smacks-hibernation-start", {origin = session, queue = session.outgoing_stanza_queue});
  481             timer.add_task(resume_timeout, function ()
  482                 session.log("debug", "mod_smacks hibernation timeout reached...");
  483                 -- We need to check the current resumption token for this resource
  484                 -- matches the smacks session this timer is for in case it changed
  485                 -- (for example, the client may have bound a new resource and
  486                 -- started a new smacks session, or not be using smacks)
  487                 local curr_session = full_sessions[session.full_jid];
  488                 if session.destroyed then
  489                     session.log("debug", "The session has already been destroyed");
  490                 elseif curr_session and curr_session.resumption_token == resumption_token
  491                 -- Check the hibernate time still matches what we think it is,
  492                 -- otherwise the session resumed and re-hibernated.
  493                 and session.hibernating == hibernate_time then
  494                     -- wait longer if the timeout isn't reached because push was enabled for this session
  495                     -- session.first_hibernated_push is the starting point for hibernation timeouts of those push enabled clients
  496                     -- wait for an additional resume_timeout seconds if no push occured since hibernation at all
  497                     local current_time = os_time();
  498                     local timeout_start = math_max(session.hibernating, session.first_hibernated_push or session.hibernating);
  499                     if session.push_identifier ~= nil and not session.first_hibernated_push then
  500                         session.log("debug", "No push happened since hibernation started, hibernating session for up to %d extra seconds", resume_timeout);
  501                         return resume_timeout;
  502                     end
  503                     if current_time-timeout_start < resume_timeout and session.push_identifier ~= nil then
  504                         session.log("debug", "A push happened since hibernation started, hibernating session for up to %d extra seconds", current_time-timeout_start);
  505                         return current_time-timeout_start;      -- time left to wait
  506                     end
  507                     session.log("debug", "Destroying session for hibernating too long");
  508                     session_registry.set(session.username, session.resumption_token, nil);
  509                     -- save only actual h value and username/host (for security)
  510                     old_session_registry.set(session.username, session.resumption_token, {
  511                         h = session.handled_stanza_count,
  512                         username = session.username,
  513                         host = session.host
  514                     });
  515                     session.resumption_token = nil;
  516                     sessionmanager.destroy_session(session);
  517                 else
  518                     session.log("debug", "Session resumed before hibernation timeout, all is well")
  519                 end
  520             end);
  521             return true; -- Postpone destruction for now
  522         end
  523     end
  524 end);
  525 
  526 local function handle_s2s_destroyed(event)
  527     local session = event.session;
  528     local queue = session.outgoing_stanza_queue;
  529     if queue and #queue > 0 then
  530         session.log("warn", "Destroying session with %d unacked stanzas", #queue);
  531         if s2s_resend then
  532             for i = 1, #queue do
  533                 module:send(queue[i]);
  534             end
  535             session.outgoing_stanza_queue = nil;
  536         else
  537             handle_unacked_stanzas(session);
  538         end
  539     end
  540 end
  541 
  542 module:hook("s2sout-destroyed", handle_s2s_destroyed);
  543 module:hook("s2sin-destroyed", handle_s2s_destroyed);
  544 
  545 local function get_session_id(session)
  546     return session.id or (tostring(session):match("[a-f0-9]+$"));
  547 end
  548 
  549 function handle_resume(session, stanza, xmlns_sm)
  550     if session.full_jid then
  551         session.log("warn", "Tried to resume after resource binding");
  552         session.send(st.stanza("failed", { xmlns = xmlns_sm })
  553             :tag("unexpected-request", { xmlns = xmlns_errors })
  554         );
  555         return true;
  556     end
  557 
  558     local id = stanza.attr.previd;
  559     local original_session = session_registry.get(session.username, id);
  560     if not original_session then
  561         session.log("debug", "Tried to resume non-existent session with id %s", id);
  562         local old_session = old_session_registry.get(session.username, id);
  563         if old_session and session.username == old_session.username
  564         and session.host == old_session.host
  565         and old_session.h then
  566             session.send(st.stanza("failed", { xmlns = xmlns_sm, h = string.format("%d", old_session.h) })
  567                 :tag("item-not-found", { xmlns = xmlns_errors })
  568             );
  569         else
  570             session.send(st.stanza("failed", { xmlns = xmlns_sm })
  571                 :tag("item-not-found", { xmlns = xmlns_errors })
  572             );
  573         end;
  574     elseif session.username == original_session.username
  575     and session.host == original_session.host then
  576         session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session));
  577         original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session));
  578         -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session))
  579         if original_session.conn then
  580             original_session.log("debug", "mod_smacks closing an old connection for this session");
  581             local conn = original_session.conn;
  582             c2s_sessions[conn] = nil;
  583             conn:close();
  584         end
  585         local migrated_session_log = session.log;
  586         original_session.ip = session.ip;
  587         original_session.conn = session.conn;
  588         original_session.send = session.send;
  589         original_session.close = session.close;
  590         original_session.filter = session.filter;
  591         original_session.filter.session = original_session;
  592         original_session.filters = session.filters;
  593         original_session.stream = session.stream;
  594         original_session.secure = session.secure;
  595         original_session.hibernating = nil;
  596         session.log = original_session.log;
  597         session.type = original_session.type;
  598         wrap_session(original_session, true);
  599         -- Inform xmppstream of the new session (passed to its callbacks)
  600         original_session.stream:set_session(original_session);
  601         -- Similar for connlisteners
  602         c2s_sessions[session.conn] = original_session;
  603 
  604         original_session.send(st.stanza("resumed", { xmlns = xmlns_sm,
  605             h = string.format("%d", original_session.handled_stanza_count), previd = id }));
  606 
  607         -- Fake an <a> with the h of the <resume/> from the client
  608         original_session:dispatch_stanza(st.stanza("a", { xmlns = xmlns_sm,
  609             h = stanza.attr.h }));
  610 
  611         -- Ok, we need to re-send any stanzas that the client didn't see
  612         -- ...they are what is now left in the outgoing stanza queue
  613         -- We have to use the send of "session" because we don't want to add our resent stanzas
  614         -- to the outgoing queue again
  615         local queue = original_session.outgoing_stanza_queue;
  616         session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", #queue);
  617         for i=1,#queue do
  618             session.send(queue[i]);
  619         end
  620         session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", #queue);
  621         function session.send(stanza)
  622             migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza));
  623             return false;
  624         end
  625         module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue});
  626         request_ack_if_needed(original_session, true, "handle_resume");
  627     else
  628         module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]",
  629             session.username or "?", session.host or "?", session.type,
  630             original_session.username or "?", original_session.host or "?", original_session.type);
  631         session.send(st.stanza("failed", { xmlns = xmlns_sm })
  632             :tag("not-authorized", { xmlns = xmlns_errors }));
  633     end
  634     return true;
  635 end
  636 module:hook_stanza(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
  637 module:hook_stanza(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);
  638 
  639 local function handle_read_timeout(event)
  640     local session = event.session;
  641     if session.smacks then
  642         if session.awaiting_ack then
  643             if session.awaiting_ack_timer then
  644                 session.awaiting_ack_timer:stop();
  645             end
  646             if session.delayed_ack_timer then
  647                 session.delayed_ack_timer:stop();
  648                 session.delayed_ack_timer = nil;
  649             end
  650             return false; -- Kick the session
  651         end
  652         session.log("debug", "Sending <r> (read timeout)");
  653         (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }));
  654         session.awaiting_ack = true;
  655         if not session.delayed_ack_timer then
  656             session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function()
  657                 delayed_ack_function(session);
  658             end);
  659         end
  660         return true;
  661     end
  662 end
  663 
  664 module:hook("s2s-read-timeout", handle_read_timeout);
  665 module:hook("c2s-read-timeout", handle_read_timeout);