mirror of
https://github.com/KevinMidboe/python-gpiozero.git
synced 2025-10-29 09:40:36 +00:00
383 lines
12 KiB
Python
383 lines
12 KiB
Python
from __future__ import (
|
|
unicode_literals,
|
|
print_function,
|
|
absolute_import,
|
|
division,
|
|
)
|
|
nstr = str
|
|
str = type('')
|
|
|
|
import atexit
|
|
import weakref
|
|
from threading import Thread, Event, RLock
|
|
from collections import deque
|
|
from types import FunctionType
|
|
|
|
from .exc import (
|
|
GPIOPinMissing,
|
|
GPIOPinInUse,
|
|
GPIODeviceClosed,
|
|
GPIOBadQueueLen,
|
|
)
|
|
|
|
# Get a pin implementation to use as the default; we prefer RPi.GPIO's here
|
|
# as it supports PWM, and all Pi revisions. If no third-party libraries are
|
|
# available, however, we fall back to a pure Python implementation which
|
|
# supports platforms like PyPy
|
|
from .pins import PINS_CLEANUP
|
|
try:
|
|
from .pins.rpigpio import RPiGPIOPin
|
|
DefaultPin = RPiGPIOPin
|
|
except ImportError:
|
|
try:
|
|
from .pins.rpio import RPIOPin
|
|
DefaultPin = RPIOPin
|
|
except ImportError:
|
|
from .pins.native import NativePin
|
|
DefaultPin = NativePin
|
|
|
|
|
|
_THREADS = set()
|
|
_PINS = set()
|
|
# Due to interactions between RPi.GPIO cleanup and the GPIODevice.close()
|
|
# method the same thread may attempt to acquire this lock, leading to deadlock
|
|
# unless the lock is re-entrant
|
|
_PINS_LOCK = RLock()
|
|
|
|
def _shutdown():
|
|
while _THREADS:
|
|
for t in _THREADS.copy():
|
|
t.stop()
|
|
with _PINS_LOCK:
|
|
while _PINS:
|
|
_PINS.pop().close()
|
|
# Any cleanup routines registered by pins libraries must be called *after*
|
|
# cleanup of pin objects used by devices
|
|
for routine in PINS_CLEANUP:
|
|
routine()
|
|
|
|
atexit.register(_shutdown)
|
|
|
|
|
|
class GPIOMeta(type):
|
|
# NOTE Yes, this is a metaclass. Don't be scared - it's a simple one.
|
|
|
|
def __new__(mcls, name, bases, cls_dict):
|
|
# Construct the class as normal
|
|
cls = super(GPIOMeta, mcls).__new__(mcls, name, bases, cls_dict)
|
|
for attr_name, attr in cls_dict.items():
|
|
# If there's a method in the class which has no docstring, search
|
|
# the base classes recursively for a docstring to copy
|
|
if isinstance(attr, FunctionType) and not attr.__doc__:
|
|
for base_cls in cls.__mro__:
|
|
if hasattr(base_cls, attr_name):
|
|
base_fn = getattr(base_cls, attr_name)
|
|
if base_fn.__doc__:
|
|
attr.__doc__ = base_fn.__doc__
|
|
break
|
|
return cls
|
|
|
|
def __call__(mcls, *args, **kwargs):
|
|
# Construct the instance as normal and ensure it's an instance of
|
|
# GPIOBase (defined below with a custom __setattrs__)
|
|
result = super(GPIOMeta, mcls).__call__(*args, **kwargs)
|
|
assert isinstance(result, GPIOBase)
|
|
# At this point __new__ and __init__ have all been run. We now fix the
|
|
# set of attributes on the class by dir'ing the instance and creating a
|
|
# frozenset of the result called __attrs__ (which is queried by
|
|
# GPIOBase.__setattr__)
|
|
result.__attrs__ = frozenset(dir(result))
|
|
return result
|
|
|
|
|
|
# Cross-version compatible method of using a metaclass
|
|
class GPIOBase(GPIOMeta(nstr('GPIOBase'), (), {})):
|
|
def __setattr__(self, name, value):
|
|
# This overridden __setattr__ simply ensures that additional attributes
|
|
# cannot be set on the class after construction (it manages this in
|
|
# conjunction with the meta-class above). Traditionally, this is
|
|
# managed with __slots__; however, this doesn't work with Python's
|
|
# multiple inheritance system which we need to use in order to avoid
|
|
# repeating the "source" and "values" property code in myriad places
|
|
if hasattr(self, '__attrs__') and name not in self.__attrs__:
|
|
raise AttributeError(
|
|
"'%s' object has no attribute '%s'" % (
|
|
self.__class__.__name__, name))
|
|
return super(GPIOBase, self).__setattr__(name, value)
|
|
|
|
def __del__(self):
|
|
self.close()
|
|
|
|
def close(self):
|
|
# This is a placeholder which is simply here to ensure close() can be
|
|
# safely called from subclasses without worrying whether super-class'
|
|
# have it (which in turn is useful in conjunction with the SourceMixin
|
|
# class).
|
|
"""
|
|
Shut down the device and release all associated resources.
|
|
"""
|
|
pass
|
|
|
|
@property
|
|
def closed(self):
|
|
"""
|
|
Returns ``True`` if the device is closed (see the :meth:`close`
|
|
method). Once a device is closed you can no longer use any other
|
|
methods or properties to control or query the device.
|
|
"""
|
|
return False
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_value, exc_tb):
|
|
self.close()
|
|
|
|
|
|
class ValuesMixin(object):
|
|
# NOTE Use this mixin *first* in the parent list
|
|
|
|
@property
|
|
def values(self):
|
|
"""
|
|
An infinite iterator of values read from `value`.
|
|
"""
|
|
while True:
|
|
try:
|
|
yield self.value
|
|
except GPIODeviceClosed:
|
|
break
|
|
|
|
|
|
class SourceMixin(object):
|
|
# NOTE Use this mixin *first* in the parent list
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self._source = None
|
|
self._source_thread = None
|
|
super(SourceMixin, self).__init__(*args, **kwargs)
|
|
|
|
def close(self):
|
|
try:
|
|
super(SourceMixin, self).close()
|
|
except AttributeError:
|
|
pass
|
|
self.source = None
|
|
|
|
def _copy_values(self, source):
|
|
for v in source:
|
|
self.value = v
|
|
if self._source_thread.stopping.wait(0):
|
|
break
|
|
|
|
@property
|
|
def source(self):
|
|
"""
|
|
The iterable to use as a source of values for `value`.
|
|
"""
|
|
return self._source
|
|
|
|
@source.setter
|
|
def source(self, value):
|
|
if self._source_thread is not None:
|
|
self._source_thread.stop()
|
|
self._source_thread = None
|
|
self._source = value
|
|
if value is not None:
|
|
self._source_thread = GPIOThread(target=self._copy_values, args=(value,))
|
|
self._source_thread.start()
|
|
|
|
|
|
class CompositeDevice(ValuesMixin, GPIOBase):
|
|
"""
|
|
Represents a device composed of multiple GPIO devices like simple HATs,
|
|
H-bridge motor controllers, robots composed of multiple motors, etc.
|
|
"""
|
|
def __repr__(self):
|
|
return "<gpiozero.%s object>" % (self.__class__.__name__)
|
|
|
|
|
|
class GPIODevice(ValuesMixin, GPIOBase):
|
|
"""
|
|
Represents a generic GPIO device.
|
|
|
|
This is the class at the root of the gpiozero class hierarchy. It handles
|
|
ensuring that two GPIO devices do not share the same pin, and provides
|
|
basic services applicable to all devices (specifically the :attr:`pin`
|
|
property, :attr:`is_active` property, and the :attr:`close` method).
|
|
|
|
:param int pin:
|
|
The GPIO pin (in BCM numbering) that the device is connected to. If
|
|
this is ``None``, :exc:`GPIOPinMissing` will be raised. If the pin is
|
|
already in use by another device, :exc:`GPIOPinInUse` will be raised.
|
|
"""
|
|
def __init__(self, pin=None):
|
|
super(GPIODevice, self).__init__()
|
|
# self._pin must be set before any possible exceptions can be raised
|
|
# because it's accessed in __del__. However, it mustn't be given the
|
|
# value of pin until we've verified that it isn't already allocated
|
|
self._pin = None
|
|
if pin is None:
|
|
raise GPIOPinMissing('No pin given')
|
|
if isinstance(pin, int):
|
|
pin = DefaultPin(pin)
|
|
with _PINS_LOCK:
|
|
if pin in _PINS:
|
|
raise GPIOPinInUse(
|
|
'pin %r is already in use by another gpiozero object' % pin
|
|
)
|
|
_PINS.add(pin)
|
|
self._pin = pin
|
|
self._active_state = True
|
|
self._inactive_state = False
|
|
|
|
def _read(self):
|
|
try:
|
|
return self.pin.state == self._active_state
|
|
except TypeError:
|
|
self._check_open()
|
|
raise
|
|
|
|
def _fire_events(self):
|
|
pass
|
|
|
|
def _check_open(self):
|
|
if self.closed:
|
|
raise GPIODeviceClosed(
|
|
'%s is closed or uninitialized' % self.__class__.__name__)
|
|
|
|
def close(self):
|
|
"""
|
|
Shut down the device and release all associated resources.
|
|
|
|
This method is primarily intended for interactive use at the command
|
|
line. It disables the device and releases its pin for use by another
|
|
device.
|
|
|
|
You can attempt to do this simply by deleting an object, but unless
|
|
you've cleaned up all references to the object this may not work (even
|
|
if you've cleaned up all references, there's still no guarantee the
|
|
garbage collector will actually delete the object at that point). By
|
|
contrast, the close method provides a means of ensuring that the object
|
|
is shut down.
|
|
|
|
For example, if you have a breadboard with a buzzer connected to pin
|
|
16, but then wish to attach an LED instead:
|
|
|
|
>>> from gpiozero import *
|
|
>>> bz = Buzzer(16)
|
|
>>> bz.on()
|
|
>>> bz.off()
|
|
>>> bz.close()
|
|
>>> led = LED(16)
|
|
>>> led.blink()
|
|
|
|
:class:`GPIODevice` descendents can also be used as context managers
|
|
using the :keyword:`with` statement. For example:
|
|
|
|
>>> from gpiozero import *
|
|
>>> with Buzzer(16) as bz:
|
|
... bz.on()
|
|
...
|
|
>>> with LED(16) as led:
|
|
... led.on()
|
|
...
|
|
"""
|
|
super(GPIODevice, self).close()
|
|
with _PINS_LOCK:
|
|
pin = self._pin
|
|
self._pin = None
|
|
if pin in _PINS:
|
|
_PINS.remove(pin)
|
|
pin.close()
|
|
|
|
@property
|
|
def closed(self):
|
|
return self._pin is None
|
|
|
|
@property
|
|
def pin(self):
|
|
"""
|
|
The :class:`Pin` that the device is connected to. This will be ``None``
|
|
if the device has been closed (see the :meth:`close` method). When
|
|
dealing with GPIO pins, query ``pin.number`` to discover the GPIO
|
|
pin (in BCM numbering) that the device is connected to.
|
|
"""
|
|
return self._pin
|
|
|
|
@property
|
|
def value(self):
|
|
"""
|
|
Returns ``True`` if the device is currently active and ``False``
|
|
otherwise.
|
|
"""
|
|
return self._read()
|
|
|
|
is_active = value
|
|
|
|
def __repr__(self):
|
|
try:
|
|
return "<gpiozero.%s object on pin %r, is_active=%s>" % (
|
|
self.__class__.__name__, self.pin, self.is_active)
|
|
except GPIODeviceClosed:
|
|
return "<gpiozero.%s object closed>" % self.__class__.__name__
|
|
|
|
|
|
class GPIOThread(Thread):
|
|
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
|
|
super(GPIOThread, self).__init__(group, target, name, args, kwargs)
|
|
self.stopping = Event()
|
|
self.daemon = True
|
|
|
|
def start(self):
|
|
self.stopping.clear()
|
|
_THREADS.add(self)
|
|
super(GPIOThread, self).start()
|
|
|
|
def stop(self):
|
|
self.stopping.set()
|
|
self.join()
|
|
|
|
def join(self):
|
|
super(GPIOThread, self).join()
|
|
_THREADS.discard(self)
|
|
|
|
|
|
class GPIOQueue(GPIOThread):
|
|
def __init__(self, parent, queue_len=5, sample_wait=0.0, partial=False):
|
|
assert isinstance(parent, GPIODevice)
|
|
super(GPIOQueue, self).__init__(target=self.fill)
|
|
if queue_len < 1:
|
|
raise GPIOBadQueueLen('queue_len must be at least one')
|
|
self.queue = deque(maxlen=queue_len)
|
|
self.partial = partial
|
|
self.sample_wait = sample_wait
|
|
self.full = Event()
|
|
self.parent = weakref.proxy(parent)
|
|
|
|
@property
|
|
def value(self):
|
|
if not self.partial:
|
|
self.full.wait()
|
|
try:
|
|
return sum(self.queue) / len(self.queue)
|
|
except ZeroDivisionError:
|
|
# No data == inactive value
|
|
return 0.0
|
|
|
|
def fill(self):
|
|
try:
|
|
while (not self.stopping.wait(self.sample_wait) and
|
|
len(self.queue) < self.queue.maxlen):
|
|
self.queue.append(self.parent._read())
|
|
if self.partial:
|
|
self.parent._fire_events()
|
|
self.full.set()
|
|
while not self.stopping.wait(self.sample_wait):
|
|
self.queue.append(self.parent._read())
|
|
self.parent._fire_events()
|
|
except ReferenceError:
|
|
# Parent is dead; time to die!
|
|
pass
|
|
|