mirror of
				https://github.com/KevinMidboe/python-gpiozero.git
				synced 2025-10-29 17:50:37 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			306 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			306 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from __future__ import (
 | |
|     unicode_literals,
 | |
|     absolute_import,
 | |
|     print_function,
 | |
|     division,
 | |
|     )
 | |
| str = type('')
 | |
| 
 | |
| import io
 | |
| from threading import RLock, Lock
 | |
| from types import MethodType
 | |
| from collections import defaultdict
 | |
| try:
 | |
|     from weakref import ref, WeakMethod
 | |
| except ImportError:
 | |
| 
 | |
|     from ..compat import WeakMethod
 | |
| import warnings
 | |
| 
 | |
| try:
 | |
|     from spidev import SpiDev
 | |
| except ImportError:
 | |
|     SpiDev = None
 | |
| 
 | |
| from . import Factory, Pin
 | |
| from .data import pi_info
 | |
| from ..exc import (
 | |
|     PinNoPins,
 | |
|     PinNonPhysical,
 | |
|     PinInvalidPin,
 | |
|     SPIBadArgs,
 | |
|     SPISoftwareFallback,
 | |
|     )
 | |
| 
 | |
| 
 | |
| SPI_HARDWARE_PINS = {
 | |
|     0: {
 | |
|         'clock':  11,
 | |
|         'mosi':   10,
 | |
|         'miso':   9,
 | |
|         'select': (8, 7),
 | |
|     },
 | |
| }
 | |
| 
 | |
| 
 | |
| class PiFactory(Factory):
 | |
|     """
 | |
|     Abstract base class representing hardware attached to a Raspberry Pi. This
 | |
|     forms the base of :class:`~gpiozero.pins.local.LocalPiFactory`.
 | |
|     """
 | |
|     def __init__(self):
 | |
|         super(PiFactory, self).__init__()
 | |
|         self._info = None
 | |
|         self.pins = {}
 | |
|         self.pin_class = None
 | |
|         self.spi_classes = {
 | |
|             ('hardware', 'exclusive'): None,
 | |
|             ('hardware', 'shared'):    None,
 | |
|             ('software', 'exclusive'): None,
 | |
|             ('software', 'shared'):    None,
 | |
|             }
 | |
| 
 | |
|     def close(self):
 | |
|         for pin in self.pins.values():
 | |
|             pin.close()
 | |
|         self.pins.clear()
 | |
| 
 | |
|     def pin(self, spec):
 | |
|         n = self._to_gpio(spec)
 | |
|         try:
 | |
|             pin = self.pins[n]
 | |
|         except KeyError:
 | |
|             pin = self.pin_class(self, n)
 | |
|             self.pins[n] = pin
 | |
|         return pin
 | |
| 
 | |
|     def _to_gpio(self, spec):
 | |
|         """
 | |
|         Converts the pin *spec* to a GPIO port number.
 | |
|         """
 | |
|         if not 0 <= spec < 54:
 | |
|             raise PinInvalidPin('invalid GPIO port %d specified (range 0..53) ' % spec)
 | |
|         return spec
 | |
| 
 | |
|     def _get_revision(self):
 | |
|         raise NotImplementedError
 | |
| 
 | |
|     def _get_pi_info(self):
 | |
|         if self._info is None:
 | |
|             self._info = pi_info(self._get_revision())
 | |
|         return self._info
 | |
| 
 | |
|     def spi(self, **spi_args):
 | |
|         """
 | |
|         Returns an SPI interface, for the specified SPI *port* and *device*, or
 | |
|         for the specified pins (*clock_pin*, *mosi_pin*, *miso_pin*, and
 | |
|         *select_pin*).  Only one of the schemes can be used; attempting to mix
 | |
|         *port* and *device* with pin numbers will raise :exc:`SPIBadArgs`.
 | |
| 
 | |
|         If the pins specified match the hardware SPI pins (clock on GPIO11,
 | |
|         MOSI on GPIO10, MISO on GPIO9, and chip select on GPIO8 or GPIO7), and
 | |
|         the spidev module can be imported, a :class:`SPIHardwareInterface`
 | |
|         instance will be returned. Otherwise, a :class:`SPISoftwareInterface`
 | |
|         will be returned which will use simple bit-banging to communicate.
 | |
| 
 | |
|         Both interfaces have the same API, support clock polarity and phase
 | |
|         attributes, and can handle half and full duplex communications, but the
 | |
|         hardware interface is significantly faster (though for many things this
 | |
|         doesn't matter).
 | |
|         """
 | |
|         spi_args, kwargs = self._extract_spi_args(**spi_args)
 | |
|         shared = 'shared' if kwargs.pop('shared', False) else 'exclusive'
 | |
|         if kwargs:
 | |
|             raise SPIBadArgs(
 | |
|                 'unrecognized keyword argument %s' % kwargs.popitem()[0])
 | |
|         for port, pins in SPI_HARDWARE_PINS.items():
 | |
|             if all((
 | |
|                     spi_args['clock_pin']  == pins['clock'],
 | |
|                     spi_args['mosi_pin']   == pins['mosi'],
 | |
|                     spi_args['miso_pin']   == pins['miso'],
 | |
|                     spi_args['select_pin'] in pins['select'],
 | |
|                     )):
 | |
|                 try:
 | |
|                     return self.spi_classes[('hardware', shared)](
 | |
|                         self, port=port,
 | |
|                         device=pins['select'].index(spi_args['select_pin'])
 | |
|                         )
 | |
|                 except Exception as e:
 | |
|                     warnings.warn(
 | |
|                         SPISoftwareFallback(
 | |
|                             'failed to initialize hardware SPI, falling back to '
 | |
|                             'software (error was: %s)' % str(e)))
 | |
|                     break
 | |
|         # Convert all pin arguments to integer GPIO numbers. This is necessary
 | |
|         # to ensure the shared-key for shared implementations get matched
 | |
|         # correctly, and is a bit of a hack for the pigpio bit-bang
 | |
|         # implementation which just wants the pin numbers too.
 | |
|         spi_args = {
 | |
|             key: pin.number if isinstance(pin, Pin) else pin
 | |
|             for key, pin in spi_args.items()
 | |
|             }
 | |
|         return self.spi_classes[('software', shared)](self, **spi_args)
 | |
| 
 | |
|     def _extract_spi_args(self, **kwargs):
 | |
|         """
 | |
|         Given a set of keyword arguments, splits it into those relevant to SPI
 | |
|         implementations and all the rest. SPI arguments are augmented with
 | |
|         defaults and converted into the pin format (from the port/device
 | |
|         format) if necessary.
 | |
| 
 | |
|         Returns a tuple of ``(spi_args, other_args)``.
 | |
|         """
 | |
|         dev_defaults = {
 | |
|             'port': 0,
 | |
|             'device': 0,
 | |
|             }
 | |
|         default_hw = SPI_HARDWARE_PINS[dev_defaults['port']]
 | |
|         pin_defaults = {
 | |
|             'clock_pin':  default_hw['clock'],
 | |
|             'mosi_pin':   default_hw['mosi'],
 | |
|             'miso_pin':   default_hw['miso'],
 | |
|             'select_pin': default_hw['select'][dev_defaults['device']],
 | |
|             }
 | |
|         spi_args = {
 | |
|             key: value for (key, value) in kwargs.items()
 | |
|             if key in pin_defaults or key in dev_defaults
 | |
|             }
 | |
|         kwargs = {
 | |
|             key: value for (key, value) in kwargs.items()
 | |
|             if key not in spi_args
 | |
|             }
 | |
|         if not spi_args:
 | |
|             spi_args = pin_defaults
 | |
|         elif set(spi_args) <= set(pin_defaults):
 | |
|             spi_args = {
 | |
|                 key: self._to_gpio(spi_args.get(key, default))
 | |
|                 for key, default in pin_defaults.items()
 | |
|                 }
 | |
|         elif set(spi_args) <= set(dev_defaults):
 | |
|             spi_args = {
 | |
|                 key: spi_args.get(key, default)
 | |
|                 for key, default in dev_defaults.items()
 | |
|                 }
 | |
|             if spi_args['port'] != 0:
 | |
|                 raise SPIBadArgs('port 0 is the only valid SPI port')
 | |
|             selected_hw = SPI_HARDWARE_PINS[spi_args['port']]
 | |
|             try:
 | |
|                 selected_hw['select'][spi_args['device']]
 | |
|             except IndexError:
 | |
|                 raise SPIBadArgs(
 | |
|                     'device must be in the range 0..%d' %
 | |
|                     len(selected_hw['select']))
 | |
|             spi_args = {
 | |
|                 key: value if key != 'select_pin' else selected_hw['select'][spi_args['device']]
 | |
|                 for key, value in pin_defaults.items()
 | |
|                 }
 | |
|         else:
 | |
|             raise SPIBadArgs(
 | |
|                 'you must either specify port and device, or clock_pin, '
 | |
|                 'mosi_pin, miso_pin, and select_pin; combinations of the two '
 | |
|                 'schemes (e.g. port and clock_pin) are not permitted')
 | |
|         return spi_args, kwargs
 | |
| 
 | |
| 
 | |
| class PiPin(Pin):
 | |
|     """
 | |
|     Abstract base class representing a multi-function GPIO pin attached to a
 | |
|     Raspberry Pi. This overrides several methods in the abstract base
 | |
|     :class:`~gpiozero.Pin`. Descendents must override the following methods:
 | |
| 
 | |
|     * :meth:`_get_function`
 | |
|     * :meth:`_set_function`
 | |
|     * :meth:`_get_state`
 | |
|     * :meth:`_call_when_changed`
 | |
|     * :meth:`_enable_event_detect`
 | |
|     * :meth:`_disable_event_detect`
 | |
| 
 | |
|     Descendents *may* additionally override the following methods, if
 | |
|     applicable:
 | |
| 
 | |
|     * :meth:`close`
 | |
|     * :meth:`output_with_state`
 | |
|     * :meth:`input_with_pull`
 | |
|     * :meth:`_set_state`
 | |
|     * :meth:`_get_frequency`
 | |
|     * :meth:`_set_frequency`
 | |
|     * :meth:`_get_pull`
 | |
|     * :meth:`_set_pull`
 | |
|     * :meth:`_get_bounce`
 | |
|     * :meth:`_set_bounce`
 | |
|     * :meth:`_get_edges`
 | |
|     * :meth:`_set_edges`
 | |
|     """
 | |
|     def __init__(self, factory, number):
 | |
|         super(PiPin, self).__init__()
 | |
|         self._factory = factory
 | |
|         self._when_changed_lock = RLock()
 | |
|         self._when_changed = None
 | |
|         self._number = number
 | |
|         try:
 | |
|             factory.pi_info.physical_pin(repr(self))
 | |
|         except PinNoPins:
 | |
|             warnings.warn(
 | |
|                 PinNonPhysical(
 | |
|                     'no physical pins exist for %s' % repr(self)))
 | |
| 
 | |
|     @property
 | |
|     def number(self):
 | |
|         return self._number
 | |
| 
 | |
|     def __repr__(self):
 | |
|         return 'GPIO%d' % self._number
 | |
| 
 | |
|     @property
 | |
|     def factory(self):
 | |
|         return self._factory
 | |
| 
 | |
|     def _call_when_changed(self):
 | |
|         """
 | |
|         Called to fire the :attr:`when_changed` event handler; override this
 | |
|         in descendents if additional (currently redundant) parameters need
 | |
|         to be passed.
 | |
|         """
 | |
|         method = self.when_changed()
 | |
|         if method is None:
 | |
|             self.when_changed = None
 | |
|         else:
 | |
|             method()
 | |
| 
 | |
|     def _get_when_changed(self):
 | |
|         return self._when_changed
 | |
| 
 | |
|     def _set_when_changed(self, value):
 | |
|         with self._when_changed_lock:
 | |
|             if value is None:
 | |
|                 if self._when_changed is not None:
 | |
|                     self._disable_event_detect()
 | |
|                 self._when_changed = None
 | |
|             else:
 | |
|                 enabled = self._when_changed is not None
 | |
|                 # Have to take care, if value is either a closure or a bound
 | |
|                 # method, not to keep a strong reference to the containing
 | |
|                 # object
 | |
|                 if isinstance(value, MethodType):
 | |
|                     self._when_changed = WeakMethod(value)
 | |
|                 else:
 | |
|                     self._when_changed = ref(value)
 | |
|                 if not enabled:
 | |
|                     self._enable_event_detect()
 | |
| 
 | |
|     def _enable_event_detect(self):
 | |
|         """
 | |
|         Enables event detection. This is called to activate event detection on
 | |
|         pin :attr:`number`, watching for the specified :attr:`edges`. In
 | |
|         response, :meth:`_call_when_changed` should be executed.
 | |
|         """
 | |
|         raise NotImplementedError
 | |
| 
 | |
|     def _disable_event_detect(self):
 | |
|         """
 | |
|         Disables event detection. This is called to deactivate event detection
 | |
|         on pin :attr:`number`.
 | |
|         """
 | |
|         raise NotImplementedError
 | |
| 
 |