mirror of
https://github.com/KevinMidboe/python-gpiozero.git
synced 2025-10-29 17:50:37 +00:00
Also handle SPI flags in pigpio implementation more elegantly (just store the flags and manipulate them instead of keeping separate fields)
507 lines
18 KiB
Python
507 lines
18 KiB
Python
from __future__ import (
|
|
unicode_literals,
|
|
absolute_import,
|
|
print_function,
|
|
division,
|
|
)
|
|
str = type('')
|
|
|
|
import weakref
|
|
import pigpio
|
|
import os
|
|
|
|
from . import SPI
|
|
from .pi import PiPin, PiFactory
|
|
from .data import pi_info
|
|
from ..devices import Device
|
|
from ..mixins import SharedMixin
|
|
from ..exc import (
    DeviceClosed,
    PinInvalidFunction,
    PinSetInput,
    PinFixedPull,
    PinInvalidPull,
    PinInvalidBounce,
    PinInvalidState,
    SPIBadArgs,
    SPIInvalidClockMode,
    )
|
|
|
|
|
|
class PiGPIOFactory(PiFactory):
    """
    Uses the `pigpio`_ library to interface to the Pi's GPIO pins. The pigpio
    library relies on a daemon (``pigpiod``) to be running as root to provide
    access to the GPIO pins, and communicates with this daemon over a network
    socket.

    While this does mean only the daemon itself should control the pins, the
    architecture does have several advantages:

    * Pins can be remote controlled from another machine (the other
      machine doesn't even have to be a Raspberry Pi; it simply needs the
      `pigpio`_ client library installed on it)
    * The daemon supports hardware PWM via the DMA controller
    * Your script itself doesn't require root privileges; it just needs to
      be able to communicate with the daemon

    You can construct pigpiod pins manually like so::

        from gpiozero.pins.pigpiod import PiGPIOFactory, PiGPIOPin
        from gpiozero import LED

        factory = PiGPIOFactory()
        led = LED(PiGPIOPin(factory, 12))

    This is particularly useful for controlling pins on a remote machine. To
    accomplish this simply specify the host (and optionally port) when
    constructing the factory; the connection belongs to the factory, not to
    the individual pins::

        from gpiozero.pins.pigpiod import PiGPIOFactory, PiGPIOPin
        from gpiozero import LED
        from signal import pause

        factory = PiGPIOFactory(host='192.168.0.2')
        led = LED(PiGPIOPin(factory, 12))

    :param host:
        Host name or address of the machine running ``pigpiod``. When
        ``None`` (the default) the ``PIGPIO_ADDR`` environment variable is
        used, falling back to ``'localhost'``.

    :param port:
        TCP port of the daemon. When ``None`` (the default) the
        ``PIGPIO_PORT`` environment variable is used, falling back to 8888.

    :raises IOError:
        If a connection to the daemon cannot be established.

    .. note::

        In some circumstances, especially when playing with PWM, it does appear
        to be possible to get the daemon into "unusual" states. We would be
        most interested to hear any bug reports relating to this (it may be a
        bug in our pin implementation). A workaround for now is simply to
        restart the ``pigpiod`` daemon.

    .. _pigpio: http://abyz.co.uk/rpi/pigpio/
    """
    def __init__(self, host=None, port=None):
        super(PiGPIOFactory, self).__init__()
        # Resolve the defaults at call time rather than at import time so
        # that changes to the environment made after this module is imported
        # are honoured
        if host is None:
            host = os.getenv('PIGPIO_ADDR', 'localhost')
        if port is None:
            port = int(os.getenv('PIGPIO_PORT', 8888))
        self.pin_class = PiGPIOPin
        self.spi_classes = {
            ('hardware', 'exclusive'): PiGPIOHardwareSPI,
            ('hardware', 'shared'):    PiGPIOHardwareSPIShared,
            ('software', 'exclusive'): PiGPIOSoftwareSPI,
            ('software', 'shared'):    PiGPIOSoftwareSPIShared,
            }
        self._connection = pigpio.pi(host, port)
        self._host = host
        self._port = port
        self._spis = []
        # pigpio.pi() does not raise when the daemon is unreachable; it just
        # leaves "connected" False. Fail here instead of on first pin access
        if not self._connection.connected:
            self._connection = None
            raise IOError('failed to connect to %s:%s' % (host, port))

    def close(self):
        """
        Close every SPI interface created by :meth:`spi`, then stop the
        connection to the daemon.
        """
        super(PiGPIOFactory, self).close()
        # We *have* to keep track of SPI interfaces constructed with pigpio;
        # if we fail to close them they prevent future interfaces from using
        # the same pins
        if self.connection:
            # Each interface's close() removes itself from self._spis, which
            # is what eventually terminates this loop
            while self._spis:
                self._spis[0].close()
            self.connection.stop()
        self._connection = None

    @property
    def connection(self):
        """
        The underlying ``pigpio.pi`` object, or ``None`` when there is no
        live connection.
        """
        # If we're shutting down, the connection may have disconnected itself
        # already. Unfortunately, the connection's "connected" property is
        # rather buggy - disconnecting doesn't set it to False! So we're
        # naughty and check an internal variable instead...
        try:
            if self._connection.sl.s is not None:
                return self._connection
        except AttributeError:
            # self._connection is None (closed) or not yet assigned
            pass
        return None

    @property
    def host(self):
        """
        The host passed to (or resolved by) the constructor.
        """
        return self._host

    @property
    def port(self):
        """
        The port passed to (or resolved by) the constructor.
        """
        return self._port

    def _get_revision(self):
        # Ask the daemon for the hardware revision of the board it runs on
        return self.connection.get_hardware_revision()

    def _get_address(self):
        # One-element tuple of the form ("host:port",)
        return ("%s:%d" % (self.host, self.port),)

    def spi(self, **spi_args):
        """
        Construct an SPI interface via the base class implementation and
        remember it so that :meth:`close` can shut it down later.
        """
        intf = super(PiGPIOFactory, self).spi(**spi_args)
        self._spis.append(intf)
        return intf
|
|
|
|
|
|
class PiGPIOPin(PiPin):
    """
    Pin implementation which drives a single GPIO through the ``pigpio``
    connection owned by its factory.

    :param factory:
        The factory that owns the pin. Its ``connection`` and ``pi_info``
        attributes are used for every operation.

    :param number:
        The GPIO number, passed through to the base class constructor.

    :raises ValueError:
        If pigpio rejects the initial attempt to set the pin to input mode.
    """
    _CONNECTIONS = {} # maps (host, port) to (connection, pi_info)
    GPIO_FUNCTIONS = {
        'input':  pigpio.INPUT,
        'output': pigpio.OUTPUT,
        'alt0':   pigpio.ALT0,
        'alt1':   pigpio.ALT1,
        'alt2':   pigpio.ALT2,
        'alt3':   pigpio.ALT3,
        'alt4':   pigpio.ALT4,
        'alt5':   pigpio.ALT5,
        }

    GPIO_PULL_UPS = {
        'up':       pigpio.PUD_UP,
        'down':     pigpio.PUD_DOWN,
        'floating': pigpio.PUD_OFF,
        }

    GPIO_EDGES = {
        'both':    pigpio.EITHER_EDGE,
        'rising':  pigpio.RISING_EDGE,
        'falling': pigpio.FALLING_EDGE,
        }

    # Reverse lookups: pigpio constant -> name
    GPIO_FUNCTION_NAMES = {v: k for (k, v) in GPIO_FUNCTIONS.items()}
    GPIO_PULL_UP_NAMES = {v: k for (k, v) in GPIO_PULL_UPS.items()}
    GPIO_EDGES_NAMES = {v: k for (k, v) in GPIO_EDGES.items()}

    def __init__(self, factory, number):
        super(PiGPIOPin, self).__init__(factory, number)
        self._pull = 'up' if factory.pi_info.pulled_up(self.address[-1]) else 'floating'
        self._pwm = False
        self._bounce = None        # glitch filter in microseconds, or None
        self._when_changed = None  # the user's callable, as given
        self._callback = None      # pigpio callback object wrapping it
        self._edges = pigpio.EITHER_EDGE
        try:
            self.factory.connection.set_mode(self.number, pigpio.INPUT)
        except pigpio.error as e:
            raise ValueError(e)
        self.factory.connection.set_pull_up_down(self.number, self.GPIO_PULL_UPS[self._pull])
        self.factory.connection.set_glitch_filter(self.number, 0)

    def close(self):
        """
        Return the pin to its initial state (no PWM, no callback, input
        function, default pull) provided the factory is still connected.
        """
        if self.factory.connection:
            self.frequency = None
            self.when_changed = None
            self.function = 'input'
            self.pull = 'up' if self.factory.pi_info.pulled_up(self.address[-1]) else 'floating'

    def _get_function(self):
        return self.GPIO_FUNCTION_NAMES[self.factory.connection.get_mode(self.number)]

    def _set_function(self, value):
        # Validate before touching any state so that an invalid name leaves
        # the recorded pull unchanged, and so that only the lookup (not the
        # daemon call) can be reported as an invalid function
        try:
            mode = self.GPIO_FUNCTIONS[value]
        except KeyError:
            raise PinInvalidFunction('invalid function "%s" for pin %r' % (value, self))
        if value != 'input':
            self._pull = 'floating'
        self.factory.connection.set_mode(self.number, mode)

    def _get_state(self):
        if self._pwm:
            # Duty cycle as a fraction of the configured range
            return (
                self.factory.connection.get_PWM_dutycycle(self.number) /
                self.factory.connection.get_PWM_range(self.number)
                )
        else:
            return bool(self.factory.connection.read(self.number))

    def _set_state(self, value):
        if self._pwm:
            connection = self.factory.connection
            try:
                # Keep the caller's value intact so the error message below
                # reports what was actually requested, not the scaled figure
                duty = int(value * connection.get_PWM_range(self.number))
                if duty != connection.get_PWM_dutycycle(self.number):
                    connection.set_PWM_dutycycle(self.number, duty)
            except pigpio.error:
                raise PinInvalidState('invalid state "%s" for pin %r' % (value, self))
        elif self.function == 'input':
            raise PinSetInput('cannot set state of pin %r' % self)
        else:
            # write forces pin to OUTPUT, hence the check above
            self.factory.connection.write(self.number, bool(value))

    def _get_pull(self):
        return self._pull

    def _set_pull(self, value):
        if self.function != 'input':
            raise PinFixedPull('cannot set pull on non-input pin %r' % self)
        if value != 'up' and self.factory.pi_info.pulled_up(self.address[-1]):
            raise PinFixedPull('%r has a physical pull-up resistor' % self)
        try:
            pud = self.GPIO_PULL_UPS[value]
        except KeyError:
            raise PinInvalidPull('invalid pull "%s" for pin %r' % (value, self))
        self.factory.connection.set_pull_up_down(self.number, pud)
        self._pull = value

    def _get_frequency(self):
        if self._pwm:
            return self.factory.connection.get_PWM_frequency(self.number)
        return None

    def _set_frequency(self, value):
        if not self._pwm and value is not None:
            # Switching PWM on: start from a zero duty cycle
            self.factory.connection.set_PWM_frequency(self.number, value)
            self.factory.connection.set_PWM_range(self.number, 10000)
            self.factory.connection.set_PWM_dutycycle(self.number, 0)
            self._pwm = True
        elif self._pwm and value is not None:
            # Already in PWM mode: only talk to the daemon on a real change
            if value != self.factory.connection.get_PWM_frequency(self.number):
                self.factory.connection.set_PWM_frequency(self.number, value)
                self.factory.connection.set_PWM_range(self.number, 10000)
        elif self._pwm and value is None:
            # Switching PWM off: drive the pin low
            self.factory.connection.write(self.number, 0)
            self._pwm = False

    def _get_bounce(self):
        # Stored in microseconds; reported in seconds (None when disabled)
        return None if not self._bounce else self._bounce / 1000000

    def _set_bounce(self, value):
        if value is None:
            value = 0
        elif value < 0:
            raise PinInvalidBounce('bounce must be 0 or greater')
        micros = int(value * 1000000)
        self.factory.connection.set_glitch_filter(self.number, micros)
        # Remember the setting only once the daemon has accepted it;
        # without this _get_bounce would always report None
        self._bounce = micros

    def _get_edges(self):
        return self.GPIO_EDGES_NAMES[self._edges]

    def _set_edges(self, value):
        # The edge selection is baked into the pigpio callback, so the
        # handler has to be removed and re-registered around the change
        f = self.when_changed
        self.when_changed = None
        try:
            self._edges = self.GPIO_EDGES[value]
        finally:
            self.when_changed = f

    def _get_when_changed(self):
        # Return the user's own callable, NOT the three-argument wrapper
        # registered with pigpio; _set_edges re-registers whatever this
        # returns through _set_when_changed, which calls it with no arguments
        return self._when_changed

    def _set_when_changed(self, value):
        if self._callback is not None:
            self._callback.cancel()
            self._callback = None
        self._when_changed = None
        if value is not None:
            # pigpio calls back with (gpio, level, tick); the handler takes
            # no arguments
            self._callback = self.factory.connection.callback(
                self.number, self._edges,
                lambda gpio, level, tick: value())
            self._when_changed = value
|
|
|
|
|
|
class PiGPIOHardwareSPI(SPI, Device):
    """
    SPI interface backed by pigpio's ``spi_open`` / ``spi_xfer`` calls.

    All configuration (clock mode, select polarity, word size) lives in a
    single integer, ``_spi_flags``, which is handed to ``spi_open``. Changing
    any setting closes the pigpio handle and opens a new one with the updated
    flags.

    :param factory:
        The owning :class:`PiGPIOFactory`; only a weak proxy is retained.

    :param port:
        SPI port number (recorded for conflict checks and ``repr``).

    :param device:
        SPI device number; 0 selects GPIO8 as the select pin, 1 selects
        GPIO7.
    """
    def __init__(self, factory, port, device):
        self._port = port
        self._device = device
        self._factory = weakref.proxy(factory)
        # Must exist before anything can fail so that "closed", close() and
        # repr() work on a partially constructed instance
        self._handle = None
        super(PiGPIOHardwareSPI, self).__init__()
        self._reserve_pins(*(
            factory.address + ('GPIO%d' % pin,)
            for pin in (11, 10, 9, (8, 7)[device])
            ))
        self._spi_flags = 8 << 16  # word-size field (bits 16-21) set to 8
        self._baud = 500000
        try:
            # _spi_flags is a plain integer; it must not be called
            self._handle = self._factory.connection.spi_open(
                device, self._baud, self._spi_flags)
        except:
            # Release the reserved pins if the interface could not be
            # opened; the exception is always re-raised
            self.close()
            raise

    def _conflicts_with(self, other):
        # Only another hardware SPI interface on a different (port, device)
        # pair is considered compatible with this one
        return not (
            isinstance(other, PiGPIOHardwareSPI) and
            (self._port, self._device) != (other._port, other._device)
            )

    def close(self):
        """
        Deregister from the factory, close the pigpio handle (if still open)
        and release the reserved pins.
        """
        try:
            self._factory._spis.remove(self)
        except (ReferenceError, ValueError):
            # If the factory has died already or we're not present in its
            # internal list, ignore the error
            pass
        if not self.closed:
            self._factory.connection.spi_close(self._handle)
        self._handle = None
        self._release_all()
        super(PiGPIOHardwareSPI, self).close()

    @property
    def closed(self):
        """
        ``True`` when there is no open handle, or no usable connection
        through which to use it.
        """
        try:
            return self._handle is None or self._factory.connection is None
        except ReferenceError:
            # The factory behind the weak proxy has been collected; close()
            # already treats that as "nothing left to do"
            return True

    @property
    def factory(self):
        """
        The (weakly referenced) factory which created this interface.
        """
        return self._factory

    def __repr__(self):
        try:
            self._check_open()
            return 'SPI(port=%d, device=%d)' % (self._port, self._device)
        except DeviceClosed:
            return 'SPI(closed)'

    def _reopen(self, flags):
        # Close the current handle and open a new one using *flags*. The
        # handle is cleared in between so that a failed re-open leaves the
        # interface reporting closed rather than holding a stale handle
        connection = self._factory.connection
        connection.spi_close(self._handle)
        self._handle = None
        self._spi_flags = flags
        self._handle = connection.spi_open(self._device, self._baud, flags)

    def _get_clock_mode(self):
        # Bits 0-1 of the flags
        return self._spi_flags & 0x3

    def _set_clock_mode(self, value):
        self._check_open()
        if not 0 <= value < 4:
            raise SPIInvalidClockMode("%d is not a valid SPI clock mode" % value)
        self._reopen((self._spi_flags & ~0x3) | value)

    def _get_select_high(self):
        # One polarity bit per device, starting at bit 2
        return bool((self._spi_flags >> (2 + self._device)) & 0x1)

    def _set_select_high(self, value):
        self._check_open()
        # Clears bits 2-4, then sets the bit belonging to this device
        self._reopen(
            (self._spi_flags & ~0x1c) | (bool(value) << (2 + self._device)))

    def _get_bits_per_word(self):
        # Six-bit field at bits 16-21
        return (self._spi_flags >> 16) & 0x3f

    def _set_bits_per_word(self, value):
        self._check_open()
        self._reopen((self._spi_flags & ~0x3f0000) | ((value & 0x3f) << 16))

    def transfer(self, data):
        """
        Write *data* to the interface and return the data read back during
        the transfer as a list of integers.

        :raises IOError:
            If pigpio reports a negative count for the transfer.
        """
        self._check_open()
        count, data = self._factory.connection.spi_xfer(self._handle, data)
        if count < 0:
            raise IOError('SPI transfer error %d' % count)
        # Convert returned bytearray to list of ints. XXX Not sure how non-byte
        # sized words (aux intf only) are returned ... padded to 16/32-bits?
        return [int(b) for b in data]
|
|
|
|
|
|
class PiGPIOSoftwareSPI(SPI, Device):
    """
    SPI interface backed by pigpio's bit-banged ``bb_spi_open`` /
    ``bb_spi_xfer`` calls, which identify an interface by its select pin.

    All configuration (clock mode, select polarity, bit order) lives in a
    single integer, ``_spi_flags``, which is handed to ``bb_spi_open``.
    Changing any setting closes the interface and re-opens it with the
    updated flags.

    :param factory:
        The owning :class:`PiGPIOFactory`; only a weak proxy is retained.

    :param clock_pin:
        GPIO number used for the clock line.

    :param mosi_pin:
        GPIO number used for the MOSI line.

    :param miso_pin:
        GPIO number used for the MISO line.

    :param select_pin:
        GPIO number used for the select line.
    """
    def __init__(self, factory, clock_pin, mosi_pin, miso_pin, select_pin):
        self._closed = True
        self._select_pin = select_pin
        self._clock_pin = clock_pin
        self._mosi_pin = mosi_pin
        self._miso_pin = miso_pin
        self._factory = weakref.proxy(factory)
        super(PiGPIOSoftwareSPI, self).__init__()
        self._reserve_pins(
            factory.pin_address(clock_pin),
            factory.pin_address(mosi_pin),
            factory.pin_address(miso_pin),
            factory.pin_address(select_pin),
            )
        self._spi_flags = 0
        self._baud = 100000
        try:
            self._factory.connection.bb_spi_open(
                select_pin, miso_pin, mosi_pin, clock_pin,
                self._baud, self._spi_flags)
            # Only set after opening bb_spi; if that fails then close() will
            # also fail if bb_spi_close is attempted on an un-open interface
            self._closed = False
        except:
            # Release the reserved pins; the exception is always re-raised
            self.close()
            raise

    def _conflicts_with(self, other):
        # Only another software SPI interface with a different select pin is
        # considered compatible with this one. (Hardware interfaces have no
        # _select_pin attribute, so they must not reach the comparison.)
        return not (
            isinstance(other, PiGPIOSoftwareSPI) and
            (self._select_pin) != (other._select_pin)
            )

    def close(self):
        """
        Deregister from the factory, close the bit-banged interface (if it
        is open) and release the reserved pins.
        """
        try:
            self._factory._spis.remove(self)
        except (ReferenceError, ValueError):
            # If the factory has died already or we're not present in its
            # internal list, ignore the error
            pass
        if not self.closed:
            self._closed = True
            try:
                connection = self._factory.connection
            except ReferenceError:
                # Factory already collected; nothing left to talk to
                connection = None
            if connection is not None:
                connection.bb_spi_close(self._select_pin)
        self._release_all()
        super(PiGPIOSoftwareSPI, self).close()

    @property
    def closed(self):
        """
        ``True`` unless the bit-banged interface is currently open.
        """
        return self._closed

    def __repr__(self):
        try:
            self._check_open()
            return (
                'SPI(clock_pin=%d, mosi_pin=%d, miso_pin=%d, select_pin=%d)' % (
                    self._clock_pin, self._mosi_pin, self._miso_pin, self._select_pin
                ))
        except DeviceClosed:
            return 'SPI(closed)'

    def _reopen(self, flags):
        # Close the interface and open it again using *flags*. It is marked
        # closed in between so that a failed re-open does not leave close()
        # attempting bb_spi_close on an interface that is not open
        connection = self._factory.connection
        connection.bb_spi_close(self._select_pin)
        self._closed = True
        self._spi_flags = flags
        connection.bb_spi_open(
            self._select_pin, self._miso_pin, self._mosi_pin, self._clock_pin,
            self._baud, flags)
        self._closed = False

    def _get_clock_mode(self):
        # Bits 0-1 of the flags
        return self._spi_flags & 0x3

    def _set_clock_mode(self, value):
        self._check_open()
        if not 0 <= value < 4:
            raise SPIInvalidClockMode("%d is not a valid SPI clock mode" % value)
        self._reopen((self._spi_flags & ~0x3) | value)

    def _get_select_high(self):
        # Bit 2 of the flags
        return bool(self._spi_flags & 0x4)

    def _set_select_high(self, value):
        self._check_open()
        self._reopen((self._spi_flags & ~0x4) | (bool(value) << 2))

    def _get_lsb_first(self):
        # Bits 14 and 15 of the flags; they are always set or cleared together
        return bool(self._spi_flags & 0xc000)

    def _set_lsb_first(self, value):
        self._check_open()
        self._reopen(
            (self._spi_flags & ~0xc000)
            | (bool(value) << 14)
            | (bool(value) << 15)
            )

    def transfer(self, data):
        """
        Write *data* to the interface and return the data read back during
        the transfer as a list of integers.

        :raises IOError:
            If pigpio reports a negative count for the transfer.
        """
        self._check_open()
        count, data = self._factory.connection.bb_spi_xfer(self._select_pin, data)
        if count < 0:
            raise IOError('SPI transfer error %d' % count)
        # Convert returned bytearray to list of ints. bb_spi only supports
        # byte-sized words so no issues here
        return [int(b) for b in data]
|
|
|
|
|
|
class PiGPIOHardwareSPIShared(SharedMixin, PiGPIOHardwareSPI):
    """
    :class:`PiGPIOHardwareSPI` combined with :class:`SharedMixin`.
    """
    @classmethod
    def _shared_key(cls, factory, port, device):
        """
        Return the ``(factory, port, device)`` tuple which identifies an
        interface built from these constructor arguments.
        """
        # presumably consumed by SharedMixin to look up an existing
        # instance - confirm against the mixin
        identity = (factory, port, device)
        return identity
|
|
|
|
|
|
class PiGPIOSoftwareSPIShared(SharedMixin, PiGPIOSoftwareSPI):
    """
    :class:`PiGPIOSoftwareSPI` combined with :class:`SharedMixin`.
    """
    @classmethod
    def _shared_key(cls, factory, clock_pin, mosi_pin, miso_pin, select_pin):
        """
        Return the ``(factory, select_pin)`` tuple which identifies an
        interface; the clock, MOSI and MISO pins do not take part in the key.
        """
        # presumably consumed by SharedMixin to look up an existing
        # instance - confirm against the mixin
        identity = (factory, select_pin)
        return identity
|
|
|