mirror of
https://github.com/KevinMidboe/python-gpiozero.git
synced 2025-12-08 20:39:01 +00:00
The source/values toolkit
Me and my big mouth. No sooner do I declare the base classes "relatively stable" than I go and mess around with it all again. Anyway, this is the long promised set of utilities to make source/values more interesting. It includes a few interesting little utility functions, a whole bunch of examples and introduces the notion of "pseudo" devices with no (obvious) hardware representation like a time-of-day device. This necessitated making the event system a little more generic (it's not exclusive the GPIO devices after all; no reason we can't use it on composite devices in future) and by this point the mixins have gotten large enough to justify their own module. The pseudo-devices are a bit spartan and basic at the moment but I'm sure there'll be plenty of future ideas...
This commit is contained in:
@@ -11,6 +11,7 @@ from .pins import (
|
||||
from .exc import (
|
||||
GPIOZeroError,
|
||||
DeviceClosed,
|
||||
BadEventHandler,
|
||||
CompositeDeviceError,
|
||||
CompositeDeviceBadName,
|
||||
SPIError,
|
||||
@@ -45,13 +46,15 @@ from .devices import (
|
||||
Device,
|
||||
GPIODevice,
|
||||
CompositeDevice,
|
||||
)
|
||||
from .mixins import (
|
||||
SharedMixin,
|
||||
SourceMixin,
|
||||
ValuesMixin,
|
||||
EventsMixin,
|
||||
)
|
||||
from .input_devices import (
|
||||
InputDevice,
|
||||
WaitableInputDevice,
|
||||
DigitalInputDevice,
|
||||
SmoothedInputDevice,
|
||||
Button,
|
||||
@@ -103,3 +106,24 @@ from .boards import (
|
||||
CamJamKitRobot,
|
||||
Energenie,
|
||||
)
|
||||
from .other_devices import (
|
||||
InternalDevice,
|
||||
PingServer,
|
||||
TimeOfDay,
|
||||
)
|
||||
from .source_tools import (
|
||||
averaged,
|
||||
clamped,
|
||||
conjunction,
|
||||
cos_values,
|
||||
disjunction,
|
||||
inverted,
|
||||
negated,
|
||||
post_delayed,
|
||||
pre_delayed,
|
||||
quantized,
|
||||
queued,
|
||||
random_values,
|
||||
scaled,
|
||||
sin_values,
|
||||
)
|
||||
|
||||
@@ -22,7 +22,8 @@ from .exc import (
|
||||
from .input_devices import Button
|
||||
from .output_devices import OutputDevice, LED, PWMLED, Buzzer, Motor
|
||||
from .threads import GPIOThread
|
||||
from .devices import Device, CompositeDevice, SharedMixin, SourceMixin
|
||||
from .devices import Device, CompositeDevice
|
||||
from .mixins import SharedMixin, SourceMixin
|
||||
|
||||
|
||||
class CompositeOutputDevice(SourceMixin, CompositeDevice):
|
||||
|
||||
@@ -15,6 +15,10 @@ from types import FunctionType
|
||||
from threading import RLock
|
||||
|
||||
from .threads import GPIOThread, _threads_shutdown
|
||||
from .mixins import (
|
||||
ValuesMixin,
|
||||
SharedMixin,
|
||||
)
|
||||
from .exc import (
|
||||
DeviceClosed,
|
||||
GPIOPinMissing,
|
||||
@@ -201,127 +205,35 @@ class GPIOBase(GPIOMeta(nstr('GPIOBase'), (), {})):
|
||||
self.close()
|
||||
|
||||
|
||||
class ValuesMixin(object):
|
||||
"""
|
||||
Adds a :attr:`values` property to the class which returns an infinite
|
||||
generator of readings from the :attr:`value` property.
|
||||
|
||||
.. note::
|
||||
|
||||
Use this mixin *first* in the parent class list.
|
||||
"""
|
||||
|
||||
@property
|
||||
def values(self):
|
||||
"""
|
||||
An infinite iterator of values read from `value`.
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
yield self.value
|
||||
except GPIODeviceClosed:
|
||||
break
|
||||
|
||||
|
||||
class SourceMixin(object):
|
||||
"""
|
||||
Adds a :attr:`source` property to the class which, given an iterable,
|
||||
sets :attr:`value` to each member of that iterable until it is exhausted.
|
||||
|
||||
.. note::
|
||||
|
||||
Use this mixin *first* in the parent class list.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._source = None
|
||||
self._source_thread = None
|
||||
self._source_delay = 0.01
|
||||
super(SourceMixin, self).__init__(*args, **kwargs)
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
super(SourceMixin, self).close()
|
||||
except AttributeError:
|
||||
pass
|
||||
self.source = None
|
||||
|
||||
def _copy_values(self, source):
|
||||
for v in source:
|
||||
self.value = v
|
||||
if self._source_thread.stopping.wait(self._source_delay):
|
||||
break
|
||||
|
||||
@property
|
||||
def source_delay(self):
|
||||
"""
|
||||
The delay (measured in seconds) in the loop used to read values from
|
||||
:attr:`source`. Defaults to 0.01 seconds which is generally sufficient
|
||||
to keep CPU usage to a minimum while providing adequate responsiveness.
|
||||
"""
|
||||
return self._source_delay
|
||||
|
||||
@source_delay.setter
|
||||
def source_delay(self, value):
|
||||
if value < 0:
|
||||
raise GPIOBadSourceDelay('source_delay must be 0 or greater')
|
||||
self._source_delay = float(value)
|
||||
|
||||
@property
|
||||
def source(self):
|
||||
"""
|
||||
The iterable to use as a source of values for :attr:`value`.
|
||||
"""
|
||||
return self._source
|
||||
|
||||
@source.setter
|
||||
def source(self, value):
|
||||
if self._source_thread is not None:
|
||||
self._source_thread.stop()
|
||||
self._source_thread = None
|
||||
self._source = value
|
||||
if value is not None:
|
||||
self._source_thread = GPIOThread(target=self._copy_values, args=(value,))
|
||||
self._source_thread.start()
|
||||
|
||||
|
||||
class SharedMixin(object):
|
||||
"""
|
||||
This mixin marks a class as "shared". In this case, the meta-class
|
||||
(GPIOMeta) will use :meth:`_shared_key` to convert the constructor
|
||||
arguments to an immutable key, and will check whether any existing
|
||||
instances match that key. If they do, they will be returned by the
|
||||
constructor instead of a new instance. An internal reference counter is
|
||||
used to determine how many times an instance has been "constructed" in this
|
||||
way.
|
||||
|
||||
When :meth:`close` is called, an internal reference counter will be
|
||||
decremented and the instance will only close when it reaches zero.
|
||||
"""
|
||||
_INSTANCES = {}
|
||||
|
||||
def __del__(self):
|
||||
self._refs = 0
|
||||
super(SharedMixin, self).__del__()
|
||||
|
||||
@classmethod
|
||||
def _shared_key(cls, *args, **kwargs):
|
||||
"""
|
||||
Given the constructor arguments, returns an immutable key representing
|
||||
the instance. The default simply assumes all positional arguments are
|
||||
immutable.
|
||||
"""
|
||||
return args
|
||||
|
||||
|
||||
class Device(ValuesMixin, GPIOBase):
|
||||
"""
|
||||
Represents a single device of any type; GPIO-based, SPI-based, I2C-based,
|
||||
etc. This is the base class of the device hierarchy.
|
||||
etc. This is the base class of the device hierarchy. It defines the
|
||||
basic services applicable to all devices (specifically thhe :attr:`is_active`
|
||||
property, the :attr:`value` property, and the :meth:`close` method).
|
||||
"""
|
||||
def __repr__(self):
|
||||
return "<gpiozero.%s object>" % (self.__class__.__name__)
|
||||
|
||||
@property
|
||||
def value(self):
|
||||
"""
|
||||
Returns a value representing the device's state. Frequently, this is a
|
||||
boolean value, or a number between 0 and 1 but some devices use larger
|
||||
ranges (e.g. -1 to +1) and composite devices usually use tuples to
|
||||
return the states of all their subordinate components.
|
||||
"""
|
||||
return 0
|
||||
|
||||
@property
|
||||
def is_active(self):
|
||||
"""
|
||||
Returns ``True`` if the device is currently active and ``False``
|
||||
otherwise. This property is usually derived from :attr:`value`. Unlike
|
||||
:attr:`value`, this is *always* a boolean.
|
||||
"""
|
||||
return bool(self.value)
|
||||
|
||||
|
||||
class CompositeDevice(Device):
|
||||
"""
|
||||
@@ -417,15 +329,16 @@ class CompositeDevice(Device):
|
||||
def value(self):
|
||||
return self.tuple(*(device.value for device in self))
|
||||
|
||||
@property
|
||||
def is_active(self):
|
||||
return any(self.value)
|
||||
|
||||
|
||||
class GPIODevice(Device):
|
||||
"""
|
||||
Extends :class:`Device`. Represents a generic GPIO device.
|
||||
|
||||
This is the class at the root of the gpiozero class hierarchy. It handles
|
||||
ensuring that two GPIO devices do not share the same pin, and provides
|
||||
basic services applicable to all devices (specifically the :attr:`pin`
|
||||
property, :attr:`is_active` property, and the :attr:`close` method).
|
||||
Extends :class:`Device`. Represents a generic GPIO device and provides
|
||||
the services common to all single-pin GPIO devices (like ensuring two
|
||||
GPIO devices do no share a :attr:`pin`).
|
||||
|
||||
:param int pin:
|
||||
The GPIO pin (in BCM numbering) that the device is connected to. If
|
||||
@@ -494,14 +407,8 @@ class GPIODevice(Device):
|
||||
|
||||
@property
|
||||
def value(self):
|
||||
"""
|
||||
Returns ``True`` if the device is currently active and ``False``
|
||||
otherwise.
|
||||
"""
|
||||
return self._read()
|
||||
|
||||
is_active = value
|
||||
|
||||
def __repr__(self):
|
||||
try:
|
||||
return "<gpiozero.%s object on pin %r, is_active=%s>" % (
|
||||
|
||||
@@ -13,6 +13,9 @@ class GPIOZeroError(Exception):
|
||||
class DeviceClosed(GPIOZeroError):
|
||||
"Error raised when an operation is attempted on a closed device"
|
||||
|
||||
class BadEventHandler(GPIOZeroError, ValueError):
|
||||
"Error raised when an event handler with an incompatible prototype is specified"
|
||||
|
||||
class CompositeDeviceError(GPIOZeroError):
|
||||
"Base class for errors specific to the CompositeDevice hierarchy"
|
||||
|
||||
|
||||
@@ -7,15 +7,12 @@ from __future__ import (
|
||||
division,
|
||||
)
|
||||
|
||||
import inspect
|
||||
import warnings
|
||||
from functools import wraps
|
||||
from time import sleep, time
|
||||
from threading import Event
|
||||
|
||||
from .exc import InputDeviceError, GPIODeviceError, GPIODeviceClosed
|
||||
from .devices import GPIODevice, CompositeDevice
|
||||
from .threads import GPIOQueue
|
||||
from .mixins import GPIOQueue, EventsMixin
|
||||
|
||||
|
||||
class InputDevice(GPIODevice):
|
||||
@@ -65,148 +62,7 @@ class InputDevice(GPIODevice):
|
||||
return super(InputDevice, self).__repr__()
|
||||
|
||||
|
||||
class WaitableInputDevice(InputDevice):
|
||||
"""
|
||||
Represents a generic input device with distinct waitable states.
|
||||
|
||||
This class extends :class:`InputDevice` with methods for waiting on the
|
||||
device's status (:meth:`wait_for_active` and :meth:`wait_for_inactive`),
|
||||
and properties that hold functions to be called when the device changes
|
||||
state (:meth:`when_activated` and :meth:`when_deactivated`). These are
|
||||
aliased appropriately in various subclasses.
|
||||
|
||||
.. note::
|
||||
|
||||
Note that this class provides no means of actually firing its events;
|
||||
it's effectively an abstract base class.
|
||||
"""
|
||||
def __init__(self, pin=None, pull_up=False):
|
||||
super(WaitableInputDevice, self).__init__(pin, pull_up)
|
||||
self._active_event = Event()
|
||||
self._inactive_event = Event()
|
||||
self._when_activated = None
|
||||
self._when_deactivated = None
|
||||
self._last_state = None
|
||||
|
||||
def wait_for_active(self, timeout=None):
|
||||
"""
|
||||
Pause the script until the device is activated, or the timeout is
|
||||
reached.
|
||||
|
||||
:param float timeout:
|
||||
Number of seconds to wait before proceeding. If this is ``None``
|
||||
(the default), then wait indefinitely until the device is active.
|
||||
"""
|
||||
return self._active_event.wait(timeout)
|
||||
|
||||
def wait_for_inactive(self, timeout=None):
|
||||
"""
|
||||
Pause the script until the device is deactivated, or the timeout is
|
||||
reached.
|
||||
|
||||
:param float timeout:
|
||||
Number of seconds to wait before proceeding. If this is ``None``
|
||||
(the default), then wait indefinitely until the device is inactive.
|
||||
"""
|
||||
return self._inactive_event.wait(timeout)
|
||||
|
||||
@property
|
||||
def when_activated(self):
|
||||
"""
|
||||
The function to run when the device changes state from inactive to
|
||||
active.
|
||||
|
||||
This can be set to a function which accepts no (mandatory) parameters,
|
||||
or a Python function which accepts a single mandatory parameter (with
|
||||
as many optional parameters as you like). If the function accepts a
|
||||
single mandatory parameter, the device that activated will be passed
|
||||
as that parameter.
|
||||
|
||||
Set this property to ``None`` (the default) to disable the event.
|
||||
"""
|
||||
return self._when_activated
|
||||
|
||||
@when_activated.setter
|
||||
def when_activated(self, value):
|
||||
self._when_activated = self._wrap_callback(value)
|
||||
|
||||
@property
|
||||
def when_deactivated(self):
|
||||
"""
|
||||
The function to run when the device changes state from active to
|
||||
inactive.
|
||||
|
||||
This can be set to a function which accepts no (mandatory) parameters,
|
||||
or a Python function which accepts a single mandatory parameter (with
|
||||
as many optional parameters as you like). If the function accepts a
|
||||
single mandatory parameter, the device that deactivated will be
|
||||
passed as that parameter.
|
||||
|
||||
Set this property to ``None`` (the default) to disable the event.
|
||||
"""
|
||||
return self._when_deactivated
|
||||
|
||||
@when_deactivated.setter
|
||||
def when_deactivated(self, value):
|
||||
self._when_deactivated = self._wrap_callback(value)
|
||||
|
||||
def _wrap_callback(self, fn):
|
||||
if fn is None:
|
||||
return None
|
||||
elif not callable(fn):
|
||||
raise InputDeviceError('value must be None or a callable')
|
||||
elif inspect.isbuiltin(fn):
|
||||
# We can't introspect the prototype of builtins. In this case we
|
||||
# assume that the builtin has no (mandatory) parameters; this is
|
||||
# the most reasonable assumption on the basis that pre-existing
|
||||
# builtins have no knowledge of gpiozero, and the sole parameter
|
||||
# we would pass is a gpiozero object
|
||||
return fn
|
||||
else:
|
||||
# Try binding ourselves to the argspec of the provided callable.
|
||||
# If this works, assume the function is capable of accepting no
|
||||
# parameters
|
||||
try:
|
||||
inspect.getcallargs(fn)
|
||||
return fn
|
||||
except TypeError:
|
||||
try:
|
||||
# If the above fails, try binding with a single parameter
|
||||
# (ourselves). If this works, wrap the specified callback
|
||||
inspect.getcallargs(fn, self)
|
||||
@wraps(fn)
|
||||
def wrapper():
|
||||
return fn(self)
|
||||
return wrapper
|
||||
except TypeError:
|
||||
raise InputDeviceError(
|
||||
'value must be a callable which accepts up to one '
|
||||
'mandatory parameter')
|
||||
|
||||
def _fire_events(self):
|
||||
old_state = self._last_state
|
||||
new_state = self._last_state = self.is_active
|
||||
if old_state is None:
|
||||
# Initial "indeterminate" state; set events but don't fire
|
||||
# callbacks as there's not necessarily an edge
|
||||
if new_state:
|
||||
self._active_event.set()
|
||||
else:
|
||||
self._inactive_event.set()
|
||||
else:
|
||||
if not old_state and new_state:
|
||||
self._inactive_event.clear()
|
||||
self._active_event.set()
|
||||
if self.when_activated:
|
||||
self.when_activated()
|
||||
elif old_state and not new_state:
|
||||
self._active_event.clear()
|
||||
self._inactive_event.set()
|
||||
if self.when_deactivated:
|
||||
self.when_deactivated()
|
||||
|
||||
|
||||
class DigitalInputDevice(WaitableInputDevice):
|
||||
class DigitalInputDevice(EventsMixin, InputDevice):
|
||||
"""
|
||||
Represents a generic input device with typical on/off behaviour.
|
||||
|
||||
@@ -233,7 +89,7 @@ class DigitalInputDevice(WaitableInputDevice):
|
||||
raise
|
||||
|
||||
|
||||
class SmoothedInputDevice(WaitableInputDevice):
|
||||
class SmoothedInputDevice(EventsMixin, InputDevice):
|
||||
"""
|
||||
Represents a generic input device which takes its value from the mean of a
|
||||
queue of historical values.
|
||||
|
||||
326
gpiozero/mixins.py
Normal file
326
gpiozero/mixins.py
Normal file
@@ -0,0 +1,326 @@
|
||||
from __future__ import (
|
||||
unicode_literals,
|
||||
print_function,
|
||||
absolute_import,
|
||||
division,
|
||||
)
|
||||
nstr = str
|
||||
str = type('')
|
||||
|
||||
import inspect
|
||||
import weakref
|
||||
from functools import wraps
|
||||
from threading import Event
|
||||
from collections import deque
|
||||
try:
|
||||
from statistics import median, mean
|
||||
except ImportError:
|
||||
from .compat import median, mean
|
||||
|
||||
from .threads import GPIOThread
|
||||
from .exc import BadEventHandler, DeviceClosed
|
||||
|
||||
|
||||
class ValuesMixin(object):
|
||||
"""
|
||||
Adds a :attr:`values` property to the class which returns an infinite
|
||||
generator of readings from the :attr:`value` property.
|
||||
|
||||
.. note::
|
||||
|
||||
Use this mixin *first* in the parent class list.
|
||||
"""
|
||||
|
||||
@property
|
||||
def values(self):
|
||||
"""
|
||||
An infinite iterator of values read from `value`.
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
yield self.value
|
||||
except DeviceClosed:
|
||||
break
|
||||
|
||||
|
||||
class SourceMixin(object):
|
||||
"""
|
||||
Adds a :attr:`source` property to the class which, given an iterable,
|
||||
sets :attr:`value` to each member of that iterable until it is exhausted.
|
||||
|
||||
.. note::
|
||||
|
||||
Use this mixin *first* in the parent class list.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._source = None
|
||||
self._source_thread = None
|
||||
self._source_delay = 0.01
|
||||
super(SourceMixin, self).__init__(*args, **kwargs)
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
super(SourceMixin, self).close()
|
||||
except AttributeError:
|
||||
pass
|
||||
self.source = None
|
||||
|
||||
def _copy_values(self, source):
|
||||
for v in source:
|
||||
self.value = v
|
||||
if self._source_thread.stopping.wait(self._source_delay):
|
||||
break
|
||||
|
||||
@property
|
||||
def source_delay(self):
|
||||
"""
|
||||
The delay (measured in seconds) in the loop used to read values from
|
||||
:attr:`source`. Defaults to 0.01 seconds which is generally sufficient
|
||||
to keep CPU usage to a minimum while providing adequate responsiveness.
|
||||
"""
|
||||
return self._source_delay
|
||||
|
||||
@source_delay.setter
|
||||
def source_delay(self, value):
|
||||
if value < 0:
|
||||
raise GPIOBadSourceDelay('source_delay must be 0 or greater')
|
||||
self._source_delay = float(value)
|
||||
|
||||
@property
|
||||
def source(self):
|
||||
"""
|
||||
The iterable to use as a source of values for :attr:`value`.
|
||||
"""
|
||||
return self._source
|
||||
|
||||
@source.setter
|
||||
def source(self, value):
|
||||
if self._source_thread is not None:
|
||||
self._source_thread.stop()
|
||||
self._source_thread = None
|
||||
self._source = value
|
||||
if value is not None:
|
||||
self._source_thread = GPIOThread(target=self._copy_values, args=(value,))
|
||||
self._source_thread.start()
|
||||
|
||||
|
||||
class SharedMixin(object):
|
||||
"""
|
||||
This mixin marks a class as "shared". In this case, the meta-class
|
||||
(GPIOMeta) will use :meth:`_shared_key` to convert the constructor
|
||||
arguments to an immutable key, and will check whether any existing
|
||||
instances match that key. If they do, they will be returned by the
|
||||
constructor instead of a new instance. An internal reference counter is
|
||||
used to determine how many times an instance has been "constructed" in this
|
||||
way.
|
||||
|
||||
When :meth:`close` is called, an internal reference counter will be
|
||||
decremented and the instance will only close when it reaches zero.
|
||||
"""
|
||||
_INSTANCES = {}
|
||||
|
||||
def __del__(self):
|
||||
self._refs = 0
|
||||
super(SharedMixin, self).__del__()
|
||||
|
||||
@classmethod
|
||||
def _shared_key(cls, *args, **kwargs):
|
||||
"""
|
||||
Given the constructor arguments, returns an immutable key representing
|
||||
the instance. The default simply assumes all positional arguments are
|
||||
immutable.
|
||||
"""
|
||||
return args
|
||||
|
||||
|
||||
class EventsMixin(object):
|
||||
"""
|
||||
Adds edge-detected :meth:`when_activated` and :meth:`when_deactivated`
|
||||
events to a device based on changes to the :attr:`~Device.is_active`
|
||||
property common to all devices. Also adds :meth:`wait_for_active` and
|
||||
:meth:`wait_for_inactive` methods for level-waiting.
|
||||
|
||||
.. note::
|
||||
|
||||
Note that this mixin provides no means of actually firing its events;
|
||||
call :meth:`_fire_events` in sub-classes when device state changes to
|
||||
trigger the events. This should also be called once at the end of
|
||||
initialization to set initial states.
|
||||
"""
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(EventsMixin, self).__init__(*args, **kwargs)
|
||||
self._active_event = Event()
|
||||
self._inactive_event = Event()
|
||||
self._when_activated = None
|
||||
self._when_deactivated = None
|
||||
self._last_state = None
|
||||
|
||||
def wait_for_active(self, timeout=None):
|
||||
"""
|
||||
Pause the script until the device is activated, or the timeout is
|
||||
reached.
|
||||
|
||||
:param float timeout:
|
||||
Number of seconds to wait before proceeding. If this is ``None``
|
||||
(the default), then wait indefinitely until the device is active.
|
||||
"""
|
||||
return self._active_event.wait(timeout)
|
||||
|
||||
def wait_for_inactive(self, timeout=None):
|
||||
"""
|
||||
Pause the script until the device is deactivated, or the timeout is
|
||||
reached.
|
||||
|
||||
:param float timeout:
|
||||
Number of seconds to wait before proceeding. If this is ``None``
|
||||
(the default), then wait indefinitely until the device is inactive.
|
||||
"""
|
||||
return self._inactive_event.wait(timeout)
|
||||
|
||||
@property
|
||||
def when_activated(self):
|
||||
"""
|
||||
The function to run when the device changes state from inactive to
|
||||
active.
|
||||
|
||||
This can be set to a function which accepts no (mandatory) parameters,
|
||||
or a Python function which accepts a single mandatory parameter (with
|
||||
as many optional parameters as you like). If the function accepts a
|
||||
single mandatory parameter, the device that activated will be passed
|
||||
as that parameter.
|
||||
|
||||
Set this property to ``None`` (the default) to disable the event.
|
||||
"""
|
||||
return self._when_activated
|
||||
|
||||
@when_activated.setter
|
||||
def when_activated(self, value):
|
||||
self._when_activated = self._wrap_callback(value)
|
||||
|
||||
@property
|
||||
def when_deactivated(self):
|
||||
"""
|
||||
The function to run when the device changes state from active to
|
||||
inactive.
|
||||
|
||||
This can be set to a function which accepts no (mandatory) parameters,
|
||||
or a Python function which accepts a single mandatory parameter (with
|
||||
as many optional parameters as you like). If the function accepts a
|
||||
single mandatory parameter, the device that deactivated will be
|
||||
passed as that parameter.
|
||||
|
||||
Set this property to ``None`` (the default) to disable the event.
|
||||
"""
|
||||
return self._when_deactivated
|
||||
|
||||
@when_deactivated.setter
|
||||
def when_deactivated(self, value):
|
||||
self._when_deactivated = self._wrap_callback(value)
|
||||
|
||||
def _wrap_callback(self, fn):
|
||||
if fn is None:
|
||||
return None
|
||||
elif not callable(fn):
|
||||
raise BadEventHandler('value must be None or a callable')
|
||||
elif inspect.isbuiltin(fn):
|
||||
# We can't introspect the prototype of builtins. In this case we
|
||||
# assume that the builtin has no (mandatory) parameters; this is
|
||||
# the most reasonable assumption on the basis that pre-existing
|
||||
# builtins have no knowledge of gpiozero, and the sole parameter
|
||||
# we would pass is a gpiozero object
|
||||
return fn
|
||||
else:
|
||||
# Try binding ourselves to the argspec of the provided callable.
|
||||
# If this works, assume the function is capable of accepting no
|
||||
# parameters
|
||||
try:
|
||||
inspect.getcallargs(fn)
|
||||
return fn
|
||||
except TypeError:
|
||||
try:
|
||||
# If the above fails, try binding with a single parameter
|
||||
# (ourselves). If this works, wrap the specified callback
|
||||
inspect.getcallargs(fn, self)
|
||||
@wraps(fn)
|
||||
def wrapper():
|
||||
return fn(self)
|
||||
return wrapper
|
||||
except TypeError:
|
||||
raise BadEventHandler(
|
||||
'value must be a callable which accepts up to one '
|
||||
'mandatory parameter')
|
||||
|
||||
def _fire_events(self):
|
||||
old_state = self._last_state
|
||||
new_state = self._last_state = self.is_active
|
||||
if old_state is None:
|
||||
# Initial "indeterminate" state; set events but don't fire
|
||||
# callbacks as there's not necessarily an edge
|
||||
if new_state:
|
||||
self._active_event.set()
|
||||
else:
|
||||
self._inactive_event.set()
|
||||
else:
|
||||
if not old_state and new_state:
|
||||
self._inactive_event.clear()
|
||||
self._active_event.set()
|
||||
if self.when_activated:
|
||||
self.when_activated()
|
||||
elif old_state and not new_state:
|
||||
self._active_event.clear()
|
||||
self._inactive_event.set()
|
||||
if self.when_deactivated:
|
||||
self.when_deactivated()
|
||||
|
||||
|
||||
class GPIOQueue(GPIOThread):
|
||||
"""
|
||||
Extends :class:`GPIOThread`. Provides a background thread that monitors a
|
||||
device's values and provides a running *average* (defaults to median) of
|
||||
those values. If the *parent* device includes the :class:`EventsMixin` in
|
||||
its ancestry, the thread automatically calls
|
||||
:meth:`~EventsMixin._fire_events`.
|
||||
"""
|
||||
def __init__(
|
||||
self, parent, queue_len=5, sample_wait=0.0, partial=False,
|
||||
average=median):
|
||||
assert callable(average)
|
||||
super(GPIOQueue, self).__init__(target=self.fill)
|
||||
if queue_len < 1:
|
||||
raise GPIOBadQueueLen('queue_len must be at least one')
|
||||
if sample_wait < 0:
|
||||
raise GPIOBadSampleWait('sample_wait must be 0 or greater')
|
||||
self.queue = deque(maxlen=queue_len)
|
||||
self.partial = partial
|
||||
self.sample_wait = sample_wait
|
||||
self.full = Event()
|
||||
self.parent = weakref.proxy(parent)
|
||||
self.average = average
|
||||
|
||||
@property
|
||||
def value(self):
|
||||
if not self.partial:
|
||||
self.full.wait()
|
||||
try:
|
||||
return self.average(self.queue)
|
||||
except ZeroDivisionError:
|
||||
# No data == inactive value
|
||||
return 0.0
|
||||
|
||||
def fill(self):
|
||||
try:
|
||||
while (not self.stopping.wait(self.sample_wait) and
|
||||
len(self.queue) < self.queue.maxlen):
|
||||
self.queue.append(self.parent._read())
|
||||
if self.partial and isinstance(self.parent, EventsMixin):
|
||||
self.parent._fire_events()
|
||||
self.full.set()
|
||||
while not self.stopping.wait(self.sample_wait):
|
||||
self.queue.append(self.parent._read())
|
||||
if isinstance(self.parent, EventsMixin):
|
||||
self.parent._fire_events()
|
||||
except ReferenceError:
|
||||
# Parent is dead; time to die!
|
||||
pass
|
||||
|
||||
168
gpiozero/other_devices.py
Normal file
168
gpiozero/other_devices.py
Normal file
@@ -0,0 +1,168 @@
|
||||
# vim: set fileencoding=utf-8:
|
||||
|
||||
from __future__ import (
|
||||
unicode_literals,
|
||||
print_function,
|
||||
absolute_import,
|
||||
division,
|
||||
)
|
||||
str = type('')
|
||||
|
||||
|
||||
import os
|
||||
import io
|
||||
import subprocess
|
||||
from datetime import datetime, time
|
||||
|
||||
from .devices import Device
|
||||
from .mixins import EventsMixin
|
||||
|
||||
|
||||
class InternalDevice(EventsMixin, Device):
|
||||
"""
|
||||
Extends :class:`Device` to provide a basis for devices which have no
|
||||
specific hardware representation. This are effectively pseudo-devices and
|
||||
usually represent operating system services like the internal clock, file
|
||||
systems or network facilities.
|
||||
"""
|
||||
|
||||
|
||||
class PingServer(InternalDevice):
|
||||
"""
|
||||
Extends :class:`InternalDevice` to provide a device which is active when a
|
||||
*host* on the network can be pinged.
|
||||
|
||||
The following example lights an LED while a server is reachable (note the
|
||||
use of :attr:`~SourceMixin.source_delay` to ensure the server is not
|
||||
flooded with pings)::
|
||||
|
||||
from gpiozero import PingServer, LED
|
||||
from signal import pause
|
||||
|
||||
server = PingServer('my-server')
|
||||
led = LED(4)
|
||||
led.source_delay = 1
|
||||
led.source = server.values
|
||||
pause()
|
||||
|
||||
:param str host:
|
||||
The hostname or IP address to attempt to ping.
|
||||
"""
|
||||
def __init__(self, host):
|
||||
self.host = host
|
||||
super(PingServer, self).__init__()
|
||||
self._fire_events()
|
||||
|
||||
def __repr__(self):
|
||||
return '<gpiozero.PingDevice host="%s">' % self.host
|
||||
|
||||
@property
|
||||
def value(self):
|
||||
# XXX This is doing a DNS lookup every time it's queried; should we
|
||||
# call gethostbyname in the constructor and ping that instead (good
|
||||
# for consistency, but what if the user *expects* the host to change
|
||||
# address?)
|
||||
with io.open(os.devnull, 'wb') as devnull:
|
||||
try:
|
||||
subprocess.check_call(
|
||||
['ping', '-c1', self.host],
|
||||
stdout=devnull, stderr=devnull)
|
||||
except subprocess.CalledProcessError:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
class TimeOfDay(InternalDevice):
|
||||
"""
|
||||
Extends :class:`InternalDevice` to provide a device which is active when
|
||||
the computer's clock indicates that the current time is between
|
||||
*start_time* and *end_time* (inclusive) which are :class:`~datetime.time`
|
||||
instances.
|
||||
|
||||
The following example turns on a lamp attached to an :class:`Energenie`
|
||||
plug between 7 and 8 AM::
|
||||
|
||||
from datetime import time
|
||||
from gpiozero import TimeOfDay, Energenie
|
||||
from signal import pause
|
||||
|
||||
lamp = Energenie(0)
|
||||
morning = TimeOfDay(time(7), time(8))
|
||||
morning.when_activated = lamp.on
|
||||
morning.when_deactivated = lamp.off
|
||||
pause()
|
||||
|
||||
:param ~datetime.time start_time:
|
||||
The time from which the device will be considered active.
|
||||
|
||||
:param ~datetime.time end_time:
|
||||
The time after which the device will be considered inactive.
|
||||
|
||||
:param bool utc:
|
||||
If ``True`` (the default), a naive UTC time will be used for the
|
||||
comparison rather than a local time-zone reading.
|
||||
"""
|
||||
def __init__(self, start_time, end_time, utc=True):
|
||||
self._start_time = None
|
||||
self._end_time = None
|
||||
self._utc = True
|
||||
super(TimeOfDay, self).__init__()
|
||||
self.start_time = start_time
|
||||
self.end_time = end_time
|
||||
self.utc = utc
|
||||
self._fire_events()
|
||||
|
||||
def __repr__(self):
|
||||
return '<gpiozero.TimeOfDay active between %s and %s %s>' % (
|
||||
self.start_time, self.end_time, ('local', 'UTC')[self.utc])
|
||||
|
||||
@property
|
||||
def start_time(self):
|
||||
"""
|
||||
The time of day after which the device will be considered active.
|
||||
"""
|
||||
return self._start_time
|
||||
|
||||
@start_time.setter
|
||||
def start_time(self, value):
|
||||
if isinstance(value, datetime):
|
||||
value = value.time()
|
||||
if not isinstance(value, time):
|
||||
raise ValueError('start_time must be a datetime, or time instance')
|
||||
self._start_time = value
|
||||
|
||||
@property
|
||||
def end_time(self):
|
||||
"""
|
||||
The time of day after which the device will be considered inactive.
|
||||
"""
|
||||
return self._end_time
|
||||
|
||||
@end_time.setter
|
||||
def end_time(self, value):
|
||||
if isinstance(value, datetime):
|
||||
value = value.time()
|
||||
if not isinstance(value, time):
|
||||
raise ValueError('end_time must be a datetime, or time instance')
|
||||
self._end_time = value
|
||||
|
||||
@property
|
||||
def utc(self):
|
||||
"""
|
||||
If ``True``, use a naive UTC time reading for comparison instead of a
|
||||
local timezone reading.
|
||||
"""
|
||||
return self._utc
|
||||
|
||||
@utc.setter
|
||||
def utc(self, value):
|
||||
self._utc = bool(value)
|
||||
|
||||
@property
|
||||
def value(self):
|
||||
if self.utc:
|
||||
return self.start_time <= datetime.utcnow().time() <= self.end_time
|
||||
else:
|
||||
return self.start_time <= datetime.now().time() <= self.end_time
|
||||
|
||||
@@ -11,7 +11,8 @@ from threading import Lock
|
||||
from itertools import repeat, cycle, chain
|
||||
|
||||
from .exc import OutputDeviceBadValue, GPIOPinMissing
|
||||
from .devices import GPIODevice, Device, CompositeDevice, SourceMixin
|
||||
from .devices import GPIODevice, Device, CompositeDevice
|
||||
from .mixins import SourceMixin
|
||||
from .threads import GPIOThread
|
||||
|
||||
|
||||
|
||||
296
gpiozero/source_tools.py
Normal file
296
gpiozero/source_tools.py
Normal file
@@ -0,0 +1,296 @@
|
||||
# vim: set fileencoding=utf-8:
|
||||
|
||||
from __future__ import (
|
||||
unicode_literals,
|
||||
print_function,
|
||||
absolute_import,
|
||||
division,
|
||||
)
|
||||
str = type('')
|
||||
|
||||
|
||||
from random import random
|
||||
from time import sleep
|
||||
try:
|
||||
from itertools import izip as zip
|
||||
except ImportError:
|
||||
pass
|
||||
from itertools import count, cycle
|
||||
from math import sin, cos, floor
|
||||
try:
|
||||
from statistics import mean
|
||||
except ImportError:
|
||||
from .compat import mean
|
||||
|
||||
|
||||
def negated(values):
|
||||
"""
|
||||
Returns the negation of the supplied values (``True`` becomes ``False``,
|
||||
and ``False`` becomes ``True``). For example::
|
||||
|
||||
from gpiozero import Button, LED, negated
|
||||
from signal import pause
|
||||
|
||||
led = LED(4)
|
||||
btn = Button(17)
|
||||
led.source = negated(btn.values)
|
||||
pause()
|
||||
"""
|
||||
for v in values:
|
||||
yield not v
|
||||
|
||||
|
||||
def inverted(values):
|
||||
"""
|
||||
Returns the inversion of the supplied values (1 becomes 0, 0 becomes 1,
|
||||
0.1 becomes 0.9, etc.). For example::
|
||||
|
||||
from gpiozero import MCP3008, PWMLED, inverted
|
||||
from signal import pause
|
||||
|
||||
led = PWMLED(4)
|
||||
pot = MCP3008(channel=0)
|
||||
led.source = inverted(pot.values)
|
||||
pause()
|
||||
"""
|
||||
for v in values:
|
||||
yield 1 - v
|
||||
|
||||
|
||||
def scaled(values, range_min, range_max, domain_min=0, domain_max=1):
|
||||
"""
|
||||
Returns *values* scaled from *range_min* to *range_max*, assuming that all
|
||||
items in *values* lie between *domain_min* and *domain_max* (which default
|
||||
to 0 and 1 respectively). For example, to control the direction of a motor
|
||||
(which is represented as a value between -1 and 1) using a potentiometer
|
||||
(which typically provides values between 0 and 1)::
|
||||
|
||||
from gpiozero import Motor, MCP3008, scaled
|
||||
from signal import pause
|
||||
|
||||
motor = Motor(20, 21)
|
||||
pot = MCP3008(channel=0)
|
||||
motor.source = scaled(pot.values, -1, 1)
|
||||
pause()
|
||||
"""
|
||||
domain_size = domain_max - domain_min
|
||||
range_size = range_max - range_min
|
||||
for v in values:
|
||||
yield (((v - domain_min) / domain_size) * range_size) + range_min
|
||||
|
||||
|
||||
def clamped(values, range_min=0, range_max=1):
|
||||
"""
|
||||
Returns *values* clamped from *range_min* to *range_max*, i.e. any items
|
||||
less than *range_min* will be returned as *range_min* and any items
|
||||
larger than *range_max* will be returned as *range_max* (these default to
|
||||
0 and 1 respectively). For example::
|
||||
|
||||
from gpiozero import PWMLED, MCP3008, clamped
|
||||
from signal import pause
|
||||
|
||||
led = PWMLED(4)
|
||||
pot = MCP3008(channel=0)
|
||||
led.source = clamped(pot.values, 0.5, 1.0)
|
||||
pause()
|
||||
"""
|
||||
for v in values:
|
||||
yield min(max(v, range_min), range_max)
|
||||
|
||||
|
||||
def quantized(values, steps, range_min=0, range_max=1):
|
||||
"""
|
||||
Returns *values* quantized to *steps* increments. All items in *values* are
|
||||
assumed to be between *range_min* and *range_max* (use :func:`scaled` to
|
||||
ensure this if necessary).
|
||||
|
||||
For example, to quantize values between 0 and 1 to 5 "steps" (0.0, 0.25,
|
||||
0.5, 0.75, 1.0)::
|
||||
|
||||
from gpiozero import PWMLED, MCP3008, quantized
|
||||
from signal import pause
|
||||
|
||||
led = PWMLED(4)
|
||||
pot = MCP3008(channel=0)
|
||||
led.source = quantized(pot.values, 4)
|
||||
pause()
|
||||
"""
|
||||
range_size = range_max - range_min
|
||||
for v in scaled(values, 0, 1, range_min, range_max):
|
||||
yield ((int(v * steps) / steps) * range_size) + range_min
|
||||
|
||||
|
||||
def conjunction(*values):
|
||||
"""
|
||||
Returns the `logical conjunction`_ of all supplied values (the result is
|
||||
only ``True`` if and only if all input values are simultaneously ``True``).
|
||||
One or more *values* can be specified. For example, to light an
|
||||
:class:`LED` only when *both* buttons are pressed::
|
||||
|
||||
from gpiozero import LED, Button, conjunction
|
||||
from signal import pause
|
||||
|
||||
led = LED(4)
|
||||
btn1 = Button(20)
|
||||
btn2 = Button(21)
|
||||
led.source = conjunction(btn1.values, btn2.values)
|
||||
pause()
|
||||
|
||||
.. _logical conjunction: https://en.wikipedia.org/wiki/Logical_conjunction
|
||||
"""
|
||||
for v in zip(*values):
|
||||
yield all(v)
|
||||
|
||||
|
||||
def disjunction(*values):
|
||||
"""
|
||||
Returns the `logical disjunction`_ of all supplied values (the result is
|
||||
``True`` if any of the input values are currently ``True``). One or more
|
||||
*values* can be specified. For example, the light an :class:`LED` when
|
||||
*any* button is pressed::
|
||||
|
||||
from gpiozero import LED, Button, conjunction
|
||||
from signal import pause
|
||||
|
||||
led = LED(4)
|
||||
btn1 = Button(20)
|
||||
btn2 = Button(21)
|
||||
led.source = disjunction(btn1.values, btn2.values)
|
||||
pause()
|
||||
|
||||
.. _logical disjunction: https://en.wikipedia.org/wiki/Logical_disjunction
|
||||
"""
|
||||
for v in zip(*values):
|
||||
yield any(v)
|
||||
|
||||
|
||||
def averaged(*values):
|
||||
"""
|
||||
Returns the mean of all supplied values. One or more *values* can be
|
||||
specified. For example, to light a :class:`PWMLED` as the average of
|
||||
several potentiometers connected to an :class:`MCP3008` ADC::
|
||||
|
||||
from gpiozero import MCP3008, PWMLED, averaged
|
||||
from signal import pause
|
||||
|
||||
pot1 = MCP3008(channel=0)
|
||||
pot2 = MCP3008(channel=1)
|
||||
pot3 = MCP3008(channel=2)
|
||||
led = PWMLED(4)
|
||||
led.source = averaged(pot1.values, pot2.values, pot3.values)
|
||||
pause()
|
||||
"""
|
||||
for v in zip(*values):
|
||||
yield mean(v)
|
||||
|
||||
|
||||
def queued(values, qsize):
|
||||
"""
|
||||
Queues up readings from *values* (the number of readings queued is
|
||||
determined by *qsize*) and begins yielding values only when the queue is
|
||||
full. For example, to "cascade" values along a sequence of LEDs::
|
||||
|
||||
from gpiozero import LEDBoard, Button, queued
|
||||
from signal import pause
|
||||
|
||||
leds = LEDBoard(5, 6, 13, 19, 26)
|
||||
btn = Button(17)
|
||||
for i in range(4):
|
||||
leds[i].source = queued(leds[i + 1].values, 5)
|
||||
leds[i].source_delay = 0.01
|
||||
leds[4].source = btn.values
|
||||
pause()
|
||||
"""
|
||||
q = []
|
||||
it = iter(values)
|
||||
for i in range(qsize):
|
||||
q.append(next(it))
|
||||
for i in cycle(range(qsize)):
|
||||
yield q[i]
|
||||
try:
|
||||
q[i] = next(it)
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
|
||||
def pre_delayed(values, delay):
|
||||
"""
|
||||
Waits for *delay* seconds before returning each item from *values*.
|
||||
"""
|
||||
for v in values:
|
||||
sleep(delay)
|
||||
yield v
|
||||
|
||||
|
||||
def post_delayed(values, delay):
|
||||
"""
|
||||
Waits for *delay* seconds after returning each item from *values*.
|
||||
"""
|
||||
for v in values:
|
||||
yield v
|
||||
sleep(delay)
|
||||
|
||||
|
||||
def random_values():
|
||||
"""
|
||||
Provides an infinite source of random values between 0 and 1. For example,
|
||||
to produce a "flickering candle" effect with an LED::
|
||||
|
||||
from gpiozero import PWMLED, random_values
|
||||
from signal import pause
|
||||
|
||||
led = PWMLED(4)
|
||||
led.source = random_values()
|
||||
pause()
|
||||
|
||||
If you require a wider range than 0 to 1, see :func:`scaled`.
|
||||
"""
|
||||
while True:
|
||||
yield random()
|
||||
|
||||
|
||||
def sin_values():
|
||||
"""
|
||||
Provides an infinite source of values representing a sine wave (from -1 to
|
||||
+1), calculated as the result of applying sign to a simple degrees counter
|
||||
that increments by one for each requested value. For example, to produce a
|
||||
"siren" effect with a couple of LEDs::
|
||||
|
||||
from gpiozero import PWMLED, sin_values, scaled, inverted
|
||||
from signal import pause
|
||||
|
||||
red = PWMLED(2)
|
||||
blue = PWMLED(3)
|
||||
red.source_delay = 0.1
|
||||
blue.source_delay = 0.1
|
||||
red.source = scaled(sin_values(), 0, 1, -1, 1)
|
||||
blue.source = inverted(red.values)
|
||||
pause()
|
||||
|
||||
If you require a wider range than 0 to 1, see :func:`scaled`.
|
||||
"""
|
||||
for d in cycle(range(360)):
|
||||
yield sin(d)
|
||||
|
||||
|
||||
def cos_values():
|
||||
"""
|
||||
Provides an infinite source of values representing a cosine wave (from -1
|
||||
to +1), calculated as the result of applying sign to a simple degrees
|
||||
counter that increments by one for each requested value. For example, to
|
||||
produce a "siren" effect with a couple of LEDs::
|
||||
|
||||
from gpiozero import PWMLED, cos_values, scaled, inverted
|
||||
from signal import pause
|
||||
|
||||
red = PWMLED(2)
|
||||
blue = PWMLED(3)
|
||||
red.source = scaled(cos_values(), 0, 1, -1, 1)
|
||||
blue.source = inverted(red.values)
|
||||
pause()
|
||||
|
||||
If you require a wider range than 0 to 1, see :func:`scaled`.
|
||||
"""
|
||||
for d in cycle(range(360)):
|
||||
yield cos(d)
|
||||
|
||||
@@ -6,13 +6,7 @@ from __future__ import (
|
||||
)
|
||||
str = type('')
|
||||
|
||||
import weakref
|
||||
from collections import deque
|
||||
from threading import Thread, Event, RLock
|
||||
try:
|
||||
from statistics import median, mean
|
||||
except ImportError:
|
||||
from .compat import median, mean
|
||||
from threading import Thread, Event
|
||||
|
||||
from .exc import (
|
||||
GPIOBadQueueLen,
|
||||
@@ -46,46 +40,3 @@ class GPIOThread(Thread):
|
||||
super(GPIOThread, self).join()
|
||||
_THREADS.discard(self)
|
||||
|
||||
|
||||
class GPIOQueue(GPIOThread):
|
||||
def __init__(
|
||||
self, parent, queue_len=5, sample_wait=0.0, partial=False,
|
||||
average=median):
|
||||
assert callable(average)
|
||||
super(GPIOQueue, self).__init__(target=self.fill)
|
||||
if queue_len < 1:
|
||||
raise GPIOBadQueueLen('queue_len must be at least one')
|
||||
if sample_wait < 0:
|
||||
raise GPIOBadSampleWait('sample_wait must be 0 or greater')
|
||||
self.queue = deque(maxlen=queue_len)
|
||||
self.partial = partial
|
||||
self.sample_wait = sample_wait
|
||||
self.full = Event()
|
||||
self.parent = weakref.proxy(parent)
|
||||
self.average = average
|
||||
|
||||
@property
|
||||
def value(self):
|
||||
if not self.partial:
|
||||
self.full.wait()
|
||||
try:
|
||||
return self.average(self.queue)
|
||||
except ZeroDivisionError:
|
||||
# No data == inactive value
|
||||
return 0.0
|
||||
|
||||
def fill(self):
|
||||
try:
|
||||
while (not self.stopping.wait(self.sample_wait) and
|
||||
len(self.queue) < self.queue.maxlen):
|
||||
self.queue.append(self.parent._read())
|
||||
if self.partial:
|
||||
self.parent._fire_events()
|
||||
self.full.set()
|
||||
while not self.stopping.wait(self.sample_wait):
|
||||
self.queue.append(self.parent._read())
|
||||
self.parent._fire_events()
|
||||
except ReferenceError:
|
||||
# Parent is dead; time to die!
|
||||
pass
|
||||
|
||||
|
||||
Reference in New Issue
Block a user