mirror of
				https://github.com/KevinMidboe/python-gpiozero.git
				synced 2025-10-29 17:50:37 +00:00 
			
		
		
		
	This prevents it being modified post-construction (just like the way CompositeDevice._all and CompositeDevice._order are already 'frozen' by being tuples)
		
			
				
	
	
		
			431 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			431 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from __future__ import (
 | |
|     unicode_literals,
 | |
|     print_function,
 | |
|     absolute_import,
 | |
|     division,
 | |
|     )
 | |
| nstr = str
 | |
| str = type('')
 | |
| 
 | |
| import atexit
 | |
| import weakref
 | |
| from collections import namedtuple
 | |
| from itertools import chain
 | |
| from types import FunctionType
 | |
| from threading import RLock
 | |
| 
 | |
| from .threads import _threads_shutdown
 | |
| from .mixins import (
 | |
|     ValuesMixin,
 | |
|     SharedMixin,
 | |
|     )
 | |
| from .exc import (
 | |
|     DeviceClosed,
 | |
|     CompositeDeviceBadName,
 | |
|     CompositeDeviceBadOrder,
 | |
|     CompositeDeviceBadDevice,
 | |
|     GPIOPinMissing,
 | |
|     GPIOPinInUse,
 | |
|     GPIODeviceClosed,
 | |
|     )
 | |
| from .compat import frozendict
 | |
| 
 | |
| # Get a pin implementation to use as the default; we prefer RPi.GPIO's here
 | |
| # as it supports PWM, and all Pi revisions. If no third-party libraries are
 | |
| # available, however, we fall back to a pure Python implementation which
 | |
| # supports platforms like PyPy
 | |
| from .pins import _pins_shutdown
 | |
# Pick the default pin implementation by trying each backend in order of
# preference. Each failed import falls through to the next candidate; the
# final (native) import is deliberately left unguarded so that a complete
# failure surfaces as an ImportError.
try:
    from .pins.rpigpio import RPiGPIOPin
    pin_factory = RPiGPIOPin
except ImportError:
    try:
        from .pins.rpio import RPIOPin
        pin_factory = RPIOPin
    except ImportError:
        try:
            # NOTE(review): module name corrected from "pigipod" to "pigpiod"
            # (the pigpio daemon's name). With the misspelling the import
            # always raised ImportError, which the handler below swallowed,
            # so this backend could never be selected. Confirm against the
            # contents of the pins package.
            from .pins.pigpiod import PiGPIOPin
            pin_factory = PiGPIOPin
        except ImportError:
            from .pins.native import NativePin
            pin_factory = NativePin
 | |
| 
 | |
| 
 | |
# Module-wide registry of the pin objects currently claimed by devices
_PINS = set()
# Guards every access to _PINS. Yes, this needs to be re-entrant
_PINS_LOCK = RLock()


def _shutdown():
    """
    Exit hook: calls ``_threads_shutdown()``, then closes and discards every
    pin still held in ``_PINS``, and finally calls ``_pins_shutdown()``.
    """
    _threads_shutdown()
    with _PINS_LOCK:
        while True:
            try:
                leftover = _PINS.pop()
            except KeyError:
                # The registry is empty; nothing left to close
                break
            leftover.close()
    # Any cleanup routines registered by pins libraries must be called *after*
    # cleanup of pin objects used by devices
    _pins_shutdown()


atexit.register(_shutdown)
 | |
| 
 | |
| 
 | |
class GPIOMeta(type):
    """
    Metaclass of the device hierarchy (a simple one). It does three things:
    copies missing method docstrings from ancestor classes when a class is
    created, hands back the existing instance when a ``SharedMixin`` class is
    constructed twice with the same key, and records the attribute names
    present after construction in ``__attrs__``.
    """

    def __new__(mcls, name, bases, cls_dict):
        # Build the class in the usual way first
        cls = super(GPIOMeta, mcls).__new__(mcls, name, bases, cls_dict)
        for attr_name, attr in cls_dict.items():
            # Only plain functions lacking a docstring are of interest
            if not isinstance(attr, FunctionType) or attr.__doc__:
                continue
            # Walk the MRO and borrow the first non-empty docstring found on
            # an attribute of the same name
            for ancestor in cls.__mro__:
                if not hasattr(ancestor, attr_name):
                    continue
                inherited_doc = getattr(ancestor, attr_name).__doc__
                if inherited_doc:
                    attr.__doc__ = inherited_doc
                    break
        return cls

    def __call__(cls, *args, **kwargs):
        # GPIOBase must be somewhere in the ancestry of cls, otherwise the
        # __attrs__ assignment at the bottom would be rather pointless
        assert issubclass(cls, GPIOBase)
        is_shared = issubclass(cls, SharedMixin)
        if not is_shared:
            # Ordinary class: construct the instance as normal
            self = super(GPIOMeta, cls).__call__(*args, **kwargs)
        else:
            # Shared class: turn the constructor arguments into a key and
            # re-use the instance registered under it, if there is one
            key = cls._shared_key(*args, **kwargs)
            try:
                self = cls._INSTANCES[key]
                self._refs += 1
            except (KeyError, ReferenceError):
                # Unknown key (or the registered proxy is dead): build a new
                # instance and start counting references to it
                self = super(GPIOMeta, cls).__call__(*args, **kwargs)
                self._refs = 1
                original_close = self.close

                def close():
                    # Stand-in for the instance's close method: merely drops
                    # one reference, and only runs the real close (and
                    # unregisters the instance) once none remain
                    self._refs = max(0, self._refs - 1)
                    if self._refs:
                        return
                    try:
                        original_close()
                    finally:
                        try:
                            del cls._INSTANCES[key]
                        except KeyError:
                            # Closed more often than it was acquired; the
                            # entry is already gone so there's nothing to do
                            pass

                self.close = close
                cls._INSTANCES[key] = weakref.proxy(self)
        # __new__ and __init__ have both run by now, so freeze the set of
        # attribute names (queried by GPIOBase.__setattr__). Shared instances
        # are only frozen the first time they are handed out.
        if not is_shared or self._refs == 1:
            self.__attrs__ = frozenset(dir(self))
        return self
 | |
| 
 | |
| 
 | |
| # Cross-version compatible method of using a metaclass
 | |
class GPIOBase(GPIOMeta(nstr('GPIOBase'), (), {})):
    """
    Root of the device hierarchy. Provides attribute freezing (together with
    :class:`GPIOMeta`), a no-op :meth:`close`, and context manager support.
    """

    def __setattr__(self, name, value):
        # Once the metaclass has stored __attrs__ on the instance, refuse to
        # create attributes that weren't present at the end of construction.
        # __slots__ would be the traditional tool, but it doesn't work with
        # the multiple inheritance this hierarchy relies on to share the
        # "source" and "values" property code.
        frozen = hasattr(self, '__attrs__')
        if frozen and name not in self.__attrs__:
            owner = self.__class__.__name__
            raise AttributeError(
                "'%s' object has no attribute '%s'" % (owner, name))
        return super(GPIOBase, self).__setattr__(name, value)

    def __del__(self):
        self.close()

    def close(self):
        """
        Shut down the device and release all associated resources. Calling
        this on a device that is already closed does not raise an exception.

        The method is mainly meant for interactive use at the command line:
        it disables the device and frees its pin(s) so another device can
        claim them.

        Deleting an object is another way to attempt this, but it only works
        once every reference to the object is gone (and even then there is no
        guarantee that the garbage collector deletes the object at that
        point). Calling close, by contrast, ensures the object is shut down.

        For example, if you have a breadboard with a buzzer connected to pin
        16, but then wish to attach an LED instead:

            >>> from gpiozero import *
            >>> bz = Buzzer(16)
            >>> bz.on()
            >>> bz.off()
            >>> bz.close()
            >>> led = LED(16)
            >>> led.blink()

        :class:`Device` descendants can also be used as context managers using
        the :keyword:`with` statement. For example:

            >>> from gpiozero import *
            >>> with Buzzer(16) as bz:
            ...     bz.on()
            ...
            >>> with LED(16) as led:
            ...     led.on()
            ...
        """
        # Deliberately empty: having close() here lets subclasses (and mixins
        # such as SourceMixin) call super().close() without checking whether
        # the super-class defines it.
        pass

    @property
    def closed(self):
        """
        Returns ``True`` if the device is closed (see the :meth:`close`
        method). A closed device can no longer be controlled or queried
        through any of its other methods or properties.
        """
        raise NotImplementedError

    def _check_open(self):
        # Raise DeviceClosed unless the device is still open
        if not self.closed:
            return
        raise DeviceClosed(
            '%s is closed or uninitialized' % self.__class__.__name__)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # Leaving the "with" block always closes the device
        self.close()
 | |
| 
 | |
| 
 | |
class Device(ValuesMixin, GPIOBase):
    """
    Represents a single device of any type; GPIO-based, SPI-based, I2C-based,
    etc. This is the base class of the device hierarchy. It defines the
    basic services applicable to all devices (specifically the
    :attr:`is_active` property, the :attr:`value` property, and the
    :meth:`close` method).
    """
    def __repr__(self):
        class_name = self.__class__.__name__
        return "<gpiozero.%s object>" % class_name

    @property
    def value(self):
        """
        Returns a value representing the device's state. This is frequently
        a boolean, or a number between 0 and 1, although some devices use
        larger ranges (e.g. -1 to +1), and composite devices usually return a
        tuple holding the states of all their subordinate components.
        """
        raise NotImplementedError

    @property
    def is_active(self):
        """
        Returns ``True`` if the device is currently active and ``False``
        otherwise. Usually derived from :attr:`value` but, unlike
        :attr:`value`, this is *always* a boolean.
        """
        if self.value:
            return True
        return False
 | |
| 
 | |
| 
 | |
class CompositeDevice(Device):
    """
    Extends :class:`Device`. Represents a device composed of multiple devices
    like simple HATs, H-bridge motor controllers, robots composed of multiple
    motors, etc.

    The constructor accepts subordinate devices as positional or keyword
    arguments.  Positional arguments form unnamed devices accessed via the
    :attr:`all` attribute, while keyword arguments are added to the device
    as named (read-only) attributes.

    The instance also behaves as a read-only container of its subordinate
    devices: it supports ``len()``, indexing and iteration, with positional
    devices first followed by the named devices in ``_order``.

    :param list _order:
        If specified, this is the order of named items specified by keyword
        arguments (to ensure that the :attr:`value` tuple is constructed with a
        specific order). All keyword arguments *must* be included in the
        collection. If omitted, an alphabetically sorted order will be selected
        for keyword arguments.
    """
    def __init__(self, *args, **kwargs):
        # Give every attribute a safe placeholder before anything can raise,
        # so that close() and friends work on a half-constructed instance
        self._all = ()
        self._named = frozendict({})
        self._namedtuple = None
        self._order = kwargs.pop('_order', None)
        if self._order is None:
            self._order = sorted(kwargs.keys())
        # Freeze the order as a tuple so it can't be modified afterwards
        self._order = tuple(self._order)
        # Every keyword argument must appear in _order; complain about the
        # first one found missing
        for missing_name in set(kwargs.keys()) - set(self._order):
            raise CompositeDeviceBadOrder('%s missing from _order' % missing_name)
        super(CompositeDevice, self).__init__()
        # A named device must not clash with an attribute the instance
        # already has
        for name in set(self._order) & set(dir(self)):
            raise CompositeDeviceBadName('%s is a reserved name' % name)
        self._all = args + tuple(kwargs[v] for v in self._order)
        for dev in self._all:
            if not isinstance(dev, Device):
                raise CompositeDeviceBadDevice("%s doesn't inherit from Device" % dev)
        self._named = frozendict(kwargs)
        # Positional devices get numeric field names, which are not valid
        # identifiers; rename=True lets namedtuple replace them with
        # positional names instead of raising
        self._namedtuple = namedtuple('%sValue' % self.__class__.__name__, chain(
            (str(i) for i in range(len(args))), self._order),
            rename=True)

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails.
        # if _named doesn't exist yet, pretend it's an empty dict
        if name == '_named':
            return frozendict({})
        try:
            return self._named[name]
        except KeyError:
            raise AttributeError("no such attribute %s" % name)

    def __setattr__(self, name, value):
        # make named components read-only properties
        if name in self._named:
            raise AttributeError("can't set attribute %s" % name)
        return super(CompositeDevice, self).__setattr__(name, value)

    def __repr__(self):
        try:
            self._check_open()
            return "<gpiozero.%s object containing %d devices: %s and %d unnamed>" % (
                    self.__class__.__name__,
                    len(self), ','.join(self._named),
                    len(self) - len(self._named)
                    )
        except DeviceClosed:
            # Interpolate the class name here too; returning the bare format
            # string would leave a literal "%s" in the representation
            return "<gpiozero.%s object closed>" % (self.__class__.__name__)

    def __len__(self):
        return len(self._all)

    def __getitem__(self, index):
        return self._all[index]

    def __iter__(self):
        return iter(self._all)

    @property
    def all(self):
        """
        A tuple of all the subordinate devices: positional ones first, then
        the named ones in ``_order``.
        """
        # XXX Deprecate this in favour of using the instance as a container
        return self._all

    def close(self):
        # Closing the composite closes each subordinate device in turn
        if self._all:
            for device in self._all:
                device.close()

    @property
    def closed(self):
        # Closed only when every subordinate device reports closed (which is
        # trivially true when there are no subordinate devices)
        return all(device.closed for device in self)

    @property
    def namedtuple(self):
        """
        The namedtuple class used to build :attr:`value` (``None`` until
        construction has completed).
        """
        return self._namedtuple

    @property
    def value(self):
        # One entry per subordinate device, in container order
        return self.namedtuple(*(device.value for device in self))

    @property
    def is_active(self):
        # Active when at least one subordinate value is truthy
        return any(self.value)
 | |
| 
 | |
| 
 | |
class GPIODevice(Device):
    """
    Extends :class:`Device`. Represents a generic GPIO device and provides
    the services common to all single-pin GPIO devices (like ensuring two
    GPIO devices do not share a :attr:`pin`).

    :param int pin:
        The GPIO pin (in BCM numbering) that the device is connected to. If
        this is ``None``, :exc:`GPIOPinMissing` will be raised. If the pin is
        already in use by another device, :exc:`GPIOPinInUse` will be raised.
    """
    def __init__(self, pin=None):
        super(GPIODevice, self).__init__()
        # _pin has to exist before anything below can raise, because __del__
        # (via close) reads it. It must stay None, though, until the pin has
        # been confirmed as unallocated.
        self._pin = None
        if pin is None:
            raise GPIOPinMissing('No pin given')
        # Plain integers are converted into pin objects by the default factory
        pin = pin_factory(pin) if isinstance(pin, int) else pin
        with _PINS_LOCK:
            if pin in _PINS:
                raise GPIOPinInUse(
                    'pin %r is already in use by another gpiozero object' % pin
                )
            _PINS.add(pin)
        self._pin = pin
        self._active_state = True
        self._inactive_state = False

    def _state_to_value(self, state):
        # Map a raw pin state onto True (active) or False (inactive)
        is_active_state = state == self._active_state
        return bool(is_active_state)

    def _read(self):
        try:
            raw_state = self.pin.state
            return self._state_to_value(raw_state)
        except (AttributeError, TypeError):
            # Most likely the device is closed (pin is None); give
            # _check_open the chance to raise its more specific error, and
            # otherwise re-raise the original exception
            self._check_open()
            raise

    def close(self):
        super(GPIODevice, self).close()
        with _PINS_LOCK:
            # Detach the pin from the device first, then release it from the
            # registry (if it was registered) and close it
            released, self._pin = self._pin, None
            if released in _PINS:
                _PINS.remove(released)
                released.close()

    @property
    def closed(self):
        # The device counts as closed once it no longer holds a pin
        return self._pin is None

    def _check_open(self):
        try:
            super(GPIODevice, self)._check_open()
        except DeviceClosed as exc:
            # For backwards compatibility; GPIODeviceClosed is deprecated
            raise GPIODeviceClosed(str(exc))

    @property
    def pin(self):
        """
        The :class:`Pin` that the device is connected to. This is ``None``
        once the device has been closed (see the :meth:`close` method). When
        dealing with GPIO pins, query ``pin.number`` to discover the GPIO
        pin (in BCM numbering) that the device is connected to.
        """
        return self._pin

    @property
    def value(self):
        return self._read()

    def __repr__(self):
        class_name = self.__class__.__name__
        try:
            return "<gpiozero.%s object on pin %r, is_active=%s>" % (
                class_name, self.pin, self.is_active)
        except DeviceClosed:
            return "<gpiozero.%s object closed>" % class_name
 | |
| 
 |