mirror of
https://github.com/KevinMidboe/python-gpiozero.git
synced 2025-10-29 17:50:37 +00:00
Lots of fixes
* Move `is_active` to `GPIODevice`; it's equally applicable to inputs and outputs so there's no point having it just in inputs * Flip the pull-up status for `MotionSensor` (it was backwards leading to reversed readings from the sensor) * Add a `threshold` to `MotionSensor` (optional), and `value` (similar to `LightSensor`) * Also expose `pull_up` as a simple bool property * Rejig `LightSensor` so it also derives from `InputDevice` (it inherits enough to make it worthwhile) and so that its API is similar to `MotionSensor` (a `value` property with a `*_detected` property, and a background threaded queue which constantly monitors values)
This commit is contained in:
@@ -12,11 +12,17 @@ class GPIODevice(object):
|
|||||||
if pin is None:
|
if pin is None:
|
||||||
raise GPIODeviceError('No GPIO pin number given')
|
raise GPIODeviceError('No GPIO pin number given')
|
||||||
self._pin = pin
|
self._pin = pin
|
||||||
|
self._active_state = 1
|
||||||
|
self._inactive_state = 0
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def pin(self):
|
def pin(self):
|
||||||
return self._pin
|
return self._pin
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_active(self):
|
||||||
|
return GPIO.input(self.pin) == self._active_state
|
||||||
|
|
||||||
|
|
||||||
_GPIO_THREADS = set()
|
_GPIO_THREADS = set()
|
||||||
def _gpio_threads_shutdown():
|
def _gpio_threads_shutdown():
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
from __future__ import division
|
from __future__ import division
|
||||||
|
|
||||||
from time import sleep
|
from time import sleep, time
|
||||||
from threading import Event
|
from threading import Event
|
||||||
from collections import deque
|
from collections import deque
|
||||||
|
|
||||||
@@ -15,17 +15,18 @@ class InputDeviceError(GPIODeviceError):
|
|||||||
|
|
||||||
|
|
||||||
class InputDevice(GPIODevice):
|
class InputDevice(GPIODevice):
|
||||||
def __init__(self, pin=None):
|
def __init__(self, pin=None, pull_up=True):
|
||||||
super(InputDevice, self).__init__(pin)
|
super(InputDevice, self).__init__(pin)
|
||||||
self._pull = GPIO.PUD_UP
|
self._pull_up = pull_up
|
||||||
self._edge = GPIO.FALLING
|
self._edge = (GPIO.RISING, GPIO.FALLING)[pull_up]
|
||||||
self._active_state = 0
|
if pull_up:
|
||||||
self._inactive_state = 1
|
self._active_state = 0
|
||||||
GPIO.setup(pin, GPIO.IN, self._pull)
|
self._inactive_state = 1
|
||||||
|
GPIO.setup(pin, GPIO.IN, (GPIO.PUD_DOWN, GPIO.PUD_UP)[pull_up])
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_active(self):
|
def pull_up(self):
|
||||||
return GPIO.input(self.pin) == self._active_state
|
return self._pull_up
|
||||||
|
|
||||||
def wait_for_input(self):
|
def wait_for_input(self):
|
||||||
GPIO.wait_for_edge(self.pin, self._edge)
|
GPIO.wait_for_edge(self.pin, self._edge)
|
||||||
@@ -44,9 +45,14 @@ class Button(InputDevice):
|
|||||||
|
|
||||||
|
|
||||||
class MotionSensor(InputDevice):
|
class MotionSensor(InputDevice):
|
||||||
def __init__(self, pin=None, queue_len=20, sample_rate=10, partial=False):
|
def __init__(
|
||||||
super(MotionSensor, self).__init__(pin)
|
self, pin=None, queue_len=5, sample_rate=10, threshold=0.5,
|
||||||
|
partial=False):
|
||||||
|
super(MotionSensor, self).__init__(pin, pull_up=False)
|
||||||
|
if queue_len < 1:
|
||||||
|
raise InputDeviceError('queue_len must be at least one')
|
||||||
self.sample_rate = sample_rate
|
self.sample_rate = sample_rate
|
||||||
|
self.threshold = threshold
|
||||||
self.partial = partial
|
self.partial = partial
|
||||||
self._queue = deque(maxlen=queue_len)
|
self._queue = deque(maxlen=queue_len)
|
||||||
self._queue_full = Event()
|
self._queue_full = Event()
|
||||||
@@ -57,6 +63,36 @@ class MotionSensor(InputDevice):
|
|||||||
def queue_len(self):
|
def queue_len(self):
|
||||||
return self._queue.maxlen
|
return self._queue.maxlen
|
||||||
|
|
||||||
|
@property
|
||||||
|
def value(self):
|
||||||
|
if not self.partial:
|
||||||
|
self._queue_full.wait()
|
||||||
|
try:
|
||||||
|
return sum(self._queue) / len(self._queue)
|
||||||
|
except ZeroDivisionError:
|
||||||
|
# No data == no motion
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
@property
|
||||||
|
def motion_detected(self):
|
||||||
|
return self.value > self.threshold
|
||||||
|
|
||||||
|
def _get_sample_rate(self):
|
||||||
|
return self._sample_rate
|
||||||
|
def _set_sample_rate(self, value):
|
||||||
|
if value <= 0:
|
||||||
|
raise InputDeviceError('sample_rate must be greater than zero')
|
||||||
|
self._sample_rate = value
|
||||||
|
sample_rate = property(_get_sample_rate, _set_sample_rate)
|
||||||
|
|
||||||
|
def _get_threshold(self):
|
||||||
|
return self._threshold
|
||||||
|
def _set_threshold(self, value):
|
||||||
|
if value < 0:
|
||||||
|
raise InputDeviceError('threshold must be zero or more')
|
||||||
|
self._threshold = value
|
||||||
|
threshold = property(_get_threshold, _set_threshold)
|
||||||
|
|
||||||
def _fill_queue(self):
|
def _fill_queue(self):
|
||||||
while (
|
while (
|
||||||
not self._queue_thread.stopping.wait(1 / self.sample_rate) and
|
not self._queue_thread.stopping.wait(1 / self.sample_rate) and
|
||||||
@@ -67,53 +103,83 @@ class MotionSensor(InputDevice):
|
|||||||
while not self._queue_thread.stopping.wait(1 / self.sample_rate):
|
while not self._queue_thread.stopping.wait(1 / self.sample_rate):
|
||||||
self._queue.append(self.is_active)
|
self._queue.append(self.is_active)
|
||||||
|
|
||||||
@property
|
|
||||||
def motion_detected(self):
|
|
||||||
if not self.partial:
|
|
||||||
self._queue_full.wait()
|
|
||||||
return sum(self._queue) > (len(self._queue) / 2)
|
|
||||||
|
|
||||||
|
class LightSensor(InputDevice):
|
||||||
class LightSensor(object):
|
def __init__(
|
||||||
def __init__(self, pin=None, darkness_level=0.01):
|
self, pin=None, queue_len=5, darkness_time=0.01,
|
||||||
if pin is None:
|
threshold=0.1, partial=False):
|
||||||
raise InputDeviceError('No GPIO pin number given')
|
super(LightSensor, self).__init__(pin, pull_up=False)
|
||||||
|
if queue_len < 1:
|
||||||
self._pin = pin
|
raise InputDeviceError('queue_len must be at least one')
|
||||||
self.darkness_level = darkness_level
|
self.darkness_time = darkness_time
|
||||||
|
self.threshold = threshold
|
||||||
|
self.partial = partial
|
||||||
|
self._charged = Event()
|
||||||
|
GPIO.add_event_detect(self.pin, GPIO.RISING, lambda channel: self._charged.set())
|
||||||
|
self._queue = deque(maxlen=queue_len)
|
||||||
|
self._queue_full = Event()
|
||||||
|
self._queue_thread = GPIOThread(target=self._fill_queue)
|
||||||
|
self._queue_thread.start()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def pin(self):
|
def queue_len(self):
|
||||||
return self._pin
|
return self._queue.maxlen
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def value(self):
|
def value(self):
|
||||||
return self._get_average_light_level(5)
|
if not self.partial:
|
||||||
|
self._queue_full.wait()
|
||||||
|
try:
|
||||||
|
return 1.0 - (sum(self._queue) / len(self._queue)) / self.darkness_time
|
||||||
|
except ZeroDivisionError:
|
||||||
|
# No data == no light
|
||||||
|
return 0.0
|
||||||
|
|
||||||
def _get_light_level(self):
|
@property
|
||||||
time_taken = self._time_charging_light_capacitor()
|
def light_detected(self):
|
||||||
value = 100 * time_taken / self.darkness_level
|
return self.value > self.threshold
|
||||||
return 100 - value
|
|
||||||
|
|
||||||
def _time_charging_light_capacitor(self):
|
def _get_darkness_time(self):
|
||||||
|
return self._darkness_time
|
||||||
|
def _set_darkness_time(self, value):
|
||||||
|
if value <= 0.0:
|
||||||
|
raise InputDeviceError('darkness_time must be greater than zero')
|
||||||
|
self._darkness_time = value
|
||||||
|
# XXX Empty the queue and restart the thread
|
||||||
|
darkness_time = property(_get_darkness_time, _set_darkness_time)
|
||||||
|
|
||||||
|
def _get_threshold(self):
|
||||||
|
return self._threshold
|
||||||
|
def _set_threshold(self, value):
|
||||||
|
if value < 0:
|
||||||
|
raise InputDeviceError('threshold must be zero or more')
|
||||||
|
self._threshold = value
|
||||||
|
threshold = property(_get_threshold, _set_threshold)
|
||||||
|
|
||||||
|
def _fill_queue(self):
|
||||||
|
try:
|
||||||
|
while (
|
||||||
|
not self._queue_thread.stopping.is_set() and
|
||||||
|
len(self._queue) < self._queue.maxlen
|
||||||
|
):
|
||||||
|
self._queue.append(self._time_charging())
|
||||||
|
self._queue_full.set()
|
||||||
|
while not self._queue_thread.stopping.is_set():
|
||||||
|
self._queue.append(self._time_charging())
|
||||||
|
finally:
|
||||||
|
GPIO.remove_event_detect(self.pin)
|
||||||
|
|
||||||
|
def _time_charging(self):
|
||||||
|
# Drain charge from the capacitor
|
||||||
GPIO.setup(self.pin, GPIO.OUT)
|
GPIO.setup(self.pin, GPIO.OUT)
|
||||||
GPIO.output(self.pin, GPIO.LOW)
|
GPIO.output(self.pin, GPIO.LOW)
|
||||||
sleep(0.1)
|
sleep(0.1)
|
||||||
|
# Time the charging of the capacitor
|
||||||
|
start = time()
|
||||||
|
self._charged.clear()
|
||||||
GPIO.setup(self.pin, GPIO.IN)
|
GPIO.setup(self.pin, GPIO.IN)
|
||||||
start_time = time()
|
self._charged.wait(self.darkness_time)
|
||||||
end_time = time()
|
return min(self.darkness_time, time() - start)
|
||||||
while (
|
|
||||||
GPIO.input(self.pin) == GPIO.LOW and
|
|
||||||
time() - start_time < self.darkness_level
|
|
||||||
):
|
|
||||||
end_time = time()
|
|
||||||
time_taken = end_time - start_time
|
|
||||||
return min(time_taken, self.darkness_level)
|
|
||||||
|
|
||||||
def _get_average_light_level(self, num):
|
|
||||||
values = [self._get_light_level() for n in range(num)]
|
|
||||||
average_value = sum(values) / len(values)
|
|
||||||
return average_value
|
|
||||||
|
|
||||||
|
|
||||||
class TemperatureSensor(W1ThermSensor):
|
class TemperatureSensor(W1ThermSensor):
|
||||||
|
|||||||
Reference in New Issue
Block a user