mirror of
				https://github.com/KevinMidboe/python-gpiozero.git
				synced 2025-10-29 17:50:37 +00:00 
			
		
		
		
	While the tests work well on a PC or Travis, the Pi (where I ought to be running them!) has some issues with the timing tests. The tolerance of the "assert_states_and_times" method needs to be relaxed to 0.05 seconds; otherwise it periodically fails even on something reasonably quick like a Pi 2 (fewer failures on a Pi 3, but still occasionally). Also reduced the default fps to 25; if the default timing occasionally fails on a Pi 2, it's evidently too fast for a Pi 1 and shouldn't be the default; 25 also doesn't look any different to me on a pulsing LED. There's also a bunch of miscellaneous fixes in here: last-minute typos and chart re-gens for the 1.2 release.
		
			
				
	
	
		
			491 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			491 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from __future__ import (
 | 
						|
    unicode_literals,
 | 
						|
    print_function,
 | 
						|
    absolute_import,
 | 
						|
    division,
 | 
						|
    )
 | 
						|
nstr = str
 | 
						|
str = type('')
 | 
						|
 | 
						|
import inspect
 | 
						|
import weakref
 | 
						|
from functools import wraps
 | 
						|
from threading import Event
 | 
						|
from collections import deque
 | 
						|
from time import time
 | 
						|
try:
 | 
						|
    from statistics import median
 | 
						|
except ImportError:
 | 
						|
    from .compat import median
 | 
						|
 | 
						|
from .threads import GPIOThread
 | 
						|
from .exc import (
 | 
						|
    BadEventHandler,
 | 
						|
    BadWaitTime,
 | 
						|
    BadQueueLen,
 | 
						|
    DeviceClosed,
 | 
						|
    )
 | 
						|
 | 
						|
class ValuesMixin(object):
    """
    Mixin providing a :attr:`values` property: an endless generator which
    yields successive readings of the :attr:`value` property.

    .. note::

        Use this mixin *first* in the parent class list.
    """

    @property
    def values(self):
        """
        An infinite iterator of values read from `value`. The iterator
        finishes quietly if reading `value` raises :exc:`DeviceClosed`.
        """
        while True:
            try:
                yield self.value
            except DeviceClosed:
                # Nothing more can be read; end the generator
                return
 | 
						|
 | 
						|
 | 
						|
class SourceMixin(object):
    """
    Adds a :attr:`source` property to the class which, given an iterable,
    sets :attr:`value` to each member of that iterable until it is exhausted.

    The iterable is consumed by a background :class:`GPIOThread` which pauses
    for :attr:`source_delay` seconds between successive values.

    .. note::

        Use this mixin *first* in the parent class list.
    """

    def __init__(self, *args, **kwargs):
        # Set up our own state before delegating so that the attributes exist
        # even if a later constructor in the chain fails
        self._source = None
        self._source_thread = None
        self._source_delay = 0.01
        super(SourceMixin, self).__init__(*args, **kwargs)

    def close(self):
        """
        Stop copying values from :attr:`source` and then close the rest of
        the class hierarchy.
        """
        # Stop the source thread *before* closing the underlying device;
        # otherwise the thread may still attempt to assign ``value`` on a
        # device that has already been closed
        self.source = None
        try:
            super(SourceMixin, self).close()
        except AttributeError:
            # No further close() in the parent class list
            pass

    def _copy_values(self, source):
        # Body of the background thread: push each item of *source* into
        # ``value``, waiting ``source_delay`` seconds between items and
        # exiting early if the thread is asked to stop
        for v in source:
            self.value = v
            if self._source_thread.stopping.wait(self._source_delay):
                break

    @property
    def source_delay(self):
        """
        The delay (measured in seconds) in the loop used to read values from
        :attr:`source`. Defaults to 0.01 seconds which is generally sufficient
        to keep CPU usage to a minimum while providing adequate responsiveness.
        """
        return self._source_delay

    @source_delay.setter
    def source_delay(self, value):
        if value < 0:
            raise BadWaitTime('source_delay must be 0 or greater')
        self._source_delay = float(value)

    @property
    def source(self):
        """
        The iterable to use as a source of values for :attr:`value`. Set to
        ``None`` to stop copying values.
        """
        return self._source

    @source.setter
    def source(self, value):
        # Tear down any thread serving the previous source first
        if self._source_thread is not None:
            self._source_thread.stop()
            self._source_thread = None
        self._source = value
        if value is not None:
            self._source_thread = GPIOThread(
                target=self._copy_values, args=(value,))
            self._source_thread.start()
 | 
						|
 | 
						|
 | 
						|
class SharedMixin(object):
    """
    Marks a class as "shared". For such classes the meta-class (GPIOMeta)
    uses :meth:`_shared_key` to turn the constructor arguments into an
    immutable key and looks for an existing instance matching that key. When
    one exists the constructor returns it rather than building a new
    instance, and an internal reference counter tracks how many times the
    instance has been "constructed" this way.

    Calling :meth:`close` decrements that reference counter; the instance
    only really closes once the counter reaches zero.
    """
    _INSTANCES = {}

    def __del__(self):
        # Drop the reference count to zero before delegating to the next
        # __del__ in the parent class list
        self._refs = 0
        super(SharedMixin, self).__del__()

    @classmethod
    def _shared_key(cls, *args, **kwargs):
        """
        Convert the constructor arguments into an immutable key identifying
        the instance. This default implementation returns the positional
        arguments unchanged (assuming they are all immutable) and ignores
        keyword arguments.
        """
        return args
 | 
						|
 | 
						|
 | 
						|
class EventsMixin(object):
    """
    Adds edge-detected :meth:`when_activated` and :meth:`when_deactivated`
    events to a device, driven by changes in the :attr:`~Device.is_active`
    property common to all devices. The level-waiting methods
    :meth:`wait_for_active` and :meth:`wait_for_inactive` are provided too.

    .. note::

        This mixin has no means of firing its events on its own; sub-classes
        must call :meth:`_fire_events` whenever the device state changes.
        It should also be called once at the end of initialization to
        establish the initial states.
    """
    def __init__(self, *args, **kwargs):
        super(EventsMixin, self).__init__(*args, **kwargs)
        self._active_event = Event()
        self._inactive_event = Event()
        self._when_activated = None
        self._when_deactivated = None
        # None means "not yet determined"; see _fire_events
        self._last_state = None
        self._last_changed = time()

    def wait_for_active(self, timeout=None):
        """
        Pause the script until the device is activated, or the timeout is
        reached.

        :param float timeout:
            Number of seconds to wait before proceeding. If this is ``None``
            (the default), then wait indefinitely until the device is active.
        """
        return self._active_event.wait(timeout)

    def wait_for_inactive(self, timeout=None):
        """
        Pause the script until the device is deactivated, or the timeout is
        reached.

        :param float timeout:
            Number of seconds to wait before proceeding. If this is ``None``
            (the default), then wait indefinitely until the device is inactive.
        """
        return self._inactive_event.wait(timeout)

    @property
    def when_activated(self):
        """
        The function to run when the device changes state from inactive to
        active.

        This can be set to a function which accepts no (mandatory) parameters,
        or a Python function which accepts a single mandatory parameter (with
        as many optional parameters as you like). If the function accepts a
        single mandatory parameter, the device that activated will be passed
        as that parameter.

        Set this property to ``None`` (the default) to disable the event.
        """
        return self._when_activated

    @when_activated.setter
    def when_activated(self, value):
        self._when_activated = self._wrap_callback(value)

    @property
    def when_deactivated(self):
        """
        The function to run when the device changes state from active to
        inactive.

        This can be set to a function which accepts no (mandatory) parameters,
        or a Python function which accepts a single mandatory parameter (with
        as many optional parameters as you like). If the function accepts a
        single mandatory parameter, the device that deactivated will be
        passed as that parameter.

        Set this property to ``None`` (the default) to disable the event.
        """
        return self._when_deactivated

    @when_deactivated.setter
    def when_deactivated(self, value):
        self._when_deactivated = self._wrap_callback(value)

    @property
    def active_time(self):
        """
        The length of time (in seconds) that the device has been active for.
        When the device is inactive, this is ``None``.
        """
        if not self._active_event.wait(0):
            return None
        return time() - self._last_changed

    @property
    def inactive_time(self):
        """
        The length of time (in seconds) that the device has been inactive for.
        When the device is active, this is ``None``.
        """
        if not self._inactive_event.wait(0):
            return None
        return time() - self._last_changed

    def _wrap_callback(self, fn):
        # Validate an event handler and return something callable with no
        # arguments (or None when the event is being disabled)
        if fn is None:
            return None
        if not callable(fn):
            raise BadEventHandler('value must be None or a callable')
        if inspect.isbuiltin(fn):
            # The prototype of a builtin can't be introspected, so assume it
            # takes no (mandatory) parameters: pre-existing builtins know
            # nothing of gpiozero, and a gpiozero object is the only
            # parameter we would ever pass
            return fn
        try:
            # If the callable binds with no arguments at all, it can be
            # used exactly as given
            inspect.getcallargs(fn)
        except TypeError:
            try:
                # Otherwise see whether it binds with a single parameter
                # (ourselves); if so it needs wrapping
                inspect.getcallargs(fn, self)
            except TypeError:
                raise BadEventHandler(
                    'value must be a callable which accepts up to one '
                    'mandatory parameter')

            @wraps(fn)
            def wrapper():
                return fn(self)
            return wrapper
        else:
            return fn

    def _fire_activated(self):
        # Largely here to be overridden by descendants
        handler = self.when_activated
        if handler:
            handler()

    def _fire_deactivated(self):
        # Largely here to be overridden by descendants
        handler = self.when_deactivated
        if handler:
            handler()

    def _fire_events(self):
        # Compare the current state with the one recorded on the previous
        # call, update the level events and fire the edge callbacks
        previous, current = self._last_state, self.is_active
        self._last_state = current
        if previous is None:
            # Initial "indeterminate" state; set the matching level event but
            # don't fire callbacks as there's not necessarily an edge
            level = self._active_event if current else self._inactive_event
            level.set()
        elif previous != current:
            self._last_changed = time()
            if current:
                lowered, raised = self._inactive_event, self._active_event
                notify = self._fire_activated
            else:
                lowered, raised = self._active_event, self._inactive_event
                notify = self._fire_deactivated
            lowered.clear()
            raised.set()
            notify()
 | 
						|
 | 
						|
 | 
						|
class HoldMixin(EventsMixin):
    """
    Extends :class:`EventsMixin` with a :attr:`when_held` event which is
    fired by a background :class:`HoldThread` once the device has remained
    active for :attr:`hold_time` seconds (and repeatedly thereafter when
    :attr:`hold_repeat` is ``True``).
    """
    def __init__(self, *args, **kwargs):
        super(HoldMixin, self).__init__(*args, **kwargs)
        self._when_held = None
        # Timestamp of the first when_held firing; None while not held
        self._held_from = None
        self._hold_time = 1
        self._hold_repeat = False
        # The thread starts itself on construction
        self._hold_thread = HoldThread(self)

    def close(self):
        # Shut down the hold thread, then continue up the parent class list
        if self._hold_thread:
            self._hold_thread.stop()
            self._hold_thread = None
        try:
            super(HoldMixin, self).close()
        except AttributeError:
            # No further close() in the parent class list
            pass

    def _fire_activated(self):
        # Run the normal activation handler, then wake the hold thread so
        # that it starts timing the hold
        super(HoldMixin, self)._fire_activated()
        self._hold_thread.holding.set()

    def _fire_deactivated(self):
        # The device is no longer held once it deactivates
        self._held_from = None
        super(HoldMixin, self)._fire_deactivated()

    def _fire_held(self):
        handler = self.when_held
        if handler:
            handler()

    @property
    def when_held(self):
        """
        The function to run when the device has remained active for
        :attr:`hold_time` seconds.

        This can be set to a function which accepts no (mandatory) parameters,
        or a Python function which accepts a single mandatory parameter (with
        as many optional parameters as you like). If the function accepts a
        single mandatory parameter, the device that activated will be passed
        as that parameter.

        Set this property to ``None`` (the default) to disable the event.
        """
        return self._when_held

    @when_held.setter
    def when_held(self, value):
        self._when_held = self._wrap_callback(value)

    @property
    def hold_time(self):
        """
        The length of time (in seconds) to wait after the device is activated,
        until executing the :attr:`when_held` handler. If :attr:`hold_repeat`
        is True, this is also the length of time between invocations of
        :attr:`when_held`.
        """
        return self._hold_time

    @hold_time.setter
    def hold_time(self, value):
        if value < 0:
            raise BadWaitTime('hold_time must be 0 or greater')
        self._hold_time = float(value)

    @property
    def hold_repeat(self):
        """
        If ``True``, :attr:`when_held` will be executed repeatedly with
        :attr:`hold_time` seconds between each invocation.
        """
        return self._hold_repeat

    @hold_repeat.setter
    def hold_repeat(self, value):
        self._hold_repeat = bool(value)

    @property
    def is_held(self):
        """
        When ``True``, the device has been active for at least
        :attr:`hold_time` seconds.
        """
        return self._held_from is not None

    @property
    def held_time(self):
        """
        The length of time (in seconds) that the device has been held for.
        This is counted from the first execution of the :attr:`when_held` event
        rather than when the device activated, in contrast to
        :attr:`~EventsMixin.active_time`. If the device is not currently held,
        this is ``None``.
        """
        if self._held_from is None:
            return None
        return time() - self._held_from
 | 
						|
 | 
						|
 | 
						|
class HoldThread(GPIOThread):
    """
    Extends :class:`GPIOThread`. Provides a background thread that repeatedly
    fires the :attr:`HoldMixin.when_held` event as long as the owning
    device is active.

    The thread only holds a weak reference to the owning device (as
    :class:`GPIOQueue` does), so a running thread does not by itself keep the
    device alive; if the device disappears the thread simply exits.
    """
    def __init__(self, parent):
        super(HoldThread, self).__init__(
            target=self.held, args=(weakref.proxy(parent),))
        # Set by the parent when it activates, to start timing a hold
        self.holding = Event()
        self.start()

    def held(self, parent):
        """
        Thread body. Waits (polling every 0.1 seconds so that a stop request
        is noticed) for :attr:`holding` to be set, then fires the parent's
        held event each time ``hold_time`` seconds pass without the parent
        becoming inactive. Firing happens once per activation unless the
        parent's ``hold_repeat`` is true.
        """
        try:
            while not self.stopping.wait(0):
                if self.holding.wait(0.1):
                    self.holding.clear()
                    while not (
                            self.stopping.wait(0) or
                            parent._inactive_event.wait(parent.hold_time)
                            ):
                        # Record the start of the hold on the first firing
                        if parent._held_from is None:
                            parent._held_from = time()
                        parent._fire_held()
                        if not parent.hold_repeat:
                            break
        except ReferenceError:
            # Parent is dead; time to die!
            pass
 | 
						|
 | 
						|
 | 
						|
class GPIOQueue(GPIOThread):
    """
    Extends :class:`GPIOThread`. Provides a background thread that monitors a
    device's values and provides a running *average* (defaults to median) of
    those values. If the *parent* device includes the :class:`EventsMixin` in
    its ancestry, the thread automatically calls
    :meth:`~EventsMixin._fire_events`.

    :param parent:
        The device to sample; its ``_read`` method is called for each sample.
        Only a weak reference to it is kept.

    :param int queue_len:
        The maximum number of samples retained; must be at least 1.

    :param float sample_wait:
        Seconds to wait between samples; must be 0 or greater.

    :param bool partial:
        If ``False`` (the default), reading :attr:`value` blocks until the
        queue has been filled. If ``True``, :attr:`value` is available
        immediately and is calculated from whatever has been read so far.

    :param average:
        The callable used to reduce the queue to a single value.
    """
    def __init__(
            self, parent, queue_len=5, sample_wait=0.0, partial=False,
            average=median):
        assert callable(average)
        super(GPIOQueue, self).__init__(target=self.fill)
        if queue_len < 1:
            raise BadQueueLen('queue_len must be at least one')
        if sample_wait < 0:
            raise BadWaitTime('sample_wait must be 0 or greater')
        self.queue = deque(maxlen=queue_len)
        self.partial = bool(partial)
        self.sample_wait = float(sample_wait)
        # Set once the queue has been filled for the first time
        self.full = Event()
        self.parent = weakref.proxy(parent)
        self.average = average

    @property
    def value(self):
        """
        The average of the queued samples, or 0.0 if there are no samples
        yet. Blocks until the queue is full unless *partial* was specified.
        """
        if not self.partial:
            self.full.wait()
        try:
            return self.average(self.queue)
        except (ZeroDivisionError, ValueError):
            # No data == inactive value. A hand-rolled mean of an empty
            # queue raises ZeroDivisionError, whereas statistics.median
            # raises StatisticsError, which is a ValueError subclass
            return 0.0

    def fill(self):
        """
        Thread body. Fills the queue for the first time, signals
        :attr:`full`, then keeps appending fresh samples until the thread is
        stopped or the parent device disappears.
        """
        try:
            # Initial fill; events are only fired at this stage when partial
            # values are permitted
            while (not self.stopping.wait(self.sample_wait) and
                    len(self.queue) < self.queue.maxlen):
                self.queue.append(self.parent._read())
                if self.partial and isinstance(self.parent, EventsMixin):
                    self.parent._fire_events()
            self.full.set()
            # Steady state; the bounded deque discards the oldest sample
            while not self.stopping.wait(self.sample_wait):
                self.queue.append(self.parent._read())
                if isinstance(self.parent, EventsMixin):
                    self.parent._fire_events()
        except ReferenceError:
            # Parent is dead; time to die!
            pass
 | 
						|
 |