mirror of
https://github.com/KevinMidboe/python-gpiozero.git
synced 2025-10-29 17:50:37 +00:00
Also handle SPI flags in pigpio implementation more elegantly (just store the flags and manipulate them instead of keeping separate fields)
243 lines
7.4 KiB
Python
243 lines
7.4 KiB
Python
from __future__ import (
|
|
unicode_literals,
|
|
absolute_import,
|
|
print_function,
|
|
division,
|
|
)
|
|
str = type('')
|
|
|
|
import io
|
|
import warnings
|
|
|
|
try:
|
|
from spidev import SpiDev
|
|
except ImportError:
|
|
SpiDev = None
|
|
|
|
from . import SPI
|
|
from .pi import PiFactory, PiPin
|
|
from .spi import SPISoftwareBus
|
|
from ..devices import Device, SharedMixin
|
|
from ..output_devices import OutputDevice
|
|
from ..exc import DeviceClosed, PinUnknownPi, SPIInvalidClockMode
|
|
|
|
|
|
class LocalPiFactory(PiFactory):
|
|
"""
|
|
Abstract base class representing pins attached locally to a Pi. This forms
|
|
the base class for local-only pin interfaces (:class:`RPiGPIOPin`,
|
|
:class:`RPIOPin`, and :class:`NativePin`).
|
|
"""
|
|
pins = {}
|
|
|
|
def __init__(self):
|
|
super(LocalPiFactory, self).__init__()
|
|
self.spi_classes = {
|
|
('hardware', 'exclusive'): LocalPiHardwareSPI,
|
|
('hardware', 'shared'): LocalPiHardwareSPIShared,
|
|
('software', 'exclusive'): LocalPiSoftwareSPI,
|
|
('software', 'shared'): LocalPiSoftwareSPIShared,
|
|
}
|
|
# Override the pins dict to be this class' pins dict. This is a bit of
|
|
# a dirty hack, but ensures that anyone evil enough to mix pin
|
|
# implementations doesn't try and control the same pin with different
|
|
# backends
|
|
self.pins = LocalPiFactory.pins
|
|
|
|
def _get_address(self):
|
|
return ('localhost',)
|
|
|
|
def _get_revision(self):
|
|
# Cache the result as we can reasonably assume it won't change during
|
|
# runtime (this is LocalPin after all; descendents that deal with
|
|
# remote Pis should inherit from Pin instead)
|
|
with io.open('/proc/cpuinfo', 'r') as f:
|
|
for line in f:
|
|
if line.startswith('Revision'):
|
|
revision = line.split(':')[1].strip().lower()
|
|
overvolted = revision.startswith('100')
|
|
if overvolted:
|
|
revision = revision[-4:]
|
|
return revision
|
|
raise PinUnknownPi('unable to locate Pi revision in /proc/cpuinfo')
|
|
|
|
|
|
class LocalPiPin(PiPin):
|
|
"""
|
|
Abstract base class representing a multi-function GPIO pin attached to the
|
|
local Raspberry Pi.
|
|
"""
|
|
pass
|
|
|
|
|
|
class LocalPiHardwareSPI(SPI, Device):
|
|
def __init__(self, factory, port, device):
|
|
if SpiDev is None:
|
|
raise ImportError('failed to import spidev')
|
|
self._port = port
|
|
self._device = device
|
|
self._interface = None
|
|
self._address = factory.address + ('SPI(port=%d, device=%d)' % (port, device),)
|
|
super(LocalPiHardwareSPI, self).__init__()
|
|
self._reserve_pins(
|
|
factory.pin_address(11),
|
|
factory.pin_address(10),
|
|
factory.pin_address(9),
|
|
factory.pin_address((8, 7)[device])
|
|
)
|
|
self._interface = SpiDev()
|
|
self._interface.open(port, device)
|
|
self._interface.max_speed_hz = 500000
|
|
|
|
def close(self):
|
|
if self._interface:
|
|
try:
|
|
self._interface.close()
|
|
finally:
|
|
self._interface = None
|
|
self._release_all()
|
|
super(LocalPiHardwareSPI, self).close()
|
|
|
|
@property
|
|
def closed(self):
|
|
return self._interface is None
|
|
|
|
def __repr__(self):
|
|
try:
|
|
self._check_open()
|
|
return 'SPI(port=%d, device=%d)' % (self._port, self._device)
|
|
except DeviceClosed:
|
|
return 'SPI(closed)'
|
|
|
|
def transfer(self, data):
|
|
"""
|
|
Writes data (a list of integer words where each word is assumed to have
|
|
:attr:`bits_per_word` bits or less) to the SPI interface, and reads an
|
|
equivalent number of words, returning them as a list of integers.
|
|
"""
|
|
return self._interface.xfer2(data)
|
|
|
|
def _get_clock_mode(self):
|
|
return self._interface.mode
|
|
|
|
def _set_clock_mode(self, value):
|
|
self._interface.mode = value
|
|
|
|
def _get_lsb_first(self):
|
|
return self._interface.lsbfirst
|
|
|
|
def _set_lsb_first(self, value):
|
|
self._interface.lsbfirst = bool(value)
|
|
|
|
def _get_select_high(self):
|
|
return self._interface.cshigh
|
|
|
|
def _set_select_high(self, value):
|
|
self._interface.cshigh = bool(value)
|
|
|
|
def _get_bits_per_word(self):
|
|
return self._interface.bits_per_word
|
|
|
|
def _set_bits_per_word(self, value):
|
|
self._interface.bits_per_word = value
|
|
|
|
|
|
class LocalPiSoftwareSPI(SPI, OutputDevice):
|
|
def __init__(self, factory, clock_pin, mosi_pin, miso_pin, select_pin):
|
|
self._bus = None
|
|
self._address = factory.address + (
|
|
'SPI(clock_pin=%d, mosi_pin=%d, miso_pin=%d, select_pin=%d)' % (
|
|
clock_pin, mosi_pin, miso_pin, select_pin),
|
|
)
|
|
super(LocalPiSoftwareSPI, self).__init__(select_pin, active_high=False)
|
|
try:
|
|
self._clock_phase = False
|
|
self._lsb_first = False
|
|
self._bits_per_word = 8
|
|
self._bus = SPISoftwareBus(clock_pin, mosi_pin, miso_pin)
|
|
except:
|
|
self.close()
|
|
raise
|
|
|
|
def _conflicts_with(self, other):
|
|
return not (
|
|
isinstance(other, LocalPiSoftwareSPI) and
|
|
(self.pin.number != other.pin.number)
|
|
)
|
|
|
|
def close(self):
|
|
if self._bus:
|
|
self._bus.close()
|
|
self._bus = None
|
|
super(LocalPiSoftwareSPI, self).close()
|
|
|
|
@property
|
|
def closed(self):
|
|
return self._bus is None
|
|
|
|
def __repr__(self):
|
|
try:
|
|
self._check_open()
|
|
return 'SPI(clock_pin=%d, mosi_pin=%d, miso_pin=%d, select_pin=%d)' % (
|
|
self._bus.clock.pin.number,
|
|
self._bus.mosi.pin.number,
|
|
self._bus.miso.pin.number,
|
|
self.pin.number)
|
|
except DeviceClosed:
|
|
return 'SPI(closed)'
|
|
|
|
def transfer(self, data):
|
|
with self._bus.lock:
|
|
self.on()
|
|
try:
|
|
return self._bus.transfer(
|
|
data, self._clock_phase, self._lsb_first, self._bits_per_word)
|
|
finally:
|
|
self.off()
|
|
|
|
def _get_clock_mode(self):
|
|
with self._bus.lock:
|
|
return (not self._bus.clock.active_high) << 1 | self._clock_phase
|
|
|
|
def _set_clock_mode(self, value):
|
|
if not (0 <= value < 4):
|
|
raise SPIInvalidClockMode("%d is not a valid clock mode" % value)
|
|
with self._bus.lock:
|
|
self._bus.clock.active_high = not (value & 2)
|
|
self._clock_phase = bool(value & 1)
|
|
|
|
def _get_lsb_first(self):
|
|
return self._lsb_first
|
|
|
|
def _set_lsb_first(self, value):
|
|
self._lsb_first = bool(value)
|
|
|
|
def _get_bits_per_word(self):
|
|
return self._bits_per_word
|
|
|
|
def _set_bits_per_word(self, value):
|
|
if value < 1:
|
|
raise ValueError('bits_per_word must be positive')
|
|
self._bits_per_word = int(value)
|
|
|
|
def _get_select_high(self):
|
|
return self.active_high
|
|
|
|
def _set_select_high(self, value):
|
|
with self._bus.lock:
|
|
self.active_high = value
|
|
self.off()
|
|
|
|
|
|
class LocalPiHardwareSPIShared(SharedMixin, LocalPiHardwareSPI):
|
|
@classmethod
|
|
def _shared_key(cls, factory, port, device):
|
|
return (port, device)
|
|
|
|
|
|
class LocalPiSoftwareSPIShared(SharedMixin, LocalPiSoftwareSPI):
|
|
@classmethod
|
|
def _shared_key(cls, factory, clock_pin, mosi_pin, miso_pin, select_pin):
|
|
return (select_pin,)
|
|
|