mirror of
https://github.com/KevinMidboe/python-gpiozero.git
synced 2025-10-29 17:50:37 +00:00
304 lines
9.2 KiB
Python
304 lines
9.2 KiB
Python
from __future__ import division
|
|
|
|
import inspect
|
|
from functools import wraps
|
|
from time import sleep, time
|
|
from threading import Event
|
|
|
|
from RPi import GPIO
|
|
from w1thermsensor import W1ThermSensor
|
|
|
|
from .devices import GPIODeviceError, GPIODevice, GPIOQueue
|
|
|
|
|
|
def _alias(key):
|
|
return property(
|
|
lambda self: getattr(self, key),
|
|
lambda self, val: setattr(self, key, val))
|
|
|
|
|
|
class InputDeviceError(GPIODeviceError):
|
|
pass
|
|
|
|
|
|
class InputDevice(GPIODevice):
|
|
"""
|
|
Generic GPIO Input Device.
|
|
"""
|
|
def __init__(self, pin=None, pull_up=False):
|
|
super(InputDevice, self).__init__(pin)
|
|
self._pull_up = pull_up
|
|
self._active_edge = (GPIO.RISING, GPIO.FALLING)[pull_up]
|
|
self._inactive_edge = (GPIO.FALLING, GPIO.RISING)[pull_up]
|
|
if pull_up:
|
|
self._active_state = GPIO.LOW
|
|
self._inactive_state = GPIO.HIGH
|
|
GPIO.setup(pin, GPIO.IN, (GPIO.PUD_DOWN, GPIO.PUD_UP)[pull_up])
|
|
|
|
@property
|
|
def pull_up(self):
|
|
return self._pull_up
|
|
|
|
def __repr__(self):
|
|
return "<gpiozero.%s object on pin=%d, pull_up=%s, is_active=%s>" % (
|
|
self.__class__.__name__, self.pin, self.pull_up, self.is_active)
|
|
|
|
|
|
class WaitableInputDevice(InputDevice):
|
|
"""
|
|
A time-dependent Generic Input Device.
|
|
"""
|
|
def __init__(self, pin=None, pull_up=False):
|
|
super(WaitableInputDevice, self).__init__(pin, pull_up)
|
|
self._active_event = Event()
|
|
self._inactive_event = Event()
|
|
self._when_activated = None
|
|
self._when_deactivated = None
|
|
self._last_state = None
|
|
|
|
def wait_for_active(self, timeout=None):
|
|
"""
|
|
Halt the program until the device is activated, or the timeout is
|
|
reached.
|
|
|
|
timeout: None
|
|
Number of seconds (?) to wait before proceeding
|
|
"""
|
|
return self._active_event.wait(timeout)
|
|
|
|
def wait_for_inactive(self, timeout=None):
|
|
"""
|
|
Halt the program until the device is inactivated, or the timeout is
|
|
reached.
|
|
|
|
timeout: None
|
|
Number of seconds (?) to wait before proceeding
|
|
"""
|
|
return self._inactive_event.wait(timeout)
|
|
|
|
def _get_when_activated(self):
|
|
return self._when_activated
|
|
|
|
def _set_when_activated(self, value):
|
|
self._when_activated = self._wrap_callback(value)
|
|
|
|
when_activated = property(_get_when_activated, _set_when_activated)
|
|
|
|
def _get_when_deactivated(self):
|
|
return self._when_deactivated
|
|
|
|
def _set_when_deactivated(self, value):
|
|
self._when_deactivated = self._wrap_callback(value)
|
|
|
|
when_deactivated = property(_get_when_deactivated, _set_when_deactivated)
|
|
|
|
def _wrap_callback(self, fn):
|
|
if fn is None:
|
|
return None
|
|
elif not callable(fn):
|
|
raise InputDeviceError('value must be None or a callable')
|
|
else:
|
|
# Try binding ourselves to the argspec of the provided callable.
|
|
# If this works, assume the function is capable of accepting no
|
|
# parameters
|
|
try:
|
|
inspect.getcallargs(fn)
|
|
return fn
|
|
except TypeError:
|
|
try:
|
|
# If the above fails, try binding with a single parameter
|
|
# (ourselves). If this works, wrap the specified callback
|
|
inspect.getcallargs(fn, self)
|
|
@wraps(fn)
|
|
def wrapper():
|
|
return fn(self)
|
|
return wrapper
|
|
except TypeError:
|
|
raise InputDeviceError(
|
|
'value must be a callable which accepts up to one '
|
|
'mandatory parameter')
|
|
|
|
def _fire_events(self):
|
|
old_state = self._last_state
|
|
new_state = self._last_state = self.is_active
|
|
if old_state is None:
|
|
# Initial "indeterminate" state; set events but don't fire
|
|
# callbacks as there's not necessarily an edge
|
|
if new_state:
|
|
self._active_event.set()
|
|
else:
|
|
self._inactive_event.set()
|
|
else:
|
|
if not old_state and new_state:
|
|
self._inactive_event.clear()
|
|
self._active_event.set()
|
|
if self.when_activated:
|
|
self.when_activated()
|
|
elif old_state and not new_state:
|
|
self._active_event.clear()
|
|
self._inactive_event.set()
|
|
if self.when_deactivated:
|
|
self.when_deactivated()
|
|
|
|
|
|
class DigitalInputDevice(WaitableInputDevice):
|
|
"""
|
|
A Generic Digital Input Device.
|
|
"""
|
|
def __init__(self, pin=None, pull_up=False, bouncetime=None):
|
|
super(DigitalInputDevice, self).__init__(pin, pull_up)
|
|
# Yes, that's really the default bouncetime in RPi.GPIO...
|
|
GPIO.add_event_detect(
|
|
self.pin, GPIO.BOTH, callback=self._fire_events,
|
|
bouncetime=-666 if bouncetime is None else bouncetime
|
|
)
|
|
# Call _fire_events once to set initial state of events
|
|
super(DigitalInputDevice, self)._fire_events()
|
|
|
|
def __del__(self):
|
|
GPIO.remove_event_detect(self.pin)
|
|
|
|
def _fire_events(self, channel):
|
|
super(DigitalInputDevice, self)._fire_events()
|
|
|
|
|
|
class SmoothedInputDevice(WaitableInputDevice):
|
|
"""
|
|
A Generic Digital Input Device with background polling.
|
|
"""
|
|
def __init__(
|
|
self, pin=None, pull_up=False, threshold=0.5,
|
|
queue_len=5, sample_wait=0.0, partial=False):
|
|
super(SmoothedInputDevice, self).__init__(pin, pull_up)
|
|
self._queue = GPIOQueue(self, queue_len, sample_wait, partial)
|
|
self.threshold = float(threshold)
|
|
|
|
def __repr__(self):
|
|
if self.partial or self._queue.full.wait(0):
|
|
return super(SmoothedInputDevice, self).__repr__()
|
|
else:
|
|
return "<gpiozero.%s object on pin=%d, pull_up=%s>" % (
|
|
self.__class__.__name__, self.pin, self.pull_up)
|
|
|
|
@property
|
|
def queue_len(self):
|
|
return self._queue.queue.maxlen
|
|
|
|
@property
|
|
def partial(self):
|
|
return self._queue.partial
|
|
|
|
@property
|
|
def value(self):
|
|
return self._queue.value
|
|
|
|
def _get_threshold(self):
|
|
return self._threshold
|
|
|
|
def _set_threshold(self, value):
|
|
if not (0.0 < value < 1.0):
|
|
raise InputDeviceError(
|
|
'threshold must be between zero and one exclusive'
|
|
)
|
|
self._threshold = float(value)
|
|
|
|
threshold = property(_get_threshold, _set_threshold)
|
|
|
|
@property
|
|
def is_active(self):
|
|
return self.value > self.threshold
|
|
|
|
|
|
class Button(DigitalInputDevice):
|
|
"""
|
|
A physical push button or switch.
|
|
"""
|
|
def __init__(self, pin=None, pull_up=True, bouncetime=None):
|
|
super(Button, self).__init__(pin, pull_up, bouncetime)
|
|
|
|
when_pressed = _alias('when_activated')
|
|
when_released = _alias('when_deactivated')
|
|
|
|
wait_for_press = _alias('wait_for_active')
|
|
wait_for_release = _alias('wait_for_inactive')
|
|
|
|
|
|
class MotionSensor(SmoothedInputDevice):
|
|
"""
|
|
A PIR (Passive Infra-Red) motion sensor.
|
|
"""
|
|
def __init__(
|
|
self, pin=None, queue_len=5, sample_rate=10, threshold=0.5,
|
|
partial=False):
|
|
super(MotionSensor, self).__init__(
|
|
pin, pull_up=False, threshold=threshold,
|
|
queue_len=queue_len, sample_wait=1 / sample_rate, partial=partial
|
|
)
|
|
self._queue.start()
|
|
|
|
motion_detected = _alias('is_active')
|
|
|
|
when_motion = _alias('when_activated')
|
|
when_no_motion = _alias('when_deactivated')
|
|
|
|
wait_for_motion = _alias('wait_for_active')
|
|
wait_for_no_motion = _alias('wait_for_inactive')
|
|
|
|
|
|
class LightSensor(SmoothedInputDevice):
|
|
"""
|
|
An LDR (Light Dependent Resistor) Light Sensor.
|
|
"""
|
|
def __init__(
|
|
self, pin=None, queue_len=5, charge_time_limit=0.01,
|
|
threshold=0.1, partial=False):
|
|
super(LightSensor, self).__init__(
|
|
pin, pull_up=False, threshold=threshold,
|
|
queue_len=queue_len, sample_wait=0.0, partial=partial
|
|
)
|
|
self._charge_time_limit = charge_time_limit
|
|
self._charged = Event()
|
|
GPIO.add_event_detect(
|
|
self.pin, GPIO.RISING, lambda channel: self._charged.set()
|
|
)
|
|
self._queue.start()
|
|
|
|
def __del__(self):
|
|
GPIO.remove_event_detect(self.pin)
|
|
|
|
@property
|
|
def charge_time_limit(self):
|
|
return self._charge_time_limit
|
|
|
|
def _read(self):
|
|
# Drain charge from the capacitor
|
|
GPIO.setup(self.pin, GPIO.OUT)
|
|
GPIO.output(self.pin, GPIO.LOW)
|
|
sleep(0.1)
|
|
# Time the charging of the capacitor
|
|
start = time()
|
|
self._charged.clear()
|
|
GPIO.setup(self.pin, GPIO.IN)
|
|
self._charged.wait(self.charge_time_limit)
|
|
return (
|
|
1.0 - min(self.charge_time_limit, time() - start) /
|
|
self.charge_time_limit
|
|
)
|
|
|
|
light_detected = _alias('is_active')
|
|
|
|
when_light = _alias('when_activated')
|
|
when_dark = _alias('when_deactivated')
|
|
|
|
wait_for_light = _alias('wait_for_active')
|
|
wait_for_dark = _alias('wait_for_inactive')
|
|
|
|
|
|
class TemperatureSensor(W1ThermSensor):
|
|
"""
|
|
A Digital Temperature Sensor.
|
|
"""
|
|
@property
|
|
def value(self):
|
|
return self.get_temperature()
|