mirror of
https://github.com/KevinMidboe/python-gpiozero.git
synced 2025-10-29 17:50:37 +00:00
This commit is a fairly major piece of work that abstracts all pin operations (function, state, edge detection, PWM, etc.) into a base "Pin" class which is then used by input/output/composite devices to perform all required configuration. The idea is to pave the way for I2C based IO extenders which can present additional GPIO ports with similar capabilities to the Pi's "native" GPIO ports. As a bonus it also abstracts away the reliance on the RPi.GPIO library to allow alternative pin implementations (e.g. using RPIO to take advantage of DMA based PWM), or even pure Python implementations.
337 lines
11 KiB
Python
337 lines
11 KiB
Python
from __future__ import (
|
|
unicode_literals,
|
|
absolute_import,
|
|
print_function,
|
|
division,
|
|
)
|
|
nstr = str
|
|
str = type('')
|
|
|
|
import io
|
|
import os
|
|
import mmap
|
|
import errno
|
|
import struct
|
|
from time import sleep
|
|
from threading import Thread, Event, Lock
|
|
from collections import Counter
|
|
|
|
from . import Pin, PINS_CLEANUP
|
|
from .exc import (
|
|
PinInvalidPull,
|
|
PinInvalidEdges,
|
|
PinInvalidFunction,
|
|
PinFixedPull,
|
|
)
|
|
|
|
|
|
class GPIOMemory(object):
|
|
|
|
GPIO_BASE_OFFSET = 0x200000
|
|
PERI_BASE_OFFSET = {
|
|
'BCM2708': 0x20000000,
|
|
'BCM2835': 0x20000000,
|
|
'BCM2709': 0x3f000000,
|
|
'BCM2836': 0x3f000000,
|
|
}
|
|
|
|
# From BCM2835 data-sheet, p.91
|
|
GPFSEL_OFFSET = 0x00 >> 2
|
|
GPSET_OFFSET = 0x1c >> 2
|
|
GPCLR_OFFSET = 0x28 >> 2
|
|
GPLEV_OFFSET = 0x34 >> 2
|
|
GPEDS_OFFSET = 0x40 >> 2
|
|
GPREN_OFFSET = 0x4c >> 2
|
|
GPFEN_OFFSET = 0x58 >> 2
|
|
GPHEN_OFFSET = 0x64 >> 2
|
|
GPLEN_OFFSET = 0x70 >> 2
|
|
GPAREN_OFFSET = 0x7c >> 2
|
|
GPAFEN_OFFSET = 0x88 >> 2
|
|
GPPUD_OFFSET = 0x94 >> 2
|
|
GPPUDCLK_OFFSET = 0x98 >> 2
|
|
|
|
def __init__(self):
|
|
try:
|
|
self.fd = os.open('/dev/gpiomem', os.O_RDWR | os.O_SYNC)
|
|
except OSError:
|
|
try:
|
|
self.fd = os.open('/dev/mem', os.O_RDWR | os.O_SYNC)
|
|
except OSError:
|
|
raise IOError(
|
|
'unable to open /dev/gpiomem or /dev/mem; '
|
|
'upgrade your kernel or run as root')
|
|
else:
|
|
offset = self.peripheral_base() + self.GPIO_BASE_OFFSET
|
|
else:
|
|
offset = 0
|
|
self.mem = mmap.mmap(self.fd, 4096, offset=offset)
|
|
|
|
def close(self):
|
|
self.mem.close()
|
|
os.close(self.fd)
|
|
|
|
def peripheral_base(self):
|
|
try:
|
|
with io.open('/proc/device-tree/soc/ranges', 'rb') as f:
|
|
f.seek(4)
|
|
return struct.unpack(nstr('>L'), f.read(4))[0]
|
|
except IOError:
|
|
with io.open('/proc/cpuinfo', 'r') as f:
|
|
for line in f:
|
|
if line.startswith('Hardware'):
|
|
try:
|
|
return self.PERI_BASE_OFFSET[line.split(':')[1].strip()]
|
|
except KeyError:
|
|
raise IOError('unable to determine RPi revision')
|
|
raise IOError('unable to determine peripheral base')
|
|
|
|
def __getitem__(self, index):
|
|
return struct.unpack_from(nstr('<L'), self.mem, index * 4)[0]
|
|
|
|
def __setitem__(self, index, value):
|
|
struct.pack_into(nstr('<L'), self.mem, index * 4, value)
|
|
|
|
|
|
class GPIOFS(object):
|
|
|
|
GPIO_PATH = '/sys/class/gpio'
|
|
|
|
def __init__(self):
|
|
self._lock = Lock()
|
|
self._pin_refs = Counter()
|
|
|
|
def path(self, name):
|
|
return os.path.join(self.GPIO_PATH, name)
|
|
|
|
def export(self, pin):
|
|
with self._lock:
|
|
if self._pin_refs[pin] == 0:
|
|
# Set the count to 1 to indicate the GPIO is already exported
|
|
# (we'll correct this if we find it isn't, but this enables us
|
|
# to "leave the system in the state we found it")
|
|
self._pin_refs[pin] = 1
|
|
result = None
|
|
# Dirty hack to wait for udev to set permissions on
|
|
# gpioN/direction; there's no other way around this as there's
|
|
# no synchronous mechanism for setting permissions on sysfs
|
|
for i in range(10):
|
|
try:
|
|
result = io.open(self.path('gpio%d/value' % pin), 'w+b', buffering=0)
|
|
except IOError as e:
|
|
if e.errno == errno.ENOENT:
|
|
with io.open(self.path('export'), 'wb') as f:
|
|
f.write(str(pin).encode('ascii'))
|
|
# Pin wasn't exported, so correct the ref-count
|
|
self._pin_refs[pin] = 0
|
|
elif e.errno == errno.EACCES:
|
|
sleep(i / 100)
|
|
else:
|
|
raise
|
|
else:
|
|
break
|
|
if not result:
|
|
raise RuntimeError('failed to export pin %d' % pin)
|
|
else:
|
|
result = io.open(self.path('gpio%d/value' % pin), 'w+b', buffering=0)
|
|
self._pin_refs[pin] += 1
|
|
return result
|
|
|
|
def unexport(self, pin):
|
|
with self._lock:
|
|
self._pin_refs[pin] -= 1
|
|
if self._pin_refs[pin] == 0:
|
|
with io.open(self.path('unexport'), 'wb') as f:
|
|
f.write(str(pin).encode('ascii'))
|
|
|
|
|
|
class NativePin(Pin):
|
|
"""
|
|
Uses a built-in pure Python implementation to interface to the Pi's GPIO
|
|
pins. This is the default pin implementation if no third-party libraries
|
|
are discovered.
|
|
|
|
.. warning::
|
|
|
|
This implementation does *not* currently support PWM. Attempting to
|
|
use any class which requests PWM will raise an exception. This
|
|
implementation is also experimental; we make no guarantees it will
|
|
not eat your Pi for breakfast!
|
|
"""
|
|
|
|
_MEM = None
|
|
_PINS = {}
|
|
|
|
PULL_NAMES = {
|
|
0b00: 'floating',
|
|
0b01: 'down',
|
|
0b10: 'up',
|
|
0b11: 'reserved',
|
|
}
|
|
|
|
FUNCTION_NAMES = {
|
|
0b000: 'input',
|
|
0b001: 'output',
|
|
0b100: 'alt0',
|
|
0b101: 'alt1',
|
|
0b110: 'alt2',
|
|
0b111: 'alt3',
|
|
0b011: 'alt4',
|
|
0b010: 'alt5',
|
|
}
|
|
|
|
EDGE_NAMES = {
|
|
(True, True): 'both',
|
|
(True, False): 'rising',
|
|
(False, True): 'falling',
|
|
(False, False): 'none',
|
|
}
|
|
|
|
PULL_VALUES = {v: k for (k, v) in PULL_NAMES.items()}
|
|
FUNCTION_VALUES = {v: k for (k, v) in FUNCTION_NAMES.items()}
|
|
EDGE_VALUES = {v: k for (k, v) in EDGE_NAMES.items()}
|
|
|
|
def __new__(cls, number):
|
|
if not cls._PINS:
|
|
cls._MEM = GPIOMemory()
|
|
PINS_CLEANUP.append(cls._MEM.close)
|
|
if not (0 <= number < 54):
|
|
raise ValueError('invalid pin %d specified (must be 0..53)' % number)
|
|
try:
|
|
return cls._PINS[number]
|
|
except KeyError:
|
|
self = super(NativePin, cls).__new__(cls)
|
|
cls._PINS[number] = self
|
|
self._number = number
|
|
self._func_offset = self._MEM.GPFSEL_OFFSET + (number // 10)
|
|
self._func_shift = (number % 10) * 3
|
|
self._set_offset = self._MEM.GPSET_OFFSET + (number // 32)
|
|
self._set_shift = number % 32
|
|
self._clear_offset = self._MEM.GPCLR_OFFSET + (number // 32)
|
|
self._clear_shift = number % 32
|
|
self._level_offset = self._MEM.GPLEV_OFFSET + (number // 32)
|
|
self._level_shift = number % 32
|
|
self._pull_offset = self._MEM.GPPUDCLK_OFFSET + (number // 32)
|
|
self._pull_shift = number % 32
|
|
self._edge_offset = self._MEM.GPEDS_OFFSET + (number // 32)
|
|
self._edge_shift = number % 32
|
|
self._rising_offset = self._MEM.GPREN_OFFSET + (number // 32)
|
|
self._rising_shift = number % 32
|
|
self._falling_offset = self._MEM.GPFEN_OFFSET + (number // 32)
|
|
self._falling_shift = number % 32
|
|
self._when_changed = None
|
|
self._change_thread = None
|
|
self._change_event = Event()
|
|
self.function = 'input'
|
|
self.pull = 'up' if number in (2, 3) else 'floating'
|
|
self.bounce = None
|
|
self.edges = 'both'
|
|
return self
|
|
|
|
def __repr__(self):
|
|
return "GPIO%d" % self._number
|
|
|
|
@property
|
|
def number(self):
|
|
return self._number
|
|
|
|
def close(self):
|
|
self.when_changed = None
|
|
self.function = 'input'
|
|
|
|
def _get_function(self):
|
|
return self.FUNCTION_NAMES[(self._MEM[self._func_offset] >> self._func_shift) & 7]
|
|
|
|
def _set_function(self, value):
|
|
try:
|
|
value = self.FUNCTION_VALUES[value]
|
|
except KeyError:
|
|
raise PinInvalidFunction('invalid function "%s" for pin %r' % self)
|
|
self._MEM[self._func_offset] = (
|
|
self._MEM[self._func_offset]
|
|
& ~(7 << self._func_shift)
|
|
| (value << self._func_shift)
|
|
)
|
|
|
|
def _get_state(self):
|
|
return bool(self._MEM[self._level_offset] & (1 << self._level_shift))
|
|
|
|
def _set_state(self, value):
|
|
if value:
|
|
self._MEM[self._set_offset] = 1 << self._set_shift
|
|
else:
|
|
self._MEM[self._clear_offset] = 1 << self._clear_shift
|
|
|
|
def _get_pull(self):
|
|
return self.PULL_NAMES[self._pull]
|
|
|
|
def _set_pull(self, value):
|
|
if self.function != 'input':
|
|
raise PinFixedPull('cannot set pull on non-input pin %r' % self)
|
|
if value != 'up' and self._number in (2, 3):
|
|
raise PinFixedPull('%r has a physical pull-up resistor' % self)
|
|
try:
|
|
value = self.PULL_VALUES[value]
|
|
except KeyError:
|
|
raise PinInvalidPull('invalid pull direction "%s" for pin %r' % self)
|
|
self._MEM[self._MEM.GPPUD_OFFSET] = value
|
|
sleep(0.000000214)
|
|
self._MEM[self._pull_offset] = 1 << self._pull_shift
|
|
sleep(0.000000214)
|
|
self._MEM[self._MEM.GPPUD_OFFSET] = 0
|
|
self._MEM[self._pull_offset] = 0
|
|
self._pull = value
|
|
|
|
def _get_edges(self):
|
|
rising = bool(self._MEM[self._rising_offset] & (1 << self._rising_shift))
|
|
falling = bool(self._MEM[self._falling_offset] & (1 << self._falling_shift))
|
|
return self.EDGE_NAMES[(rising, falling)]
|
|
|
|
def _set_edges(self, value):
|
|
try:
|
|
rising, falling = self.EDGE_VALUES[value]
|
|
except KeyError:
|
|
raise PinInvalidEdges('invalid edge specification "%s" for pin %r' % self)
|
|
f = self.when_changed
|
|
self.when_changed = None
|
|
try:
|
|
self._MEM[self._rising_offset] = (
|
|
self._MEM[self._rising_offset]
|
|
& ~(1 << self._rising_shift)
|
|
| (rising << self._rising_shift)
|
|
)
|
|
self._MEM[self._falling_offset] = (
|
|
self._MEM[self._falling_offset]
|
|
& ~(1 << self._falling_shift)
|
|
| (falling << self._falling_shift)
|
|
)
|
|
finally:
|
|
self.when_changed = f
|
|
|
|
def _get_when_changed(self):
|
|
return self._when_changed
|
|
|
|
def _set_when_changed(self, value):
|
|
if self._when_changed is None and value is not None:
|
|
self._when_changed = value
|
|
self._change_thread = Thread(target=self._change_watch)
|
|
self._change_thread.daemon = True
|
|
self._change_event.clear()
|
|
self._change_thread.start()
|
|
elif self._when_changed is not None and value is None:
|
|
self._change_event.set()
|
|
self._change_thread.join()
|
|
self._change_thread = None
|
|
self._when_changed = None
|
|
else:
|
|
self._when_changed = value
|
|
|
|
def _change_watch(self):
|
|
offset = self._edge_offset
|
|
mask = 1 << self._edge_shift
|
|
self._MEM[offset] = mask # clear any existing detection bit
|
|
while not self._change_event.wait(0.001):
|
|
if self._MEM[offset] & mask:
|
|
self._MEM[offset] = mask
|
|
self._when_changed()
|
|
|