mirror of
https://github.com/KevinMidboe/python-gpiozero.git
synced 2025-10-29 17:50:37 +00:00
Fix #115
Adds when_held event hook to Button (via extension of the EventsMixin class). Also fixes some minor notes and activates codecov coverage tracking.
This commit is contained in:
@@ -12,6 +12,7 @@ import weakref
|
||||
from functools import wraps
|
||||
from threading import Event
|
||||
from collections import deque
|
||||
from time import time
|
||||
try:
|
||||
from statistics import median
|
||||
except ImportError:
|
||||
@@ -20,10 +21,9 @@ except ImportError:
|
||||
from .threads import GPIOThread
|
||||
from .exc import (
|
||||
BadEventHandler,
|
||||
BadWaitTime,
|
||||
BadQueueLen,
|
||||
DeviceClosed,
|
||||
GPIOBadSourceDelay,
|
||||
GPIOBadQueueLen,
|
||||
GPIOBadSampleWait,
|
||||
)
|
||||
|
||||
class ValuesMixin(object):
|
||||
@@ -89,7 +89,7 @@ class SourceMixin(object):
|
||||
@source_delay.setter
|
||||
def source_delay(self, value):
|
||||
if value < 0:
|
||||
raise GPIOBadSourceDelay('source_delay must be 0 or greater')
|
||||
raise BadWaitTime('source_delay must be 0 or greater')
|
||||
self._source_delay = float(value)
|
||||
|
||||
@property
|
||||
@@ -160,6 +160,7 @@ class EventsMixin(object):
|
||||
self._when_activated = None
|
||||
self._when_deactivated = None
|
||||
self._last_state = None
|
||||
self._last_changed = time()
|
||||
|
||||
def wait_for_active(self, timeout=None):
|
||||
"""
|
||||
@@ -223,6 +224,28 @@ class EventsMixin(object):
|
||||
def when_deactivated(self, value):
|
||||
self._when_deactivated = self._wrap_callback(value)
|
||||
|
||||
@property
|
||||
def active_time(self):
|
||||
"""
|
||||
The length of time (in seconds) that the device has been active for.
|
||||
When the device is inactive, this is ``None``.
|
||||
"""
|
||||
if self._active_event.wait(0):
|
||||
return time() - self._last_changed
|
||||
else:
|
||||
return None
|
||||
|
||||
@property
|
||||
def inactive_time(self):
|
||||
"""
|
||||
The length of time (in seconds) that the device has been inactive for.
|
||||
When the device is inactive, this is ``None``.
|
||||
"""
|
||||
if self._inactive_event.wait(0):
|
||||
return time() - self._last_changed
|
||||
else:
|
||||
return None
|
||||
|
||||
def _wrap_callback(self, fn):
|
||||
if fn is None:
|
||||
return None
|
||||
@@ -256,6 +279,16 @@ class EventsMixin(object):
|
||||
'value must be a callable which accepts up to one '
|
||||
'mandatory parameter')
|
||||
|
||||
def _fire_activated(self):
|
||||
# These methods are largely here to be overridden by descendents
|
||||
if self.when_activated:
|
||||
self.when_activated()
|
||||
|
||||
def _fire_deactivated(self):
|
||||
# These methods are largely here to be overridden by descendents
|
||||
if self.when_deactivated:
|
||||
self.when_deactivated()
|
||||
|
||||
def _fire_events(self):
|
||||
old_state = self._last_state
|
||||
new_state = self._last_state = self.is_active
|
||||
@@ -266,17 +299,143 @@ class EventsMixin(object):
|
||||
self._active_event.set()
|
||||
else:
|
||||
self._inactive_event.set()
|
||||
else:
|
||||
if not old_state and new_state:
|
||||
elif old_state != new_state:
|
||||
self._last_changed = time()
|
||||
if new_state:
|
||||
self._inactive_event.clear()
|
||||
self._active_event.set()
|
||||
if self.when_activated:
|
||||
self.when_activated()
|
||||
elif old_state and not new_state:
|
||||
self._fire_activated()
|
||||
else:
|
||||
self._active_event.clear()
|
||||
self._inactive_event.set()
|
||||
if self.when_deactivated:
|
||||
self.when_deactivated()
|
||||
self._fire_deactivated()
|
||||
|
||||
|
||||
class HoldMixin(EventsMixin):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(HoldMixin, self).__init__(*args, **kwargs)
|
||||
self._when_held = None
|
||||
self._held_from = None
|
||||
self._hold_time = 1
|
||||
self._hold_repeat = False
|
||||
self._hold_thread = HoldThread(self)
|
||||
|
||||
def close(self):
|
||||
if self._hold_thread:
|
||||
self._hold_thread.stop()
|
||||
self._hold_thread = None
|
||||
try:
|
||||
super(HoldMixin, self).close()
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
def _fire_activated(self):
|
||||
super(HoldMixin, self)._fire_activated()
|
||||
self._hold_thread.holding.set()
|
||||
|
||||
def _fire_deactivated(self):
|
||||
self._held_from = None
|
||||
super(HoldMixin, self)._fire_deactivated()
|
||||
|
||||
def _fire_held(self):
|
||||
if self.when_held:
|
||||
self.when_held()
|
||||
|
||||
@property
|
||||
def when_held(self):
|
||||
"""
|
||||
The function to run when the device has remained active for
|
||||
:attr:`hold_time` seconds.
|
||||
|
||||
This can be set to a function which accepts no (mandatory) parameters,
|
||||
or a Python function which accepts a single mandatory parameter (with
|
||||
as many optional parameters as you like). If the function accepts a
|
||||
single mandatory parameter, the device that activated will be passed
|
||||
as that parameter.
|
||||
|
||||
Set this property to ``None`` (the default) to disable the event.
|
||||
"""
|
||||
return self._when_held
|
||||
|
||||
@when_held.setter
|
||||
def when_held(self, value):
|
||||
self._when_held = self._wrap_callback(value)
|
||||
|
||||
@property
|
||||
def hold_time(self):
|
||||
"""
|
||||
The length of time (in seconds) to wait after the device is activated,
|
||||
until executing the :attr:`when_held` handler. If :attr:`hold_repeat`
|
||||
is True, this is also the length of time between invocations of
|
||||
:attr:`when_held`.
|
||||
"""
|
||||
return self._hold_time
|
||||
|
||||
@hold_time.setter
|
||||
def hold_time(self, value):
|
||||
if value < 0:
|
||||
raise BadWaitTime('source_delay must be 0 or greater')
|
||||
self._hold_time = float(value)
|
||||
|
||||
@property
|
||||
def hold_repeat(self):
|
||||
"""
|
||||
If ``True``, :attr:`when_held` will be executed repeatedly with
|
||||
:attr:`hold_time` seconds between each invocation.
|
||||
"""
|
||||
return self._hold_repeat
|
||||
|
||||
@hold_repeat.setter
|
||||
def hold_repeat(self, value):
|
||||
self._hold_repeat = bool(value)
|
||||
|
||||
@property
|
||||
def is_held(self):
|
||||
"""
|
||||
When ``True``, the device has been active for at least
|
||||
:attr:`hold_time` seconds.
|
||||
"""
|
||||
return self._held_from is not None
|
||||
|
||||
@property
|
||||
def held_time(self):
|
||||
"""
|
||||
The length of time (in seconds) that the device has been held for.
|
||||
This is counted from the first execution of the :attr:`when_held` event
|
||||
rather than when the device activated, in contrast to
|
||||
:attr:`active_time`. If the device is not currently held, this is
|
||||
``None``.
|
||||
"""
|
||||
if self._held_from is not None:
|
||||
return time() - self._held_from
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
class HoldThread(GPIOThread):
|
||||
"""
|
||||
Extends :class:`GPIOThread`. Provides a background thread that repeatedly
|
||||
fires the :attr:`HoldMixin.when_held` event as long as the owning
|
||||
device is active.
|
||||
"""
|
||||
def __init__(self, parent):
|
||||
super(HoldThread, self).__init__(target=self.held, args=(parent,))
|
||||
self.holding = Event()
|
||||
self.start()
|
||||
|
||||
def held(self, parent):
|
||||
while not self.stopping.wait(0):
|
||||
if self.holding.wait(0.1):
|
||||
self.holding.clear()
|
||||
while not (
|
||||
self.stopping.wait(0) or
|
||||
parent._inactive_event.wait(parent.hold_time)
|
||||
):
|
||||
if parent._held_from is None:
|
||||
parent._held_from = time()
|
||||
parent._fire_held()
|
||||
if not parent.hold_repeat:
|
||||
break
|
||||
|
||||
|
||||
class GPIOQueue(GPIOThread):
|
||||
@@ -293,9 +452,9 @@ class GPIOQueue(GPIOThread):
|
||||
assert callable(average)
|
||||
super(GPIOQueue, self).__init__(target=self.fill)
|
||||
if queue_len < 1:
|
||||
raise GPIOBadQueueLen('queue_len must be at least one')
|
||||
raise BadQueueLen('queue_len must be at least one')
|
||||
if sample_wait < 0:
|
||||
raise GPIOBadSampleWait('sample_wait must be 0 or greater')
|
||||
raise BadWaitTime('sample_wait must be 0 or greater')
|
||||
self.queue = deque(maxlen=queue_len)
|
||||
self.partial = partial
|
||||
self.sample_wait = sample_wait
|
||||
|
||||
Reference in New Issue
Block a user