mirror of
				https://github.com/KevinMidboe/python-gpiozero.git
				synced 2025-10-29 17:50:37 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			422 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			422 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from __future__ import (
 | 
						|
    unicode_literals,
 | 
						|
    print_function,
 | 
						|
    absolute_import,
 | 
						|
    division,
 | 
						|
    )
 | 
						|
str = type('')
 | 
						|
 | 
						|
 | 
						|
import warnings
 | 
						|
import operator
 | 
						|
from threading import RLock
 | 
						|
 | 
						|
try:
 | 
						|
    from spidev import SpiDev
 | 
						|
except ImportError:
 | 
						|
    SpiDev = None
 | 
						|
 | 
						|
from .devices import Device, SharedMixin, _PINS, _PINS_LOCK
 | 
						|
from .input_devices import InputDevice
 | 
						|
from .output_devices import OutputDevice
 | 
						|
from .exc import SPIBadArgs, SPISoftwareFallback, GPIOPinInUse, DeviceClosed
 | 
						|
 | 
						|
 | 
						|
class SPIHardwareInterface(Device):
 | 
						|
    def __init__(self, port, device):
 | 
						|
        self._device = None
 | 
						|
        super(SPIHardwareInterface, self).__init__()
 | 
						|
        # XXX How can we detect conflicts with existing GPIO instances? This
 | 
						|
        # isn't ideal ... in fact, it's downright crap and doesn't guard
 | 
						|
        # against conflicts created *after* this instance, but it's all I can
 | 
						|
        # come up with right now ...
 | 
						|
        conflicts = (11, 10, 9, (8, 7)[device])
 | 
						|
        with _PINS_LOCK:
 | 
						|
            for pin in _PINS:
 | 
						|
                if pin.number in conflicts:
 | 
						|
                    raise GPIOPinInUse(
 | 
						|
                        'pin %r is already in use by another gpiozero object' % pin
 | 
						|
                    )
 | 
						|
        self._device_num = device
 | 
						|
        self._device = SpiDev()
 | 
						|
        self._device.open(port, device)
 | 
						|
        self._device.max_speed_hz = 500000
 | 
						|
 | 
						|
    def close(self):
 | 
						|
        if self._device:
 | 
						|
            try:
 | 
						|
                self._device.close()
 | 
						|
            finally:
 | 
						|
                self._device = None
 | 
						|
        super(SPIHardwareInterface, self).close()
 | 
						|
 | 
						|
    @property
 | 
						|
    def closed(self):
 | 
						|
        return self._device is None
 | 
						|
 | 
						|
    def __repr__(self):
 | 
						|
        try:
 | 
						|
            self._check_open()
 | 
						|
            return (
 | 
						|
                "hardware SPI on clock_pin=11, mosi_pin=10, miso_pin=9, "
 | 
						|
                "select_pin=%d" % (
 | 
						|
                    8 if self._device_num == 0 else 7))
 | 
						|
        except DeviceClosed:
 | 
						|
            return "hardware SPI closed"
 | 
						|
 | 
						|
    def read(self, n):
 | 
						|
        return self.transfer((0,) * n)
 | 
						|
 | 
						|
    def write(self, data):
 | 
						|
        return len(self.transfer(data))
 | 
						|
 | 
						|
    def transfer(self, data):
 | 
						|
        """
 | 
						|
        Writes data (a list of integer words where each word is assumed to have
 | 
						|
        :attr:`bits_per_word` bits or less) to the SPI interface, and reads an
 | 
						|
        equivalent number of words, returning them as a list of integers.
 | 
						|
        """
 | 
						|
        return self._device.xfer2(data)
 | 
						|
 | 
						|
    def _get_clock_mode(self):
 | 
						|
        return self._device.mode
 | 
						|
 | 
						|
    def _set_clock_mode(self, value):
 | 
						|
        self._device.mode = value
 | 
						|
 | 
						|
    def _get_clock_polarity(self):
 | 
						|
        return bool(self.mode & 2)
 | 
						|
 | 
						|
    def _set_clock_polarity(self, value):
 | 
						|
        self.mode = self.mode & (~2) | (bool(value) << 1)
 | 
						|
 | 
						|
    def _get_clock_phase(self):
 | 
						|
        return bool(self.mode & 1)
 | 
						|
 | 
						|
    def _set_clock_phase(self, value):
 | 
						|
        self.mode = self.mode & (~1) | bool(value)
 | 
						|
 | 
						|
    def _get_lsb_first(self):
 | 
						|
        return self._device.lsbfirst
 | 
						|
 | 
						|
    def _set_lsb_first(self, value):
 | 
						|
        self._device.lsbfirst = bool(value)
 | 
						|
 | 
						|
    def _get_select_high(self):
 | 
						|
        return self._device.cshigh
 | 
						|
 | 
						|
    def _set_select_high(self, value):
 | 
						|
        self._device.cshigh = bool(value)
 | 
						|
 | 
						|
    def _get_bits_per_word(self):
 | 
						|
        return self._device.bits_per_word
 | 
						|
 | 
						|
    def _set_bits_per_word(self, value):
 | 
						|
        self._device.bits_per_word = value
 | 
						|
 | 
						|
    clock_polarity = property(_get_clock_polarity, _set_clock_polarity)
 | 
						|
    clock_phase = property(_get_clock_phase, _set_clock_phase)
 | 
						|
    clock_mode = property(_get_clock_mode, _set_clock_mode)
 | 
						|
    lsb_first = property(_get_lsb_first, _set_lsb_first)
 | 
						|
    select_high = property(_get_select_high, _set_select_high)
 | 
						|
    bits_per_word = property(_get_bits_per_word, _set_bits_per_word)
 | 
						|
 | 
						|
 | 
						|
class SPISoftwareBus(SharedMixin, Device):
 | 
						|
    def __init__(self, clock_pin, mosi_pin, miso_pin):
 | 
						|
        self.lock = None
 | 
						|
        self.clock = None
 | 
						|
        self.mosi = None
 | 
						|
        self.miso = None
 | 
						|
        super(SPISoftwareBus, self).__init__()
 | 
						|
        self.lock = RLock()
 | 
						|
        self.clock_phase = False
 | 
						|
        self.lsb_first = False
 | 
						|
        self.bits_per_word = 8
 | 
						|
        try:
 | 
						|
            self.clock = OutputDevice(clock_pin, active_high=True)
 | 
						|
            if mosi_pin is not None:
 | 
						|
                self.mosi = OutputDevice(mosi_pin)
 | 
						|
            if miso_pin is not None:
 | 
						|
                self.miso = InputDevice(miso_pin)
 | 
						|
        except:
 | 
						|
            self.close()
 | 
						|
            raise
 | 
						|
 | 
						|
    def close(self):
 | 
						|
        super(SPISoftwareBus, self).close()
 | 
						|
        if self.lock:
 | 
						|
            with self.lock:
 | 
						|
                if self.miso is not None:
 | 
						|
                    self.miso.close()
 | 
						|
                    self.miso = None
 | 
						|
                if self.mosi is not None:
 | 
						|
                    self.mosi.close()
 | 
						|
                    self.mosi = None
 | 
						|
                if self.clock is not None:
 | 
						|
                    self.clock.close()
 | 
						|
                    self.clock = None
 | 
						|
            self.lock = None
 | 
						|
 | 
						|
    @property
 | 
						|
    def closed(self):
 | 
						|
        return self.lock is None
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def _shared_key(self, clock_pin, mosi_pin, miso_pin):
 | 
						|
        return (clock_pin, mosi_pin, miso_pin)
 | 
						|
 | 
						|
    def read(self, n):
 | 
						|
        return self.transfer((0,) * n)
 | 
						|
 | 
						|
    def write(self, data):
 | 
						|
        return len(self.transfer(data))
 | 
						|
 | 
						|
    def transfer(self, data):
 | 
						|
        """
 | 
						|
        Writes data (a list of integer words where each word is assumed to have
 | 
						|
        :attr:`bits_per_word` bits or less) to the SPI interface, and reads an
 | 
						|
        equivalent number of words, returning them as a list of integers.
 | 
						|
        """
 | 
						|
        result = []
 | 
						|
        with self.lock:
 | 
						|
            shift = operator.lshift if self.lsb_first else operator.rshift
 | 
						|
            for write_word in data:
 | 
						|
                mask = 1 if self.lsb_first else 1 << (self.bits_per_word - 1)
 | 
						|
                read_word = 0
 | 
						|
                for _ in range(self.bits_per_word):
 | 
						|
                    if self.mosi is not None:
 | 
						|
                        self.mosi.value = bool(write_word & mask)
 | 
						|
                    self.clock.on()
 | 
						|
                    if self.miso is not None and not self.clock_phase:
 | 
						|
                        if self.miso.value:
 | 
						|
                            read_word |= mask
 | 
						|
                    self.clock.off()
 | 
						|
                    if self.miso is not None and self.clock_phase:
 | 
						|
                        if self.miso.value:
 | 
						|
                            read_word |= mask
 | 
						|
                    mask = shift(mask, 1)
 | 
						|
                result.append(read_word)
 | 
						|
        return result
 | 
						|
 | 
						|
 | 
						|
class SPISoftwareInterface(OutputDevice):
 | 
						|
    def __init__(self, clock_pin, mosi_pin, miso_pin, select_pin):
 | 
						|
        self._bus = None
 | 
						|
        super(SPISoftwareInterface, self).__init__(select_pin, active_high=False)
 | 
						|
        try:
 | 
						|
            self._bus = SPISoftwareBus(clock_pin, mosi_pin, miso_pin)
 | 
						|
        except:
 | 
						|
            self.close()
 | 
						|
            raise
 | 
						|
 | 
						|
    def close(self):
 | 
						|
        if self._bus:
 | 
						|
            self._bus.close()
 | 
						|
            self._bus = None
 | 
						|
        super(SPISoftwareInterface, self).close()
 | 
						|
 | 
						|
    def __repr__(self):
 | 
						|
        try:
 | 
						|
            self._check_open()
 | 
						|
            return (
 | 
						|
                "software SPI on clock_pin=%d, mosi_pin=%d, miso_pin=%d, "
 | 
						|
                "select_pin=%d" % (
 | 
						|
                    self._bus.clock.pin.number,
 | 
						|
                    self._bus.mosi.pin.number,
 | 
						|
                    self._bus.miso.pin.number,
 | 
						|
                    self.pin.number))
 | 
						|
        except DeviceClosed:
 | 
						|
            return "software SPI closed"
 | 
						|
 | 
						|
    def read(self, n):
 | 
						|
        return self._bus.read(n)
 | 
						|
 | 
						|
    def write(self, data):
 | 
						|
        return self._bus.write(data)
 | 
						|
 | 
						|
    def transfer(self, data):
 | 
						|
        with self._bus.lock:
 | 
						|
            self.on()
 | 
						|
            try:
 | 
						|
                return self._bus.transfer(data)
 | 
						|
            finally:
 | 
						|
                self.off()
 | 
						|
 | 
						|
    def _get_clock_mode(self):
 | 
						|
        return (self.clock_polarity << 1) | self.clock_phase
 | 
						|
 | 
						|
    def _set_clock_mode(self, value):
 | 
						|
        value = int(value)
 | 
						|
        if not 0 <= value <= 3:
 | 
						|
            raise ValueError('clock_mode must be a value between 0 and 3 inclusive')
 | 
						|
        with self._bus.lock:
 | 
						|
            self._bus.clock.active_high = not (value & 2)
 | 
						|
            self._bus.clock.off()
 | 
						|
            self._bus.clock_phase = bool(value & 1)
 | 
						|
 | 
						|
    def _get_clock_polarity(self):
 | 
						|
        return not self._bus.clock.active_high
 | 
						|
 | 
						|
    def _set_clock_polarity(self, value):
 | 
						|
        with self._bus.lock:
 | 
						|
            self._bus.clock.active_high = not value
 | 
						|
 | 
						|
    def _get_clock_phase(self):
 | 
						|
        return self._bus.clock_phase
 | 
						|
 | 
						|
    def _set_clock_phase(self, value):
 | 
						|
        with self._bus.lock:
 | 
						|
            self._bus.clock_phase = bool(value)
 | 
						|
 | 
						|
    def _get_lsb_first(self):
 | 
						|
        return self._bus.lsb_first
 | 
						|
 | 
						|
    def _set_lsb_first(self, value):
 | 
						|
        with self._bus.lock:
 | 
						|
            self._bus.lsb_first = bool(value)
 | 
						|
 | 
						|
    def _get_bits_per_word(self):
 | 
						|
        return self._bus.bits_per_word
 | 
						|
 | 
						|
    def _set_bits_per_word(self, value):
 | 
						|
        if value < 1:
 | 
						|
            raise ValueError('bits_per_word must be positive')
 | 
						|
        with self._bus.lock:
 | 
						|
            self._bus.bits_per_word = int(value)
 | 
						|
 | 
						|
    def _get_select_high(self):
 | 
						|
        return self.active_high
 | 
						|
 | 
						|
    def _set_select_high(self, value):
 | 
						|
        with self._bus.lock:
 | 
						|
            self.active_high = value
 | 
						|
            self.off()
 | 
						|
 | 
						|
    clock_polarity = property(_get_clock_polarity, _set_clock_polarity)
 | 
						|
    clock_phase = property(_get_clock_phase, _set_clock_phase)
 | 
						|
    clock_mode = property(_get_clock_mode, _set_clock_mode)
 | 
						|
    lsb_first = property(_get_lsb_first, _set_lsb_first)
 | 
						|
    bits_per_word = property(_get_bits_per_word, _set_bits_per_word)
 | 
						|
    select_high = property(_get_select_high, _set_select_high)
 | 
						|
 | 
						|
 | 
						|
class SharedSPIHardwareInterface(SharedMixin, SPIHardwareInterface):
 | 
						|
    @classmethod
 | 
						|
    def _shared_key(cls, port, device):
 | 
						|
        return (port, device)
 | 
						|
 | 
						|
 | 
						|
class SharedSPISoftwareInterface(SharedMixin, SPISoftwareInterface):
 | 
						|
    @classmethod
 | 
						|
    def _shared_key(cls, clock_pin, mosi_pin, miso_pin, select_pin):
 | 
						|
        return (clock_pin, mosi_pin, miso_pin, select_pin)
 | 
						|
 | 
						|
 | 
						|
def extract_spi_args(**kwargs):
 | 
						|
    """
 | 
						|
    Given a set of keyword arguments, splits it into those relevant to SPI
 | 
						|
    implementations and all the rest. SPI arguments are augmented with defaults
 | 
						|
    and converted into the pin format (from the port/device format) if
 | 
						|
    necessary.
 | 
						|
 | 
						|
    Returns a tuple of ``(spi_args, other_args)``.
 | 
						|
    """
 | 
						|
    pin_defaults = {
 | 
						|
        'clock_pin': 11,
 | 
						|
        'mosi_pin': 10,
 | 
						|
        'miso_pin': 9,
 | 
						|
        'select_pin': 8,
 | 
						|
        }
 | 
						|
    dev_defaults = {
 | 
						|
        'port': 0,
 | 
						|
        'device': 0,
 | 
						|
        }
 | 
						|
    spi_args = {
 | 
						|
        key: value for (key, value) in kwargs.items()
 | 
						|
        if key in pin_defaults or key in dev_defaults
 | 
						|
        }
 | 
						|
    kwargs = {
 | 
						|
        key: value for (key, value) in kwargs.items()
 | 
						|
        if key not in spi_args
 | 
						|
        }
 | 
						|
    if not spi_args:
 | 
						|
        spi_args = pin_defaults
 | 
						|
    elif set(spi_args) <= set(pin_defaults):
 | 
						|
        spi_args = {
 | 
						|
            key: spi_args.get(key, default)
 | 
						|
            for key, default in pin_defaults.items()
 | 
						|
            }
 | 
						|
    elif set(spi_args) <= set(dev_defaults):
 | 
						|
        spi_args = {
 | 
						|
            key: spi_args.get(key, default)
 | 
						|
            for key, default in dev_defaults.items()
 | 
						|
            }
 | 
						|
        if spi_args['port'] != 0:
 | 
						|
            raise SPIBadArgs('port 0 is the only valid SPI port')
 | 
						|
        if spi_args['device'] not in (0, 1):
 | 
						|
            raise SPIBadArgs('device must be 0 or 1')
 | 
						|
        spi_args = {
 | 
						|
            key: value if key != 'select_pin' else (8, 7)[spi_args['device']]
 | 
						|
            for key, value in pin_defaults.items()
 | 
						|
            }
 | 
						|
    else:
 | 
						|
        raise SPIBadArgs(
 | 
						|
            'you must either specify port and device, or clock_pin, mosi_pin, '
 | 
						|
            'miso_pin, and select_pin; combinations of the two schemes (e.g. '
 | 
						|
            'port and clock_pin) are not permitted')
 | 
						|
    return spi_args, kwargs
 | 
						|
 | 
						|
 | 
						|
def SPI(**spi_args):
 | 
						|
    """
 | 
						|
    Returns an SPI interface, for the specified SPI *port* and *device*, or for
 | 
						|
    the specified pins (*clock_pin*, *mosi_pin*, *miso_pin*, and *select_pin*).
 | 
						|
    Only one of the schemes can be used; attempting to mix *port* and *device*
 | 
						|
    with pin numbers will raise :exc:`SPIBadArgs`.
 | 
						|
 | 
						|
    If the pins specified match the hardware SPI pins (clock on GPIO11, MOSI on
 | 
						|
    GPIO10, MISO on GPIO9, and chip select on GPIO8 or GPIO7), and the spidev
 | 
						|
    module can be imported, a :class:`SPIHardwareInterface` instance will be
 | 
						|
    returned. Otherwise, a :class:`SPISoftwareInterface` will be returned which
 | 
						|
    will use simple bit-banging to communicate.
 | 
						|
 | 
						|
    Both interfaces have the same API, support clock polarity and phase
 | 
						|
    attributes, and can handle half and full duplex communications, but the
 | 
						|
    hardware interface is significantly faster (though for many things this
 | 
						|
    doesn't matter).
 | 
						|
 | 
						|
    Finally, the *shared* keyword argument specifies whether the resulting
 | 
						|
    SPI interface can be repeatedly created and used by multiple devices
 | 
						|
    (useful with multi-channel devices like numerous ADCs).
 | 
						|
    """
 | 
						|
    spi_args, kwargs = extract_spi_args(**spi_args)
 | 
						|
    shared = kwargs.pop('shared', False)
 | 
						|
    if kwargs:
 | 
						|
        raise SPIBadArgs(
 | 
						|
            'unrecognized keyword argument %s' % kwargs.popitem()[0])
 | 
						|
    if all((
 | 
						|
            SpiDev is not None,
 | 
						|
            spi_args['clock_pin'] == 11,
 | 
						|
            spi_args['mosi_pin'] == 10,
 | 
						|
            spi_args['miso_pin'] == 9,
 | 
						|
            spi_args['select_pin'] in (7, 8),
 | 
						|
            )):
 | 
						|
        try:
 | 
						|
            if shared:
 | 
						|
                return SharedSPIHardwareInterface(
 | 
						|
                        port=0, device={8: 0, 7: 1}[spi_args['select_pin']])
 | 
						|
            else:
 | 
						|
                return SPIHardwareInterface(
 | 
						|
                        port=0, device={8: 0, 7: 1}[spi_args['select_pin']])
 | 
						|
        except Exception as e:
 | 
						|
            warnings.warn(
 | 
						|
                SPISoftwareFallback(
 | 
						|
                    'failed to initialize hardware SPI, falling back to '
 | 
						|
                    'software (error was: %s)' % str(e)))
 | 
						|
    if shared:
 | 
						|
        return SharedSPISoftwareInterface(**spi_args)
 | 
						|
    else:
 | 
						|
        return SPISoftwareInterface(**spi_args)
 | 
						|
 |