#!/usr/bin/python
# vim: set fileencoding=utf-8 :
# Alexander Grothe 2011 - 2012
#
# This script requires python-uinput V 0.6.1. or higher.
# Additional required packages are libudev0 and libudev-dev.
#
### Fetch the code for python-uinput from git: ###
#
# git clone git://github.com/tuomasjjrasanen/python-uinput.git
# cd python-uinput
# git clone git://github.com/tuomasjjrasanen/libsuinput.git
# sudo python setup.py install
#
###
#
# This script must be run as superuser or with sufficient rights to create
# a uinput device. It expects a lircd socket at /var/run/lirc/lircd.<pid>
# (with <pid> read from /var/run/lirc/lircd.pid) if none is given by
# --lircd-socket /PATH/TO/LIRCD_SOCKET
# lircd must not be started with --uinput, but may be started with
# --release="_up" to prevent ghosting events if necessary.

import logging as log
import logging.handlers as loghandlers
import string
import socket
import gobject
import sys
import uinput
import datetime
import time
from threading import Timer
from optparse import OptionParser

# Module-level logger so the classes below also work when this file is
# imported; the syslog handler is only attached when run as a script.
logger = log.getLogger('lircd2uinput')

# Level names accepted by --loglevel.
_LOGLEVELS = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')


class Lirc2uinput:
    """Sends keystrokes to a virtual uinput device."""

    def __init__(self, uinput_name="lircd", options=None):
        """Create the uinput device and initialise the repeat-filter state.

        uinput_name -- name passed to uinput.Device
        options     -- parsed option values (wait_repeats, max_gap, min_gap,
                       acceleration, lircd_socket, xbmc); when None the
                       defaults declared in Options are used
        """
        if options is None:
            # Parse an empty argument list to obtain the declared defaults.
            options = Options().parser.parse_args([])[0]
        self.lastkey = None
        self.wait_repeats = options.wait_repeats
        self.max_gap = options.max_gap
        self.min_gap = options.min_gap
        self.acceleration = options.acceleration
        self.lircd_socket = options.lircd_socket
        self.xbmc = options.xbmc
        # Amount (in microseconds) the gap shrinks per accelerated repeat.
        self.gap_delta = (self.max_gap - self.min_gap) * self.acceleration
        self.current_gap = self.max_gap
        self.repeat_num = 0
        self.timestamp = datetime.datetime.now()
        self.timeout_value = 1
        # NOTE(review): this Timer is never started anywhere in this file;
        # it is kept only so the attribute continues to exist.
        self.timeout = Timer(self.timeout_value, uinput.KEY_COFFEE)
        # add all defined KEY_.* to supported key events
        self.events = [getattr(uinput, element) for element in dir(uinput)
                       if element.startswith('KEY_')]
        # keys that keep press/repeat semantics even in xbmc mode
        self.specialkeys = [uinput.KEY_VOLUMEUP, uinput.KEY_VOLUMEDOWN]
        # create uinput device
        self.device = uinput.Device(self.events, uinput_name)

    def is_key(self, entry):
        """Return the uinput constant named `entry` if it starts with
        "KEY_", otherwise None."""
        if entry.startswith("KEY_"):
            return getattr(uinput, entry)
        return None

    def get_gap(self, repeat_num):
        """Shrink the current repeat gap by one step and return it.

        The gap is reduced by gap_delta but never below min_gap.
        `repeat_num` is accepted for interface compatibility and not used.
        """
        if self.current_gap > self.min_gap:
            # Clamp so an acceleration that does not divide the range
            # evenly cannot undershoot the configured minimum.
            self.current_gap = max(self.min_gap,
                                   self.current_gap - self.gap_delta)
        else:
            logger.debug("minimum gap reached")
        return self.current_gap

    def getKeyname(self, key):
        """Map a key name received from lircd to a uinput key constant.

        Returns a tuple (keycmd, k_upper). k_upper is False only when the
        name started with a lowercase letter and could be resolved.
        Unknown names map to uinput.KEY_COFFEE and log a warning.
        """
        if key[:1].islower():
            name = key.upper()
            k_upper = False
        else:
            # '_up' is a suffix optionally added by lircd to key names to
            # signal a key release; strip it to get the plain key name.
            name = key[:-3] if key.endswith('_up') else key
            k_upper = True
        try:
            # Plain attribute lookup: the name arrives over the lircd
            # socket and must never be evaluated as code.
            keycmd = getattr(uinput, name)
        except (AttributeError, TypeError):
            keycmd = uinput.KEY_COFFEE
            k_upper = True
            logger.warning(
                "Key %s is not supported by your input.h, get a coffee and "
                "fix your lircd.conf;)", key)
        logger.debug("%s mapped to %s", key, keycmd)
        return keycmd, k_upper

    def _elapsed_us(self, now):
        """Return the microseconds elapsed between self.timestamp and now."""
        delta = now - self.timestamp
        # timedelta.microseconds alone is only the sub-second component,
        # so days and seconds have to be folded in explicitly.
        return ((delta.days * 86400 + delta.seconds) * 1000000
                + delta.microseconds)

    def _release_previous(self, keycmd):
        """Release the held key if a different key is about to be sent.

        The caller cancels the pending release timer when a new event
        arrives, so without this the previous key would stay pressed.
        """
        if self.lastkey is not None and self.lastkey != keycmd:
            self.keypress(self.lastkey, 0)
            self.lastkey = None
            self.repeat_num = 0
            self.current_gap = self.max_gap

    def send_key_r(self, key):
        """Send `key` with the repeat filter applied and return its keycmd.

        The first event for a key emits a press (value 1). Further events
        for the same key are dropped until current_gap microseconds have
        passed since the last emitted event, then emitted as repeat
        (value 2). After wait_repeats events the gap starts shrinking.
        """
        keycmd, k_upper = self.getKeyname(key)
        logger.debug(keycmd)
        self._release_previous(keycmd)
        now = datetime.datetime.now()
        if self.lastkey != keycmd:
            # initial keypress
            self.keypress(keycmd, 1)
            # Start measuring the gap from the press itself, otherwise the
            # first repeat is compared against a stale timestamp.
            self.timestamp = now
            self.repeat_num += 1
            self.lastkey = keycmd
        elif self._elapsed_us(now) < self.current_gap:
            logger.debug(u"Passing keypress %s... too early", keycmd)
        else:
            logger.debug(u"Repeated keypress %s", keycmd)
            if self.repeat_num >= self.wait_repeats:
                self.current_gap = self.get_gap(self.repeat_num)
            if self.repeat_num > 0:
                self.keypress(keycmd, 2)
            self.timestamp = datetime.datetime.now()
            self.repeat_num += 1
        return keycmd

    def send_key(self, key):
        """Send `key` without repeat filtering and return its keycmd.

        In xbmc mode every key except the ones in specialkeys is sent as
        an immediate press/release pair. Otherwise the first event emits
        a press (value 1) and following events a repeat (value 2).
        """
        keycmd, k_upper = self.getKeyname(key)
        logger.debug(keycmd)
        self._release_previous(keycmd)
        if self.xbmc and keycmd not in self.specialkeys:
            self.keypress(keycmd, 1)
            self.keypress(keycmd, 0)
        else:
            if self.lastkey == keycmd and self.repeat_num > 0:
                self.keypress(keycmd, 2)
            else:
                self.keypress(keycmd, 1)
            self.repeat_num += 1
            self.lastkey = keycmd
        return keycmd

    def keypress(self, key, value):
        """Emit `key` with `value` on the uinput device. This file uses
        0 for release, 1 for press and 2 for repeat."""
        self.device.emit(key, value)


class main:
    """Listens to a lirc socket and calls a method each time a keypress
    is received."""

    def __init__(self):
        """Parse the command line, connect to the lircd socket, create the
        uinput device and register the socket watch.

        Exits the process via sys.exit when the lircd PID file cannot be
        read or the socket cannot be connected.
        """
        parser = Options()
        self.options = parser.get_opts()
        self.syslog_init()

        level_name = str(self.options.loglevel).upper()
        if level_name not in _LOGLEVELS:
            logger.warning("unknown loglevel %r, using WARNING",
                           self.options.loglevel)
            level_name = 'WARNING'
        logger.setLevel(getattr(log, level_name))

        # timeout is in milliseconds, min_gap in microseconds
        self.timeout = self.options.timeout
        if (self.timeout * 1000) < self.options.min_gap:
            # Convert back to milliseconds; assigning the microsecond
            # value directly would delay the release by minutes.
            self.timeout = (self.options.min_gap + 10000) // 1000
            logger.warning(
                "Warning: timeout < --min-gap: setting timeout 10ms greater "
                "than --min-gap")
        self.repeatfilter = self.options.repeatfilter
        self.xbmc = self.options.xbmc
        self.timer = None
        # holds an incomplete trailing line between two recv() calls
        self._recv_buffer = ""

        if not self.options.lircd_socket:
            # use /var/run/lirc/lircd.<pid> as socket
            self.socket_path = None
            try:
                with open("/var/run/lirc/lircd.pid", 'r') as pidfile:
                    pid = int(pidfile.read().strip("\n"))
            except (IOError, OSError, ValueError):
                logger.exception(
                    u'Error reading PID for lircd from '
                    u'/var/run/lirc/lircd.pid, will sleep 1 second and exit '
                    u'this script - ERROR:')
                time.sleep(1)
                sys.exit("lirc socket not found")
            self.socket_path = "/var/run/lirc/lircd.%s" % (pid)
            logger.info(u'lircd_socket = %s', self.socket_path)
        else:
            self.socket_path = self.options.lircd_socket

        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            self.sock.connect(self.socket_path)
        except socket.error:
            logger.error("could not connect to socket %s", self.socket_path)
            self.sock.close()
            sys.exit("ERROR: could not connect to lirc socket %s."
                     % self.socket_path)

        self.uinputdev = Lirc2uinput(options=self.options)
        # name of the Lirc2uinput method used to forward key events
        if self.repeatfilter:
            self.command = "send_key_r"
        else:
            self.command = "send_key"
        gobject.io_add_watch(self.sock, gobject.IO_IN, self.handler)

    def _cancel_release_timer(self):
        """Remove the pending key-release timeout, if there is one."""
        if self.timer is not None:
            gobject.source_remove(self.timer)
            self.timer = None
            logger.debug("removed timer")

    def handler(self, sock, *args):
        """Callback for readable data on the lircd socket.

        Each complete line is expected to consist of four space-separated
        fields (code, count, key name, remote name). The key name is
        forwarded to the uinput device and a timeout is armed that
        releases the key if no further event arrives.

        Returns True to keep the watch, False after lircd closed the
        connection.
        """
        chunk = self.sock.recv(1024)
        if not chunk:
            # Empty read means the peer closed the socket; keeping the
            # watch would call this handler in a busy loop.
            logger.error("lircd closed the connection on %s",
                         self.socket_path)
            return False

        lines = (self._recv_buffer + chunk).split("\n")
        # The last element is an incomplete line (or empty); keep it
        # until the rest arrives with the next read.
        self._recv_buffer = lines.pop()

        send = getattr(self.uinputdev, self.command)
        for line in lines:
            fields = line.split(" ")
            if len(fields) != 4:
                logger.warning("ignoring malformed lircd line %r", line)
                continue
            code, count, cmd, device = fields
            # Only the most recent event may own the release timer.
            self._cancel_release_timer()
            keycmd = send(cmd)
            self.timer = gobject.timeout_add(
                self.timeout, self.release_key, keycmd)
        return True

    def release_key(self, keycmd):
        """Timeout callback: release `keycmd` and reset the repeat state.

        Returns 0 so the timeout is not scheduled again.
        """
        # The timeout source is gone once this callback returns 0.
        self.timer = None
        self.uinputdev.keypress(keycmd, 0)
        self.uinputdev.repeat_num = 0
        self.uinputdev.lastkey = None
        self.uinputdev.current_gap = self.uinputdev.max_gap
        logger.debug('released key %s', keycmd)
        return 0

    def syslog_init(self):
        """Log the effective options at startup."""
        logger.info('Started lircd2uinput with these options:')
        logger.info('wait_repeats = %s', self.options.wait_repeats)
        logger.info('max_gap = %s', self.options.max_gap)
        logger.info('min_gap = %s', self.options.min_gap)
        logger.info('acceleration = %s', self.options.acceleration)
        logger.info('loglevel = %s', self.options.loglevel)
        if self.options.lircd_socket:
            logger.info('lirc_socket = %s', self.options.lircd_socket)
        else:
            logger.info('lirc_socket = default')


class Options:
    """Command line options of lircd2uinput."""

    def __init__(self):
        """Build the option parser; nothing is parsed yet."""
        self.parser = OptionParser()
        self.parser.add_option(
            "-f", "--repeat-filter", dest="repeatfilter",
            action="store_true",
            help=u'enable repeat-filter')
        self.parser.add_option(
            "-s", "--lircd-socket", dest="lircd_socket", default=None,
            help=u"choose lircd socket to listen on",
            metavar="LIRCD_SOCKET")
        self.parser.add_option(
            "-v", "--loglevel", dest="loglevel", default='WARNING',
            help=u"--loglevel [DEBUG|INFO|WARNING|ERROR|CRITICAL]",
            metavar="LOG_LEVEL")
        self.parser.add_option(
            "-x", "--xbmc", dest="xbmc", action="store_true",
            help=u'enable xbmc single keypress mode')
        self.parser.add_option(
            "-l", "--min-gap", dest="min_gap", default=150000, type="int",
            help=u"set minimum gap between repeated keystrokes "
                 u"(default 150000 µs) - needs active repeat-filter",
            metavar="MIN_GAP")
        self.parser.add_option(
            "-u", "--max-gap", dest="max_gap", default=300000, type="int",
            help=u"set maximum gap between repeated keystrokes "
                 u"default: 300000 µs - needs active repeat-filter",
            metavar="MAX_GAP")
        self.parser.add_option(
            "-r", "--min-repeats", dest="wait_repeats", default=2,
            type="int",
            help=u"number of repeats before using accelerated keypresses "
                 u"default: 2 - needs active repeat-filter",
            metavar="WAIT_REPEATS")
        self.parser.add_option(
            "-t", "--timeout", dest="timeout", default=200, type="int",
            help=u"release key after x ms no following key is received "
                 u"default: 200 ms",
            metavar="TIMEOUT")
        self.parser.add_option(
            "-a", "--acceleration", dest="acceleration", default=0.25,
            type="float",
            help=u"acceleration to change from MAX_GAP to MIN_GAP. "
                 u"Default value of 0.25 equals 4 repeated keystrokes to "
                 u"reach maximum speed - needs active repeat-filter",
            metavar="ACCELERATION")

    def get_opts(self):
        """Parse sys.argv and return the option values; positional
        arguments are ignored."""
        (options, args) = self.parser.parse_args()
        return options


if __name__ == "__main__":
    # Log everything until main() applies the level from --loglevel.
    logger.setLevel(log.DEBUG)
    handler = loghandlers.SysLogHandler(address='/dev/log')
    formatter = log.Formatter(
        '%(name)s[%(process)d]: %(levelname)-5s %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    vlirc = main()
    gobject.MainLoop().run()