Merge pull request #20 from waveform80/when-active-when-inactive

Nicely named event attributes
This commit is contained in:
Ben Nuttall
2015-09-22 14:48:08 +01:00
3 changed files with 239 additions and 198 deletions

View File

@@ -72,7 +72,6 @@ Alternatively:
```python
from gpiozero import LED
from time import sleep
red = LED(2)
red.blink(1, 1)
@@ -118,7 +117,7 @@ from gpiozero import Button
button = Button(4)
button.wait_for_input()
button.wait_for_press()
print("Button was pressed")
```
@@ -127,44 +126,54 @@ Run a function every time the button is pressed:
```python
from gpiozero import Button
def hello(pin):
print("Button was pressed")
def warning():
print("Don't push the button!")
button = Button(4)
button.add_callback(hello)
button.when_pressed = warning
```
### Motion Sensor
Detect motion:
Detect motion and light an LED when it's detected:
```python
from gpiozero import MotionSensor
from gpiozero import MotionSensor, LED
pir = MotionSensor(5)
led = LED(16)
while True:
if pir.motion_detected:
print("Motion detected")
pir.when_motion = led.on
pir.when_no_motion = led.off
```
### Light Sensor
Retrieve light sensor value:
Wait for light and dark:
```python
from time import sleep
from gpiozero import LightSensor
sensor = LightSensor(18)
while True:
sensor.wait_for_light()
print("It's light! :)")
sensor.wait_for_dark()
print("It's dark :(")
```
Run a function when the light changes:
```python
from gpiozero import LightSensor, LED
sensor = LightSensor(18)
led = LED(16)
sensor.when_dark = led.on
sensor.when_light = led.off
while True:
sleep(1)
```
### Temperature Sensor
@@ -196,3 +205,4 @@ sleep(5)
left_motor.off()
right_motor.off()
```

View File

@@ -1,4 +1,6 @@
import weakref
from threading import Thread, Event
from collections import deque
from RPi import GPIO
@@ -12,8 +14,14 @@ class GPIODevice(object):
if pin is None:
raise GPIODeviceError('No GPIO pin number given')
self._pin = pin
self._active_state = 1
self._inactive_state = 0
self._active_state = GPIO.HIGH
self._inactive_state = GPIO.LOW
def _read(self):
return GPIO.input(self.pin) == self._active_state
def _fire_events(self):
pass
@property
def pin(self):
@@ -21,7 +29,7 @@ class GPIODevice(object):
@property
def is_active(self):
return GPIO.input(self.pin) == self._active_state
return self._read()
_GPIO_THREADS = set()
@@ -48,3 +56,44 @@ class GPIOThread(Thread):
self.stopping.set()
self.join()
_GPIO_THREADS.discard(self)
class GPIOQueue(GPIOThread):
def __init__(self, parent, queue_len=5, sample_wait=0.0, partial=False):
assert isinstance(parent, GPIODevice)
super(GPIOQueue, self).__init__(target=self.fill)
if queue_len < 1:
raise InputDeviceError('queue_len must be at least one')
self.queue = deque(maxlen=queue_len)
self.partial = partial
self.sample_wait = sample_wait
self.full = Event()
self.parent = weakref.proxy(parent)
@property
def value(self):
if not self.partial:
self.full.wait()
try:
return sum(self.queue) / len(self.queue)
except ZeroDivisionError:
# No data == inactive value
return 0.0
def fill(self):
try:
while (
not self.stopping.wait(self.sample_wait) and
len(self.queue) < self.queue.maxlen
):
self.queue.append(self.parent._read())
if self.partial:
self.parent._fire_events()
self.full.set()
while not self.stopping.wait(self.sample_wait):
self.queue.append(self.parent._read())
self.parent._fire_events()
except ReferenceError:
# Parent is dead; time to die!
pass

View File

@@ -2,12 +2,17 @@ from __future__ import division
from time import sleep, time
from threading import Event
from collections import deque
from RPi import GPIO
from w1thermsensor import W1ThermSensor
from .devices import GPIODeviceError, GPIODevice, GPIOThread
from .devices import GPIODeviceError, GPIODevice, GPIOQueue
def _alias(key):
return property(
lambda self: getattr(self, key),
lambda self, val: setattr(self, key, val))
class InputDeviceError(GPIODeviceError):
@@ -15,194 +20,175 @@ class InputDeviceError(GPIODeviceError):
class InputDevice(GPIODevice):
def __init__(self, pin=None, pull_up=True):
def __init__(self, pin=None, pull_up=False):
super(InputDevice, self).__init__(pin)
self._pull_up = pull_up
self._edge = (GPIO.RISING, GPIO.FALLING)[pull_up]
self._active_edge = (GPIO.RISING, GPIO.FALLING)[pull_up]
self._inactive_edge = (GPIO.FALLING, GPIO.RISING)[pull_up]
if pull_up:
self._active_state = 0
self._inactive_state = 1
pull = GPIO.PUD_UP if pull_up else GPIO.PUD_DOWN
GPIO.setup(pin, GPIO.IN, pull)
self._active_state = GPIO.LOW
self._inactive_state = GPIO.HIGH
GPIO.setup(pin, GPIO.IN, (GPIO.PUD_DOWN, GPIO.PUD_UP)[pull_up])
@property
def pull_up(self):
return self._pull_up
class Button(InputDevice):
pass
class WaitableInputDevice(InputDevice):
def __init__(self, pin=None, pull_up=False):
super(WaitableInputDevice, self).__init__(pin, pull_up)
self._active_event = Event()
self._inactive_event = Event()
self._when_activated = None
self._when_deactivated = None
self._last_state = None
def wait_for_active(self, timeout=None):
return self._active_event.wait(timeout)
def wait_for_inactive(self, timeout=None):
return self._inactive_event.wait(timeout)
def _get_when_activated(self):
return self._when_activated
def _set_when_activated(self, value):
if not callable(value) and value is not None:
raise InputDeviceError('value must be None or a function')
self._when_activated = value
when_activated = property(_get_when_activated, _set_when_activated)
def _get_when_deactivated(self):
return self._when_deactivated
def _set_when_deactivated(self, value):
if not callable(value) and value is not None:
raise InputDeviceError('value must be None or a function')
self._when_deactivated = value
when_deactivated = property(_get_when_deactivated, _set_when_deactivated)
def _fire_events(self):
old_state = self._last_state
new_state = self._last_state = self.is_active
if old_state is None:
# Initial "indeterminate" state; set events but don't fire
# callbacks as there's not necessarily an edge
if new_state:
self._active_event.set()
else:
self._inactive_event.set()
else:
if not old_state and new_state:
self._inactive_event.clear()
self._active_event.set()
if self.when_activated:
self.when_activated()
elif old_state and not new_state:
self._active_event.clear()
self._inactive_event.set()
if self.when_deactivated:
self.when_deactivated()
class MotionSensor(InputDevice):
class DigitalInputDevice(WaitableInputDevice):
def __init__(self, pin=None, pull_up=False, bouncetime=None):
super(DigitalInputDevice, self).__init__(pin, pull_up)
# Yes, that's really the default bouncetime in RPi.GPIO...
GPIO.add_event_detect(
self.pin, GPIO.BOTH, callback=self._fire_events,
bouncetime=-666 if bouncetime is None else bouncetime)
# Call _fire_events once to set initial state of events
super(DigitalInputDevice, self)._fire_events()
def __del__(self):
GPIO.remove_event_detect(self.pin)
def _fire_events(self, channel):
super(DigitalInputDevice, self)._fire_events()
class SmoothedInputDevice(WaitableInputDevice):
def __init__(
self, pin=None, pull_up=False, threshold=0.5,
queue_len=5, sample_wait=0.0, partial=False):
super(SmoothedInputDevice, self).__init__(pin, pull_up)
self._queue = GPIOQueue(self, queue_len, sample_wait, partial)
self.threshold = float(threshold)
@property
def queue_len(self):
return self._queue.queue.maxlen
@property
def partial(self):
return self._queue.partial
@property
def value(self):
return self._queue.value
def _get_threshold(self):
return self._threshold
def _set_threshold(self, value):
if not (0.0 < value < 1.0):
raise InputDeviceError('threshold must be between zero and one exclusive')
self._threshold = float(value)
threshold = property(_get_threshold, _set_threshold)
@property
def is_active(self):
return self.value > self.threshold
class Button(DigitalInputDevice):
def __init__(self, pin=None, pull_up=True, bouncetime=None):
super(Button, self).__init__(pin, pull_up, bouncetime)
when_pressed = _alias('when_activated')
when_released = _alias('when_deactivated')
wait_for_press = _alias('wait_for_active')
wait_for_release = _alias('wait_for_inactive')
class MotionSensor(SmoothedInputDevice):
def __init__(
self, pin=None, queue_len=5, sample_rate=10, threshold=0.5,
partial=False):
super(MotionSensor, self).__init__(pin, pull_up=False)
if queue_len < 1:
raise InputDeviceError('queue_len must be at least one')
self.sample_rate = sample_rate
self.threshold = threshold
self.partial = partial
self._queue = deque(maxlen=queue_len)
self._queue_full = Event()
self._queue_thread = GPIOThread(target=self._fill_queue)
self._queue_thread.start()
super(MotionSensor, self).__init__(
pin, pull_up=False, threshold=threshold,
queue_len=queue_len, sample_wait=1 / sample_rate, partial=partial)
self._queue.start()
@property
def queue_len(self):
return self._queue.maxlen
motion_detected = _alias('is_active')
@property
def value(self):
if not self.partial:
self._queue_full.wait()
try:
return sum(self._queue) / len(self._queue)
except ZeroDivisionError:
# No data == no motion
return 0.0
when_motion = _alias('when_activated')
when_no_motion = _alias('when_deactivated')
@property
def motion_detected(self):
return self.value > self.threshold
def _get_sample_rate(self):
return self._sample_rate
def _set_sample_rate(self, value):
if value <= 0:
raise InputDeviceError('sample_rate must be greater than zero')
self._sample_rate = value
sample_rate = property(_get_sample_rate, _set_sample_rate)
def _get_threshold(self):
return self._threshold
def _set_threshold(self, value):
if value < 0:
raise InputDeviceError('threshold must be zero or more')
self._threshold = value
threshold = property(_get_threshold, _set_threshold)
def _fill_queue(self):
while (
not self._queue_thread.stopping.wait(1 / self.sample_rate) and
len(self._queue) < self._queue.maxlen
):
self._queue.append(self.is_active)
self._queue_full.set()
while not self._queue_thread.stopping.wait(1 / self.sample_rate):
self._queue.append(self.is_active)
wait_for_motion = _alias('wait_for_active')
wait_for_no_motion = _alias('wait_for_inactive')
class LightSensor(InputDevice):
class LightSensor(SmoothedInputDevice):
def __init__(
self, pin=None, queue_len=5, darkness_time=0.01,
self, pin=None, queue_len=5, charge_time_limit=0.01,
threshold=0.1, partial=False):
super(LightSensor, self).__init__(pin, pull_up=False)
if queue_len < 1:
raise InputDeviceError('queue_len must be at least one')
self.darkness_time = darkness_time
self.threshold = threshold
self.partial = partial
super(LightSensor, self).__init__(
pin, pull_up=False, threshold=threshold,
queue_len=queue_len, sample_wait=0.0, partial=partial)
self._charge_time_limit = charge_time_limit
self._charged = Event()
GPIO.add_event_detect(
self.pin, GPIO.RISING, lambda channel: self._charged.set()
)
self._queue = deque(maxlen=queue_len)
self._queue_full = Event()
self._queue_thread = GPIOThread(target=self._fill_queue)
self._last_state = None
self._when_light = None
self._when_dark = None
self._when_light_event = Event()
self._when_dark_event = Event()
self._queue_thread.start()
GPIO.add_event_detect(self.pin, GPIO.RISING, lambda channel: self._charged.set())
self._queue.start()
def __del__(self):
GPIO.remove_event_detect(self.pin)
@property
def queue_len(self):
return self._queue.maxlen
def charge_time_limit(self):
return self._charge_time_limit
@property
def value(self):
if not self.partial:
self._queue_full.wait()
try:
return (
1.0 - (sum(self._queue) / len(self._queue)) /
self.darkness_time
)
except ZeroDivisionError:
# No data == no light
return 0.0
@property
def light_detected(self):
return self.value > self.threshold
def _get_when_light(self):
return self._when_light
def _set_when_light(self, value):
if not callable(value) and value is not None:
raise InputDeviceError('when_light must be None or a function')
self._when_light = value
when_light = property(_get_when_light, _set_when_light)
def _get_when_dark(self):
return self._when_dark
def _set_when_dark(self, value):
if not callable(value) and value is not None:
raise InputDeviceError('when_dark must be None or a function')
self._when_dark = value
def wait_for_light(self, timeout=None):
self._when_light_event.wait(timeout)
def wait_for_dark(self, timeout=None):
self._when_dark_event.wait(timeout)
def _get_darkness_time(self):
return self._darkness_time
def _set_darkness_time(self, value):
if value <= 0.0:
raise InputDeviceError('darkness_time must be greater than zero')
self._darkness_time = value
# XXX Empty the queue and restart the thread
darkness_time = property(_get_darkness_time, _set_darkness_time)
def _get_threshold(self):
return self._threshold
def _set_threshold(self, value):
if value < 0:
raise InputDeviceError('threshold must be zero or more')
self._threshold = value
threshold = property(_get_threshold, _set_threshold)
def _fill_queue(self):
try:
while (not self._queue_thread.stopping.is_set() and
len(self._queue) < self._queue.maxlen):
self._queue.append(self._time_charging())
if self.partial:
self._fire_events()
self._queue_full.set()
while not self._queue_thread.stopping.is_set():
self._queue.append(self._time_charging())
self._fire_events()
finally:
GPIO.remove_event_detect(self.pin)
def _time_charging(self):
def _read(self):
# Drain charge from the capacitor
GPIO.setup(self.pin, GPIO.OUT)
GPIO.output(self.pin, GPIO.LOW)
@@ -211,25 +197,21 @@ class LightSensor(InputDevice):
start = time()
self._charged.clear()
GPIO.setup(self.pin, GPIO.IN)
self._charged.wait(self.darkness_time)
return min(self.darkness_time, time() - start)
self._charged.wait(self.charge_time_limit)
return 1.0 - min(self.charge_time_limit, time() - start) / self.charge_time_limit
light_detected = _alias('is_active')
when_light = _alias('when_activated')
when_dark = _alias('when_deactivated')
wait_for_light = _alias('wait_for_active')
wait_for_dark = _alias('wait_for_inactive')
def _fire_events(self):
last_state = self._last_state
self._last_state = state = self.light_detected
if not last_state and state:
self._when_dark_event.clear()
self._when_light_event.set()
if self.when_light:
self.when_light()
elif last_state and not state:
self._when_light_event.clear()
self._when_dark_event.set()
if self.when_dark:
self.when_dark()
class TemperatureSensor(W1ThermSensor):
@property
def value(self):
return self.get_temperature()