mirror of
https://github.com/KevinMidboe/python-gpiozero.git
synced 2025-10-29 17:50:37 +00:00
Fix #459 - properly support remote SPI with pigpio
Sorry! Dave's messing around with the pin implementations again. Hopefully the last time. The pin_factory is now really a factory object which can be asked to produce individual pins or pin-based interfaces like SPI (which can be supported properly via pigpio).
This commit is contained in:
@@ -13,20 +13,18 @@ import mmap
|
||||
import errno
|
||||
import struct
|
||||
import warnings
|
||||
import weakref
|
||||
from time import sleep
|
||||
from threading import Thread, Event, Lock
|
||||
from collections import Counter
|
||||
|
||||
from . import LocalPin, PINS_CLEANUP
|
||||
from .data import pi_info
|
||||
from .local import LocalPiPin, LocalPiFactory
|
||||
from ..exc import (
|
||||
PinInvalidPull,
|
||||
PinInvalidEdges,
|
||||
PinInvalidFunction,
|
||||
PinFixedPull,
|
||||
PinSetInput,
|
||||
PinNonPhysical,
|
||||
PinNoPins,
|
||||
)
|
||||
|
||||
|
||||
@@ -149,7 +147,7 @@ class GPIOFS(object):
|
||||
f.write(str(pin).encode('ascii'))
|
||||
|
||||
|
||||
class NativePin(LocalPin):
|
||||
class NativeFactory(LocalPiFactory):
|
||||
"""
|
||||
Uses a built-in pure Python implementation to interface to the Pi's GPIO
|
||||
pins. This is the default pin implementation if no third-party libraries
|
||||
@@ -169,10 +167,17 @@ class NativePin(LocalPin):
|
||||
|
||||
led = LED(NativePin(12))
|
||||
"""
|
||||
def __init__(self):
|
||||
super(NativeFactory, self).__init__()
|
||||
self.mem = GPIOMemory()
|
||||
self.pin_class = NativePin
|
||||
|
||||
_MEM = None
|
||||
_PINS = {}
|
||||
def close(self):
|
||||
super(NativeFactory, self).close()
|
||||
self.mem.close()
|
||||
|
||||
|
||||
class NativePin(LocalPiPin):
|
||||
GPIO_FUNCTIONS = {
|
||||
'input': 0b000,
|
||||
'output': 0b001,
|
||||
@@ -202,89 +207,62 @@ class NativePin(LocalPin):
|
||||
GPIO_PULL_UP_NAMES = {v: k for (k, v) in GPIO_PULL_UPS.items()}
|
||||
GPIO_EDGES_NAMES = {v: k for (k, v) in GPIO_EDGES.items()}
|
||||
|
||||
PI_INFO = None
|
||||
|
||||
def __new__(cls, number):
|
||||
if not cls._PINS:
|
||||
cls._MEM = GPIOMemory()
|
||||
PINS_CLEANUP.append(cls._MEM.close)
|
||||
if cls.PI_INFO is None:
|
||||
cls.PI_INFO = pi_info()
|
||||
if not (0 <= number < 54):
|
||||
raise ValueError('invalid pin %d specified (must be 0..53)' % number)
|
||||
try:
|
||||
return cls._PINS[number]
|
||||
except KeyError:
|
||||
self = super(NativePin, cls).__new__(cls)
|
||||
try:
|
||||
cls.PI_INFO.physical_pin('GPIO%d' % number)
|
||||
except PinNoPins:
|
||||
warnings.warn(
|
||||
PinNonPhysical(
|
||||
'no physical pins exist for GPIO%d' % number))
|
||||
self._number = number
|
||||
self._func_offset = self._MEM.GPFSEL_OFFSET + (number // 10)
|
||||
self._func_shift = (number % 10) * 3
|
||||
self._set_offset = self._MEM.GPSET_OFFSET + (number // 32)
|
||||
self._set_shift = number % 32
|
||||
self._clear_offset = self._MEM.GPCLR_OFFSET + (number // 32)
|
||||
self._clear_shift = number % 32
|
||||
self._level_offset = self._MEM.GPLEV_OFFSET + (number // 32)
|
||||
self._level_shift = number % 32
|
||||
self._pull_offset = self._MEM.GPPUDCLK_OFFSET + (number // 32)
|
||||
self._pull_shift = number % 32
|
||||
self._edge_offset = self._MEM.GPEDS_OFFSET + (number // 32)
|
||||
self._edge_shift = number % 32
|
||||
self._rising_offset = self._MEM.GPREN_OFFSET + (number // 32)
|
||||
self._rising_shift = number % 32
|
||||
self._falling_offset = self._MEM.GPFEN_OFFSET + (number // 32)
|
||||
self._falling_shift = number % 32
|
||||
self._when_changed = None
|
||||
self._change_thread = None
|
||||
self._change_event = Event()
|
||||
self.function = 'input'
|
||||
self.pull = 'up' if cls.PI_INFO.pulled_up('GPIO%d' % number) else 'floating'
|
||||
self.bounce = None
|
||||
self.edges = 'both'
|
||||
cls._PINS[number] = self
|
||||
return self
|
||||
|
||||
def __repr__(self):
|
||||
return "GPIO%d" % self._number
|
||||
|
||||
@property
|
||||
def number(self):
|
||||
return self._number
|
||||
def __init__(self, factory, number):
|
||||
super(NativePin, self).__init__(factory, number)
|
||||
self._func_offset = self.factory.mem.GPFSEL_OFFSET + (number // 10)
|
||||
self._func_shift = (number % 10) * 3
|
||||
self._set_offset = self.factory.mem.GPSET_OFFSET + (number // 32)
|
||||
self._set_shift = number % 32
|
||||
self._clear_offset = self.factory.mem.GPCLR_OFFSET + (number // 32)
|
||||
self._clear_shift = number % 32
|
||||
self._level_offset = self.factory.mem.GPLEV_OFFSET + (number // 32)
|
||||
self._level_shift = number % 32
|
||||
self._pull_offset = self.factory.mem.GPPUDCLK_OFFSET + (number // 32)
|
||||
self._pull_shift = number % 32
|
||||
self._edge_offset = self.factory.mem.GPEDS_OFFSET + (number // 32)
|
||||
self._edge_shift = number % 32
|
||||
self._rising_offset = self.factory.mem.GPREN_OFFSET + (number // 32)
|
||||
self._rising_shift = number % 32
|
||||
self._falling_offset = self.factory.mem.GPFEN_OFFSET + (number // 32)
|
||||
self._falling_shift = number % 32
|
||||
self._when_changed = None
|
||||
self._change_thread = None
|
||||
self._change_event = Event()
|
||||
self.function = 'input'
|
||||
self.pull = 'up' if factory.pi_info.pulled_up('GPIO%d' % number) else 'floating'
|
||||
self.bounce = None
|
||||
self.edges = 'both'
|
||||
|
||||
def close(self):
|
||||
self.frequency = None
|
||||
self.when_changed = None
|
||||
self.function = 'input'
|
||||
self.pull = 'up' if self.PI_INFO.pulled_up('GPIO%d' % self.number) else 'floating'
|
||||
self.pull = 'up' if self.factory.pi_info.pulled_up('GPIO%d' % self.number) else 'floating'
|
||||
|
||||
def _get_function(self):
|
||||
return self.GPIO_FUNCTION_NAMES[(self._MEM[self._func_offset] >> self._func_shift) & 7]
|
||||
return self.GPIO_FUNCTION_NAMES[(self.factory.mem[self._func_offset] >> self._func_shift) & 7]
|
||||
|
||||
def _set_function(self, value):
|
||||
try:
|
||||
value = self.GPIO_FUNCTIONS[value]
|
||||
except KeyError:
|
||||
raise PinInvalidFunction('invalid function "%s" for pin %r' % (value, self))
|
||||
self._MEM[self._func_offset] = (
|
||||
self._MEM[self._func_offset]
|
||||
self.factory.mem[self._func_offset] = (
|
||||
self.factory.mem[self._func_offset]
|
||||
& ~(7 << self._func_shift)
|
||||
| (value << self._func_shift)
|
||||
)
|
||||
|
||||
def _get_state(self):
|
||||
return bool(self._MEM[self._level_offset] & (1 << self._level_shift))
|
||||
return bool(self.factory.mem[self._level_offset] & (1 << self._level_shift))
|
||||
|
||||
def _set_state(self, value):
|
||||
if self.function == 'input':
|
||||
raise PinSetInput('cannot set state of pin %r' % self)
|
||||
if value:
|
||||
self._MEM[self._set_offset] = 1 << self._set_shift
|
||||
self.factory.mem[self._set_offset] = 1 << self._set_shift
|
||||
else:
|
||||
self._MEM[self._clear_offset] = 1 << self._clear_shift
|
||||
self.factory.mem[self._clear_offset] = 1 << self._clear_shift
|
||||
|
||||
def _get_pull(self):
|
||||
return self.GPIO_PULL_UP_NAMES[self._pull]
|
||||
@@ -292,23 +270,23 @@ class NativePin(LocalPin):
|
||||
def _set_pull(self, value):
|
||||
if self.function != 'input':
|
||||
raise PinFixedPull('cannot set pull on non-input pin %r' % self)
|
||||
if value != 'up' and self.PI_INFO.pulled_up('GPIO%d' % self.number):
|
||||
if value != 'up' and self.factory.pi_info.pulled_up('GPIO%d' % self.number):
|
||||
raise PinFixedPull('%r has a physical pull-up resistor' % self)
|
||||
try:
|
||||
value = self.GPIO_PULL_UPS[value]
|
||||
except KeyError:
|
||||
raise PinInvalidPull('invalid pull direction "%s" for pin %r' % (value, self))
|
||||
self._MEM[self._MEM.GPPUD_OFFSET] = value
|
||||
self.factory.mem[self.factory.mem.GPPUD_OFFSET] = value
|
||||
sleep(0.000000214)
|
||||
self._MEM[self._pull_offset] = 1 << self._pull_shift
|
||||
self.factory.mem[self._pull_offset] = 1 << self._pull_shift
|
||||
sleep(0.000000214)
|
||||
self._MEM[self._MEM.GPPUD_OFFSET] = 0
|
||||
self._MEM[self._pull_offset] = 0
|
||||
self.factory.mem[self.factory.mem.GPPUD_OFFSET] = 0
|
||||
self.factory.mem[self._pull_offset] = 0
|
||||
self._pull = value
|
||||
|
||||
def _get_edges(self):
|
||||
rising = bool(self._MEM[self._rising_offset] & (1 << self._rising_shift))
|
||||
falling = bool(self._MEM[self._falling_offset] & (1 << self._falling_shift))
|
||||
rising = bool(self.factory.mem[self._rising_offset] & (1 << self._rising_shift))
|
||||
falling = bool(self.factory.mem[self._falling_offset] & (1 << self._falling_shift))
|
||||
return self.GPIO_EDGES_NAMES[(rising, falling)]
|
||||
|
||||
def _set_edges(self, value):
|
||||
@@ -319,13 +297,13 @@ class NativePin(LocalPin):
|
||||
f = self.when_changed
|
||||
self.when_changed = None
|
||||
try:
|
||||
self._MEM[self._rising_offset] = (
|
||||
self._MEM[self._rising_offset]
|
||||
self.factory.mem[self._rising_offset] = (
|
||||
self.factory.mem[self._rising_offset]
|
||||
& ~(1 << self._rising_shift)
|
||||
| (rising << self._rising_shift)
|
||||
)
|
||||
self._MEM[self._falling_offset] = (
|
||||
self._MEM[self._falling_offset]
|
||||
self.factory.mem[self._falling_offset] = (
|
||||
self.factory.mem[self._falling_offset]
|
||||
& ~(1 << self._falling_shift)
|
||||
| (falling << self._falling_shift)
|
||||
)
|
||||
@@ -353,9 +331,9 @@ class NativePin(LocalPin):
|
||||
def _change_watch(self):
|
||||
offset = self._edge_offset
|
||||
mask = 1 << self._edge_shift
|
||||
self._MEM[offset] = mask # clear any existing detection bit
|
||||
self.factory.mem[offset] = mask # clear any existing detection bit
|
||||
while not self._change_event.wait(0.001):
|
||||
if self._MEM[offset] & mask:
|
||||
self._MEM[offset] = mask
|
||||
if self.factory.mem[offset] & mask:
|
||||
self.factory.mem[offset] = mask
|
||||
self._when_changed()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user