Source code for input.drivers.pcf8574

import smbus
from time import sleep

from skeleton import InputSkeleton

[docs]class InputDevice(InputSkeleton): """ A driver for PCF8574-based I2C IO expanders. They have 8 IO pins available as well as an interrupt pin. This driver treats all 8 pins as button pins, which is often the case. It supports both interrupt-driven mode (as fr now, RPi-only) and polling mode.""" default_mapping = [ "KEY_KPENTER", "KEY_DELETE", "KEY_END", "KEY_DOWN", "KEY_UP", "KEY_HOME", "KEY_LEFT", "KEY_RIGHT"] previous_data = 0
[docs] def __init__(self, addr = 0x27, bus = 1, int_pin = None, **kwargs): """Initialises the ``InputDevice`` object. Kwargs: * ``bus``: I2C bus number. * ``addr``: I2C address of the expander. * ``int_pin``: GPIO pin to which INT pin of the expander is connected. If supplied, interrupt-driven mode is used, otherwise, library reverts to polling mode. """ self.bus_num = bus self.bus = smbus.SMBus(self.bus_num) if type(addr) in [str, unicode]: addr = int(addr, 16) self.addr = addr self.int_pin = int_pin self.init_expander() InputSkeleton.__init__(self, **kwargs)
def init_expander(self): try: self.bus.write_byte(self.addr, 0xff) except IOError: return True else: return False
[docs] def runner(self): """Starts listening on the input device. Initialises the IO expander and runs either interrupt-driven or polling loop.""" self.stop_flag = False if self.int_pin is None: self.loop_polling() else: self.loop_interrupts()
[docs] def loop_interrupts(self): """Interrupt-driven loop. Currently can only use ``RPi.GPIO`` library. Stops when ``stop_flag`` is set to True.""" import RPi.GPIO as GPIO GPIO.setmode(GPIO.BCM) # Broadcom pin-numbering scheme GPIO.setup(self.int_pin, GPIO.IN) button_states = [] while not self.stop_flag: while GPIO.input(self.int_pin) == False and self.enabled: data = (~self.bus.read_byte(self.addr)&0xFF) self.process_data(data) self.previous_data = data sleep(0.1)
[docs] def loop_polling(self): """Polling loop. Stops when ``stop_flag`` is set to True.""" button_states = [] while not self.stop_flag: if self.enabled: data = (~self.bus.read_byte(self.addr)&0xFF) if data != self.previous_data: self.process_data(data) self.previous_data = data sleep(0.1)
[docs] def process_data(self, data): """Checks data received from IO expander and classifies changes as either "button up" or "button down" events. On "button up", calls send_key with the corresponding button name from ``self.mapping``. """ data_difference = data ^ self.previous_data changed_buttons = [] for i in range(8): if data_difference & 1<<i: changed_buttons.append(i) for button_number in changed_buttons: if not data & 1<<button_number: self.send_key(self.mapping[button_number])
if __name__ == "__main__": id = InputDevice(addr = 0x23, threaded=False) id.runner()