"Fossies" - the Fresh Open Source Software Archive

Member "screenkey-1.1/Screenkey/inputlistener.py" (17 May 2020, 15407 Bytes) of package /linux/privat/screenkey-1.1.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "inputlistener.py" see the Fossies "Dox" file reference documentation.

    1 # -*- coding: utf-8 -*-
    2 # Distributed under the GNU GPLv3+ license, WITHOUT ANY WARRANTY.
    3 # Copyright(c) 2015: wave++ "Yuri D'Elia" <wavexx@thregr.org>
    4 #
    5 # Outputting translated X11 keystrokes is not a simple problem as soon as XIM
    6 # is introduced: getting an updated keyboard/modifier map is not enough to
    7 # replicate the [complex] logic hidden in the input method.
    8 #
    9 # For this reason we use a fairly convoluted mechanism: we record keystrokes
   10 # using the XRecord extension, but we relay them to another fake window running
   11 # on the same server. By manipulating the event, we trick the input method to
   12 # perform composition for us, and poll for translated output events using
   13 # Xutf8LookupString. Since we cannot determine the state of the input context
   14 # for the target window (we're recording blindly), we also need to reset
   15 # context state carefully when the user switches the active focus. 3(!) extra
   16 # connections to the display server are required for this task, and since we're
   17 # using blocking APIs, having to run on our own thread means we cannot share
   18 # any of those with the regular process. On the other hand, many other keycode
   19 # translation issues are avoided by using the string lookup directly.
   20 #
   21 # This is, of course, never going to be always identical to the final output,
   22 # since we're guessing the state of the client (we do the same when guessing
   23 # the result of backspace anyway). But incidentally this method would also
   24 # allow us to poll the input mechanism while composing, to better reflect the
   25 # actual typing on the keyboard.
   26 #
   27 # Some of the code /could/ have been simplified by using XCB for protocol
   28 # translation, but since there's no equivalent to XKB/XIM, I found the exercise
   29 # futile. Needing to use XIM directly also barred pure-python equivalents. As
   30 # a result, we have to drop back to ctypes for _extra_ phun.
   31 #
   32 # Drop me a line if you ever find this comment helpful, as finding a decent
   33 # solution was not trivial -- YD 21/08/2015.
   34 
   35 if __name__ == '__main__':
   36     import xlib
   37     import keysyms
   38 else:
   39     from . import xlib
   40     from . import keysyms
   41 
   42 import sys
   43 if sys.version_info.major < 3:
   44     import glib
   45 else:
   46     from gi.repository import GLib as glib
   47 
   48 import threading
   49 import warnings
   50 import select
   51 
   52 
   53 # convenience wrappers
   54 def coalesce_ranges(ranges):
   55     ranges = sorted(ranges, key=lambda x: x[0])
   56     ret = ranges[:1]
   57     for r in ranges[1:]:
   58         if ret[-1][1] < r[0] - 1:
   59             ret.append(r)
   60         else:
   61             ret[-1][1] = max(ret[-1][1], r[1])
   62     return ret
   63 
   64 
   65 def record_context(dpy, ev_ranges, dev_ranges):
   66     ev_ranges = coalesce_ranges(ev_ranges)
   67     dev_ranges = coalesce_ranges(dev_ranges)
   68 
   69     specs = max(len(ev_ranges), len(dev_ranges))
   70     range_specs = (xlib.POINTER(xlib.XRecordRange) * specs)()
   71 
   72     for i in range(specs):
   73         range_specs[i] = xlib.XRecordAllocRange()
   74         if len(ev_ranges) > i:
   75             range_specs[i].contents.delivered_events.first = ev_ranges[i][0]
   76             range_specs[i].contents.delivered_events.last = ev_ranges[i][1]
   77         if len(dev_ranges) > i:
   78             range_specs[i].contents.device_events.first = dev_ranges[i][0]
   79             range_specs[i].contents.device_events.last = dev_ranges[i][1]
   80 
   81     rec_ctx = xlib.XRecordCreateContext(
   82         dpy, 0,
   83         xlib.byref(xlib.c_ulong(xlib.XRecordAllClients)), 1,
   84         range_specs, specs)
   85 
   86     for i in range(specs):
   87         xlib.XFree(range_specs[i])
   88 
   89     return rec_ctx
   90 
   91 
   92 def record_enable(dpy, rec_ctx, callback):
   93     def intercept(data):
   94         if data.category != xlib.XRecordFromServer:
   95             return
   96         if data.client_swapped:
   97             warnings.warn("cannot handle swapped protocol data")
   98             return
   99         ev = xlib.XWireToEvent(dpy, data.data)
  100         callback(ev)
  101 
  102     def intercept_(_, data):
  103         intercept(data.contents)
  104         xlib.XRecordFreeData(data)
  105 
  106     proc = xlib.XRecordInterceptProc(intercept_)
  107     xlib.XRecordEnableContextAsync(dpy, rec_ctx, proc, None)
  108     return proc
  109 
  110 
  111 def create_replay_window(dpy):
  112     win_attr = xlib.XSetWindowAttributes()
  113     win_attr.override_redirect = True
  114     win = xlib.XCreateWindow(dpy, xlib.XDefaultRootWindow(dpy),
  115                              0, 0, 1, 1, 0,
  116                              xlib.CopyFromParent, xlib.InputOnly, None,
  117                              xlib.CWOverrideRedirect,
  118                              xlib.byref(win_attr))
  119     return win
  120 
  121 
  122 def phantom_release(dpy, kev):
  123     if not xlib.XPending(dpy):
  124         return False
  125     ev = xlib.XEvent()
  126     xlib.XPeekEvent(dpy, xlib.byref(ev))
  127     return (ev.type == xlib.KeyPress and \
  128             ev.xkey.state == kev.state and \
  129             ev.xkey.keycode == kev.keycode and \
  130             ev.xkey.time == kev.time)
  131 
  132 
  133 def keysym_to_unicode(keysym):
  134     if 0x01000000 <= keysym <= 0x0110FFFF:
  135         return unichr(keysym - 0x01000000)
  136     keydata = keysyms.KEYSYMS.get(keysym)
  137     if keydata is not None:
  138         return keydata[0]
  139     return None
  140 
  141 
  142 
  143 class KeyData():
  144     def __init__(self, pressed=None, filtered=None, repeated=None,
  145                  string=None, keysym=None, status=None, symbol=None,
  146                  mods_mask=None, modifiers=None):
  147         self.pressed = pressed
  148         self.filtered = filtered
  149         self.repeated = repeated
  150         self.string = string
  151         self.keysym = keysym
  152         self.status = status
  153         self.symbol = symbol
  154         self.mods_mask = mods_mask
  155         self.modifiers = modifiers
  156 
  157 
  158 class InputType:
  159     keyboard = 0b001
  160     button   = 0b010
  161     movement = 0b100
  162     all      = 0b111
  163 
  164 
  165 
  166 class InputListener(threading.Thread):
  167     def __init__(self, callback, input_types=InputType.all, kbd_compose=True, kbd_translate=True):
  168         super(InputListener, self).__init__()
  169         self.callback = callback
  170         self.input_types = input_types
  171         self.kbd_compose = kbd_compose
  172         self.kbd_translate = kbd_translate
  173         self.lock = threading.Lock()
  174         self.stopped = True
  175         self.error = None
  176 
  177 
  178     def _event_received(self, ev):
  179         if xlib.KeyPress <= ev.type <= xlib.MotionNotify:
  180             xlib.XSendEvent(self.replay_dpy, self.replay_win, False, 0, ev)
  181         elif ev.type in [xlib.FocusIn, xlib.FocusOut]:
  182             # Forward the event as a custom message in the same queue instead
  183             # of resetting the XIC directly, in order to preserve queued events
  184             fwd_ev = xlib.XEvent()
  185             fwd_ev.type = xlib.ClientMessage
  186             fwd_ev.xclient.message_type = self.custom_atom
  187             fwd_ev.xclient.format = 32
  188             fwd_ev.xclient.data[0] = ev.type
  189             xlib.XSendEvent(self.replay_dpy, self.replay_win, False, 0, fwd_ev)
  190 
  191 
  192     def _event_callback(self, data):
  193         self.callback(data)
  194         return False
  195 
  196     def _event_processed(self, data):
  197         data.symbol = xlib.XKeysymToString(data.keysym)
  198         if data.string is None:
  199             data.string = keysym_to_unicode(data.keysym)
  200         glib.idle_add(self._event_callback, data)
  201 
  202 
  203     def _event_modifiers(self, kev, data):
  204         data.modifiers = modifiers = {}
  205         modifiers['shift'] = bool(kev.state & xlib.ShiftMask)
  206         modifiers['caps_lock'] = bool(kev.state & xlib.LockMask)
  207         modifiers['ctrl'] = bool(kev.state & xlib.ControlMask)
  208         modifiers['alt'] = bool(kev.state & xlib.Mod1Mask)
  209         modifiers['num_lock'] = bool(kev.state & xlib.Mod2Mask)
  210         modifiers['hyper'] = bool(kev.state & xlib.Mod3Mask)
  211         modifiers['super'] = bool(kev.state & xlib.Mod4Mask)
  212         modifiers['alt_gr'] = bool(kev.state & xlib.Mod5Mask)
  213 
  214 
  215     def _event_keypress(self, kev, data):
  216         buf = xlib.create_string_buffer(16)
  217         keysym = xlib.KeySym()
  218         status = xlib.Status()
  219         ret = xlib.Xutf8LookupString(self._kbd_replay_xic, kev, buf, len(buf),
  220                                      xlib.byref(keysym), xlib.byref(status))
  221         if ret != xlib.NoSymbol:
  222             if 32 <= keysym.value <= 126:
  223                 # avoid ctrl sequences, just take the character value
  224                 data.string = chr(keysym.value)
  225             else:
  226                 try:
  227                     data.string = buf.value.decode('utf-8')
  228                 except UnicodeDecodeError:
  229                     pass
  230         data.keysym = keysym.value
  231         data.status = status.value
  232 
  233 
  234     def _event_lookup(self, kev, data):
  235         # this is mostly for debugging: we do not account for group/level
  236         data.keysym = xlib.XkbKeycodeToKeysym(kev.display, kev.keycode, 0, 0)
  237 
  238 
  239     def start(self):
  240         self.lock.acquire()
  241         self.stopped = False
  242         self.error = None
  243         super(InputListener, self).start()
  244 
  245 
  246     def stop(self):
  247         with self.lock:
  248             if not self.stopped:
  249                 self.stopped = True
  250                 xlib.XRecordDisableContext(self.control_dpy, self.record_ctx)
  251 
  252 
  253     def _kbd_init(self):
  254         self._kbd_last_ev = xlib.XEvent()
  255 
  256         if self.kbd_compose:
  257             style = xlib.XIMPreeditNothing | xlib.XIMStatusNothing
  258         else:
  259             style = xlib.XIMPreeditNone | xlib.XIMStatusNone
  260 
  261         # TODO: implement preedit callbacks for on-the-spot composition
  262         #       (this would fix focus-stealing for the global IM state)
  263         self._kbd_replay_xim = xlib.XOpenIM(self.replay_dpy, None, None, None)
  264         if not self._kbd_replay_xim:
  265             raise Exception("Cannot initialize input method")
  266 
  267         self._kbd_replay_xic = xlib.XCreateIC(self._kbd_replay_xim,
  268                                               xlib.XNClientWindow, self.replay_win,
  269                                               xlib.XNInputStyle, style,
  270                                               None)
  271         xlib.XSetICFocus(self._kbd_replay_xic)
  272 
  273 
  274     def _kbd_del(self):
  275         xlib.XDestroyIC(self._kbd_replay_xic)
  276         xlib.XCloseIM(self._kbd_replay_xim)
  277 
  278 
  279     def _kbd_process(self, ev):
  280         if ev.type == xlib.ClientMessage and \
  281            ev.xclient.message_type == self.custom_atom:
  282             if ev.xclient.data[0] in [xlib.FocusIn, xlib.FocusOut]:
  283                 # we do not keep track of multiple XICs, just reset
  284                 xic = xlib.Xutf8ResetIC(self._kbd_replay_xic)
  285                 if xic is not None: xlib.XFree(xic)
  286             return
  287         elif ev.type in [xlib.KeyPress, xlib.KeyRelease]:
  288             # fake keyboard event data for XFilterEvent
  289             ev.xkey.send_event = False
  290             ev.xkey.window = self.replay_win
  291 
  292         # pass _all_ events to XFilterEvent
  293         filtered = bool(xlib.XFilterEvent(ev, 0))
  294         if ev.type == xlib.KeyRelease and \
  295            phantom_release(self.replay_dpy, ev.xkey):
  296             return
  297         if ev.type not in [xlib.KeyPress, xlib.KeyRelease]:
  298             return
  299 
  300         # generate new keyboard event
  301         data = KeyData()
  302         data.filtered = filtered
  303         data.pressed = (ev.type == xlib.KeyPress)
  304         data.repeated = (ev.type == self._kbd_last_ev.type and \
  305                          ev.xkey.state == self._kbd_last_ev.xkey.state and \
  306                          ev.xkey.keycode == self._kbd_last_ev.xkey.keycode)
  307         data.mods_mask = ev.xkey.state
  308         self._event_modifiers(ev.xkey, data)
  309         if not data.filtered and data.pressed and self.kbd_translate:
  310             self._event_keypress(ev.xkey, data)
  311         else:
  312             self._event_lookup(ev.xkey, data)
  313         self._event_processed(data)
  314         self._kbd_last_ev = ev
  315 
  316 
  317     def run(self):
  318         # control connection
  319         self.control_dpy = xlib.XOpenDisplay(None)
  320         xlib.XSynchronize(self.control_dpy, True)
  321 
  322         # unmapped replay window
  323         self.replay_dpy = xlib.XOpenDisplay(None)
  324         self.custom_atom = xlib.XInternAtom(self.replay_dpy, b"SCREENKEY", False)
  325         replay_fd = xlib.XConnectionNumber(self.replay_dpy)
  326         self.replay_win = create_replay_window(self.replay_dpy)
  327 
  328         # bail during initialization errors
  329         try:
  330             if self.input_types & InputType.keyboard:
  331                 self._kbd_init()
  332         except Exception as e:
  333             self.error = e
  334             xlib.XCloseDisplay(self.control_dpy)
  335             xlib.XDestroyWindow(self.replay_dpy, self.replay_win)
  336             xlib.XCloseDisplay(self.replay_dpy)
  337 
  338             # cheap wakeup() equivalent for compatibility
  339             glib.idle_add(self._event_callback, None)
  340 
  341             self.stopped = True
  342             self.lock.release()
  343             return
  344 
  345         # initialize recording context
  346         ev_ranges = []
  347         dev_ranges = []
  348         if self.input_types & InputType.keyboard:
  349             ev_ranges.append([xlib.FocusIn, xlib.FocusOut])
  350             dev_ranges.append([xlib.KeyPress, xlib.KeyRelease])
  351         if self.input_types & InputType.button:
  352             dev_ranges.append([xlib.ButtonPress, xlib.ButtonRelease])
  353         if self.input_types & InputType.movement:
  354             dev_ranges.append([xlib.MotionNotify, xlib.MotionNotify])
  355         self.record_ctx = record_context(self.control_dpy, ev_ranges, dev_ranges);
  356 
  357         record_dpy = xlib.XOpenDisplay(None)
  358         record_fd = xlib.XConnectionNumber(record_dpy)
  359         # we need to keep the record_ref alive(!)
  360         record_ref = record_enable(record_dpy, self.record_ctx, self._event_received)
  361 
  362         # event loop
  363         self.lock.release()
  364         while True:
  365             with self.lock:
  366                 if self.stopped:
  367                     break
  368 
  369             r_fd = []
  370             if xlib.XPending(record_dpy):
  371                 r_fd.append(record_fd)
  372             if xlib.XPending(self.replay_dpy):
  373                 r_fd.append(replay_fd)
  374             if not r_fd:
  375                 r_fd, _, _ = select.select([record_fd, replay_fd], [], [])
  376             if not r_fd:
  377                 break
  378 
  379             if record_fd in r_fd:
  380                 xlib.XRecordProcessReplies(record_dpy)
  381                 xlib.XFlush(self.replay_dpy)
  382 
  383             if replay_fd in r_fd:
  384                 ev = xlib.XEvent()
  385                 xlib.XNextEvent(self.replay_dpy, xlib.byref(ev))
  386                 if self.input_types & InputType.keyboard:
  387                     self._kbd_process(ev)
  388 
  389         # finalize
  390         self.lock.acquire()
  391 
  392         xlib.XRecordFreeContext(self.control_dpy, self.record_ctx)
  393         xlib.XCloseDisplay(self.control_dpy)
  394         xlib.XCloseDisplay(record_dpy)
  395         del record_ref
  396 
  397         if self.input_types & InputType.keyboard:
  398             self._kbd_del()
  399 
  400         xlib.XDestroyWindow(self.replay_dpy, self.replay_win)
  401         xlib.XCloseDisplay(self.replay_dpy)
  402 
  403         self.stopped = True
  404         self.lock.release()
  405 
  406 
  407 
  408 if __name__ == '__main__':
  409     def callback(data):
  410         values = {}
  411         for k in dir(data):
  412             if k[0] == '_': continue
  413             values[k] = getattr(data, k)
  414         print(values)
  415 
  416     glib.threads_init()
  417     kl = InputListener(callback)
  418     try:
  419         # keep running only while the listener is alive
  420         kl.start()
  421         while kl.is_alive():
  422             glib.main_context_default().iteration()
  423     except KeyboardInterrupt:
  424         pass
  425 
  426     # check if the thread terminated unexpectedly
  427     if kl.is_alive():
  428         kl.stop()
  429         kl.join()
  430     elif kl.error:
  431         print("initialization error: {}".format(kl.error))
  432         if '__traceback__' in dir(kl.error):
  433             import traceback
  434             traceback.print_tb(kl.error.__traceback__)
  435         exit(1)