from __future__ import (
    unicode_literals,
    absolute_import,
    print_function,
    division,
    )
nstr = str
str = type('')

import io
import os
import mmap
import errno
import struct
from time import sleep
from threading import Thread, Event, Lock
from collections import Counter

from . import Pin, PINS_CLEANUP
from ..exc import (
    PinInvalidPull,
    PinInvalidEdges,
    PinInvalidFunction,
    PinFixedPull,
    )


class GPIOMemory(object):
    """
    Memory-mapped window onto the GPIO registers.

    The constructor maps 4096 bytes of ``/dev/gpiomem`` (from offset 0) or,
    failing that, of ``/dev/mem`` (from the peripheral base plus
    :attr:`GPIO_BASE_OFFSET`). The ``*_OFFSET`` register constants are the
    byte offsets shifted right by two.
    """

    GPIO_BASE_OFFSET = 0x200000
    # Peripheral base address keyed by the "Hardware" value in /proc/cpuinfo
    PERI_BASE_OFFSET = {
        'BCM2708': 0x20000000,
        'BCM2835': 0x20000000,
        'BCM2709': 0x3f000000,
        'BCM2836': 0x3f000000,
        }

    # From BCM2835 data-sheet, p.91
    GPFSEL_OFFSET = 0x00 >> 2
    GPSET_OFFSET = 0x1c >> 2
    GPCLR_OFFSET = 0x28 >> 2
    GPLEV_OFFSET = 0x34 >> 2
    GPEDS_OFFSET = 0x40 >> 2
    GPREN_OFFSET = 0x4c >> 2
    GPFEN_OFFSET = 0x58 >> 2
    GPHEN_OFFSET = 0x64 >> 2
    GPLEN_OFFSET = 0x70 >> 2
    GPAREN_OFFSET = 0x7c >> 2
    GPAFEN_OFFSET = 0x88 >> 2
    GPPUD_OFFSET = 0x94 >> 2
    GPPUDCLK_OFFSET = 0x98 >> 2

    def __init__(self):
        """
        Open the GPIO device and map its registers into :attr:`mem`.

        Raises :exc:`IOError` if neither ``/dev/gpiomem`` nor ``/dev/mem``
        can be opened, or if the peripheral base cannot be determined.
        """
        try:
            self.fd = os.open('/dev/gpiomem', os.O_RDWR | os.O_SYNC)
        except OSError:
            try:
                self.fd = os.open('/dev/mem', os.O_RDWR | os.O_SYNC)
            except OSError:
                raise IOError(
                    'unable to open /dev/gpiomem or /dev/mem; '
                    'upgrade your kernel or run as root')
            using_dev_mem = True
        else:
            using_dev_mem = False
        try:
            if using_dev_mem:
                offset = self.peripheral_base() + self.GPIO_BASE_OFFSET
            else:
                offset = 0
            self.mem = mmap.mmap(self.fd, 4096, offset=offset)
        except Exception:
            # Don't leak the descriptor when the base lookup or the mapping
            # itself fails; nobody else holds a reference to it yet
            os.close(self.fd)
            raise

    def close(self):
        """
        Release the mapping, then the underlying file descriptor.
        """
        self.mem.close()
        os.close(self.fd)

    def peripheral_base(self):
        """
        Return the physical base address of the SoC peripherals.

        The big-endian 32-bit value at byte offset 4 of
        ``/proc/device-tree/soc/ranges`` is preferred. If that file cannot
        be read, the "Hardware" line of ``/proc/cpuinfo`` is looked up in
        :attr:`PERI_BASE_OFFSET` instead. Raises :exc:`IOError` when neither
        source yields an address.
        """
        try:
            with io.open('/proc/device-tree/soc/ranges', 'rb') as f:
                f.seek(4)
                return struct.unpack(nstr('>L'), f.read(4))[0]
        except IOError:
            with io.open('/proc/cpuinfo', 'r') as f:
                for line in f:
                    if line.startswith('Hardware'):
                        try:
                            return self.PERI_BASE_OFFSET[
                                line.split(':')[1].strip()]
                        except KeyError:
                            raise IOError('unable to determine RPi revision')
        # Reached when /proc/cpuinfo has no "Hardware" line at all
        raise IOError('unable to determine peripheral base')

    def __getitem__(self, index):
        # NOTE(review): this copy of the file is damaged from the struct
        # format string of this method onwards (the text between a '<' and
        # the next '>' has been stripped). The expression below is a
        # reconstruction -- a little-endian 32-bit read of word *index* of
        # the mapping -- and must be confirmed against the upstream file.
        return struct.unpack_from(nstr('<L'), self.mem, index * 4)[0]

    def __setitem__(self, index, value):
        # NOTE(review): not present in this copy; reconstructed as the
        # mirror image of __getitem__ because the pin methods below assign
        # to self._MEM[...]. Confirm against the upstream file.
        struct.pack_into(nstr('<L'), self.mem, index * 4, value)


# NOTE(review): the damaged span also swallowed the pin class statement, its
# lookup tables (FUNCTION_VALUES, PULL_NAMES, PULL_VALUES, EDGE_NAMES,
# EDGE_VALUES), whatever creates _MEM and the per-pin _number, _pull,
# _when_changed, _change_thread, _change_event and _*_offset / _*_shift
# attributes, and the head of _get_function. None of that is reconstructed
# here -- restore it from the upstream file. The class name and base below
# are assumptions, present only so the surviving methods have a home.
class NativePin(Pin):

    def _get_function(self):
        # NOTE(review): only the tail "... >> self._func_shift) & 7]" of this
        # method survives; the table name and the register read are inferred
        # from _set_function below -- confirm against the upstream file.
        return self.FUNCTION_NAMES[
            (self._MEM[self._func_offset] >> self._func_shift) & 7]

    def _set_function(self, value):
        """
        Write the 3-bit code for function name *value* into this pin's field
        of its function-select register, leaving the other bits untouched.

        Raises :exc:`PinInvalidFunction` if *value* is not a key of
        ``FUNCTION_VALUES``.
        """
        try:
            value = self.FUNCTION_VALUES[value]
        except KeyError:
            # Both placeholders need an argument; formatting with the pin
            # alone raised TypeError instead of the intended exception
            raise PinInvalidFunction(
                'invalid function "%s" for pin %r' % (value, self))
        self._MEM[self._func_offset] = (
            self._MEM[self._func_offset]
            & ~(7 << self._func_shift)
            | (value << self._func_shift)
            )

    def _get_state(self):
        """
        Return True if this pin's bit in the level register is set.
        """
        return bool(self._MEM[self._level_offset] & (1 << self._level_shift))

    def _set_state(self, value):
        """
        Write this pin's bit to the set register when *value* is true,
        otherwise to the clear register.
        """
        if value:
            self._MEM[self._set_offset] = 1 << self._set_shift
        else:
            self._MEM[self._clear_offset] = 1 << self._clear_shift

    def _get_pull(self):
        """
        Return the name of the pull code most recently stored by
        :meth:`_set_pull` (the value is cached, not read from a register).
        """
        return self.PULL_NAMES[self._pull]

    def _set_pull(self, value):
        """
        Apply pull setting *value* to this pin and cache its numeric code.

        Raises :exc:`PinFixedPull` if the pin is not an input, or if it is
        pin 2 or 3 and *value* is anything other than ``'up'``. Raises
        :exc:`PinInvalidPull` if *value* is not a key of ``PULL_VALUES``.
        """
        if self.function != 'input':
            raise PinFixedPull('cannot set pull on non-input pin %r' % self)
        if value != 'up' and self._number in (2, 3):
            raise PinFixedPull('%r has a physical pull-up resistor' % self)
        try:
            value = self.PULL_VALUES[value]
        except KeyError:
            # Both placeholders need an argument; formatting with the pin
            # alone raised TypeError instead of the intended exception
            raise PinInvalidPull(
                'invalid pull direction "%s" for pin %r' % (value, self))
        # Load the pull code, pause, write this pin's bit to the pull clock
        # register, pause again, then zero both registers
        self._MEM[self._MEM.GPPUD_OFFSET] = value
        sleep(0.000000214)
        self._MEM[self._pull_offset] = 1 << self._pull_shift
        sleep(0.000000214)
        self._MEM[self._MEM.GPPUD_OFFSET] = 0
        self._MEM[self._pull_offset] = 0
        self._pull = value

    def _get_edges(self):
        """
        Return the edge name for the current state of this pin's bits in the
        rising and falling detect registers.
        """
        rising = bool(
            self._MEM[self._rising_offset] & (1 << self._rising_shift))
        falling = bool(
            self._MEM[self._falling_offset] & (1 << self._falling_shift))
        return self.EDGE_NAMES[(rising, falling)]

    def _set_edges(self, value):
        """
        Set this pin's bits in the rising and falling detect registers
        according to edge name *value*.

        Raises :exc:`PinInvalidEdges` if *value* is not a key of
        ``EDGE_VALUES``.
        """
        try:
            rising, falling = self.EDGE_VALUES[value]
        except KeyError:
            # Both placeholders need an argument; formatting with the pin
            # alone raised TypeError instead of the intended exception
            raise PinInvalidEdges(
                'invalid edge specification "%s" for pin %r' % (value, self))
        # Detach the callback while the registers change, and always put it
        # back afterwards. NOTE(review): presumably when_changed is a
        # property routed to _set_when_changed, which would stop and restart
        # the watcher thread -- confirm on the base class.
        f = self.when_changed
        self.when_changed = None
        try:
            self._MEM[self._rising_offset] = (
                self._MEM[self._rising_offset]
                & ~(1 << self._rising_shift)
                | (rising << self._rising_shift)
                )
            self._MEM[self._falling_offset] = (
                self._MEM[self._falling_offset]
                & ~(1 << self._falling_shift)
                | (falling << self._falling_shift)
                )
        finally:
            self.when_changed = f

    def _get_when_changed(self):
        """
        Return the current change callback, or None.
        """
        return self._when_changed

    def _set_when_changed(self, value):
        """
        Install, replace or remove the change callback.

        Going from no callback to a callback starts a daemon thread running
        :meth:`_change_watch`; going from a callback to None signals that
        thread to stop and joins it. Swapping one callback for another just
        replaces the stored reference.
        """
        if self._when_changed is None and value is not None:
            self._when_changed = value
            self._change_thread = Thread(target=self._change_watch)
            self._change_thread.daemon = True
            self._change_event.clear()
            self._change_thread.start()
        elif self._when_changed is not None and value is None:
            self._change_event.set()
            self._change_thread.join()
            self._change_thread = None
            self._when_changed = None
        else:
            self._when_changed = value

    def _change_watch(self):
        """
        Thread body: poll this pin's event-detect bit roughly every
        millisecond until ``_change_event`` is set, calling the callback each
        time the bit is found set.
        """
        offset = self._edge_offset
        mask = 1 << self._edge_shift
        self._MEM[offset] = mask  # clear any existing detection bit
        while not self._change_event.wait(0.001):
            if self._MEM[offset] & mask:
                # Write the bit back before calling out, as done above
                self._MEM[offset] = mask
                self._when_changed()