Files
python-gpiozero/gpiozero/input_devices.py
2016-01-30 21:58:00 +00:00

750 lines
27 KiB
Python

from __future__ import (
unicode_literals,
print_function,
absolute_import,
division,
)
import inspect
import warnings
from functools import wraps
from time import sleep, time
from threading import Event
from RPi import GPIO
from spidev import SpiDev
from .exc import InputDeviceError, GPIODeviceError, GPIODeviceClosed
from .devices import GPIODevice, CompositeDevice, GPIOQueue
class InputDevice(GPIODevice):
    """
    Base class for devices that are read from a single GPIO pin.

    Builds on `GPIODevice` by configuring the pin as an input with one of
    the internal pull resistors enabled. The edges and levels that count as
    "active" and "inactive" are recorded according to the pull direction:
    with `pull_up` set a low level is the active one, otherwise a high level
    is.

    pin: `None`
        The GPIO pin (in BCM numbering) that the device is connected to. If
        this is `None` a GPIODeviceError will be raised.

    pull_up: `False`
        If `True`, the pin will be pulled high with an internal resistor. If
        `False` (the default), the pin will be pulled low.
    """

    def __init__(self, pin=None, pull_up=False):
        # Pins 2 and 3 carry physical pull-up resistors, so a pull-down
        # configuration can never work on them
        hardwired_pull_up = pin in (2, 3)
        if hardwired_pull_up and not pull_up:
            raise InputDeviceError(
                'GPIO pins 2 and 3 are fitted with physical pull up '
                'resistors; you cannot initialize them with pull_up=False'
            )
        # Store _pull_up before anything that can fail: __repr__ reads it,
        # and __repr__ may be invoked while debugging an instance whose
        # super-class __init__ raised
        self._pull_up = pull_up
        super(InputDevice, self).__init__(pin)
        if pull_up:
            self._active_edge = GPIO.FALLING
            self._inactive_edge = GPIO.RISING
            self._active_state = GPIO.LOW
            self._inactive_state = GPIO.HIGH
            resistor = GPIO.PUD_UP
        else:
            self._active_edge = GPIO.RISING
            self._inactive_edge = GPIO.FALLING
            self._active_state = GPIO.HIGH
            self._inactive_state = GPIO.LOW
            resistor = GPIO.PUD_DOWN
        try:
            # NOTE: catch_warnings isn't thread-safe; this relies on GPIO
            # initialization not happening in background threads
            with warnings.catch_warnings(record=True) as captured:
                GPIO.setup(pin, GPIO.IN, resistor)
            # Replay everything that was captured except the RuntimeWarning
            # emitted when setting up pins 2 or 3
            for entry in captured:
                expected = (
                    entry.category == RuntimeWarning and pin in (2, 3)
                )
                if expected:
                    continue
                warnings.showwarning(
                    entry.message, entry.category, entry.filename,
                    entry.lineno, entry.file, entry.line
                )
        except:
            # Release the pin again before propagating any failure
            self.close()
            raise

    @property
    def pull_up(self):
        """
        `True` when the pin is pulled high by the internal resistor, `False`
        when it is pulled low. This is the value given to the constructor.
        """
        return self._pull_up

    def __repr__(self):
        try:
            details = (
                self.__class__.__name__, self.pin, self.pull_up,
                self.is_active,
            )
            return (
                "<gpiozero.%s object on pin=%d, pull_up=%s, is_active=%s>"
                % details
            )
        except:
            # Fall back to the parent's representation when the device
            # state cannot be read (e.g. after a failed initialization)
            return super(InputDevice, self).__repr__()
class WaitableInputDevice(InputDevice):
    """
    Represents a generic input device with distinct waitable states.

    This class extends `InputDevice` with methods for waiting on the device's
    status (`wait_for_active` and `wait_for_inactive`), and properties that
    hold functions to be called when the device changes state
    (`when_activated` and `when_deactivated`). These are aliased
    appropriately in various subclasses.

    Note that this class provides no means of actually firing its events;
    it's effectively an abstract base class. Something else has to call
    `_fire_events` whenever the state of the device may have changed.
    """

    def __init__(self, pin=None, pull_up=False):
        super(WaitableInputDevice, self).__init__(pin, pull_up)
        self._active_event = Event()
        self._inactive_event = Event()
        self._when_activated = None
        self._when_deactivated = None
        # None marks the "indeterminate" state before the first call to
        # _fire_events
        self._last_state = None

    def wait_for_active(self, timeout=None):
        """
        Pause the script until the device is activated, or the timeout is
        reached. Returns the result of waiting on the internal event.

        timeout: `None`
            Number of seconds to wait before proceeding. If this is `None`
            (the default), then wait indefinitely until the device is active.
        """
        return self._active_event.wait(timeout)

    def wait_for_inactive(self, timeout=None):
        """
        Pause the script until the device is deactivated, or the timeout is
        reached. Returns the result of waiting on the internal event.

        timeout: `None`
            Number of seconds to wait before proceeding. If this is `None`
            (the default), then wait indefinitely until the device is
            inactive.
        """
        return self._inactive_event.wait(timeout)

    @property
    def when_activated(self):
        """
        The function to run when the device changes state from inactive to
        active.

        This can be set to a function which accepts no (mandatory)
        parameters, or a Python function which accepts a single mandatory
        parameter (with as many optional parameters as you like). If the
        function accepts a single mandatory parameter, the device that
        activated will be passed as that parameter.

        Set this property to `None` (the default) to disable the event.

        See also: when_deactivated.
        """
        return self._when_activated

    @when_activated.setter
    def when_activated(self, value):
        self._when_activated = self._wrap_callback(value)

    @property
    def when_deactivated(self):
        """
        The function to run when the device changes state from active to
        inactive.

        This can be set to a function which accepts no (mandatory)
        parameters, or a Python function which accepts a single mandatory
        parameter (with as many optional parameters as you like). If the
        function accepts a single mandatory parameter, the device that
        deactivated will be passed as that parameter.

        Set this property to `None` (the default) to disable the event.

        See also: when_activated.
        """
        return self._when_deactivated

    @when_deactivated.setter
    def when_deactivated(self, value):
        self._when_deactivated = self._wrap_callback(value)

    def _wrap_callback(self, fn):
        """
        Normalize a user supplied callback into something callable with no
        arguments.

        Returns `None` for `None`, the callable itself when it can be called
        without arguments (or is a builtin), or a wrapper passing this device
        as the sole argument. Raises InputDeviceError for non-callables and
        for callables needing more than one mandatory parameter.
        """
        if fn is None:
            return None
        elif not callable(fn):
            raise InputDeviceError('value must be None or a callable')
        elif inspect.isbuiltin(fn):
            # We can't introspect the prototype of builtins. In this case we
            # assume that the builtin has no (mandatory) parameters; this is
            # the most reasonable assumption on the basis that pre-existing
            # builtins have no knowledge of gpiozero, and the sole parameter
            # we would pass is a gpiozero object
            return fn
        else:
            # Try binding ourselves to the argspec of the provided callable.
            # If this works, assume the function is capable of accepting no
            # parameters
            try:
                inspect.getcallargs(fn)
                return fn
            except TypeError:
                try:
                    # If the above fails, try binding with a single parameter
                    # (ourselves). If this works, wrap the specified callback
                    inspect.getcallargs(fn, self)

                    @wraps(fn)
                    def wrapper():
                        return fn(self)
                    return wrapper
                except TypeError:
                    raise InputDeviceError(
                        'value must be a callable which accepts up to one '
                        'mandatory parameter')

    def _fire_events(self):
        """
        Compare the current `is_active` state against the last state seen,
        update the wait events and run the matching callback on a change.
        """
        old_state = self._last_state
        new_state = self._last_state = self.is_active
        if old_state is None:
            # Initial "indeterminate" state; set events but don't fire
            # callbacks as there's not necessarily an edge
            if new_state:
                self._active_event.set()
            else:
                self._inactive_event.set()
        elif not old_state and new_state:
            self._inactive_event.clear()
            self._active_event.set()
            # Read the property exactly once: this method runs on background
            # threads, so the callback may be reset to None by another thread
            # between a truth test and the call
            callback = self.when_activated
            if callback:
                callback()
        elif old_state and not new_state:
            self._active_event.clear()
            self._inactive_event.set()
            # Same single read as above, for the same reason
            callback = self.when_deactivated
            if callback:
                callback()
class DigitalInputDevice(WaitableInputDevice):
    """
    A generic input device with plain on/off behaviour.

    Adds to `WaitableInputDevice` the machinery that actually fires the
    active and inactive events: edge detection is registered on the pin for
    both edges, which suits devices with (reasonably) clean transitions
    between their two states.

    bounce_time: `None`
        Specifies the length of time (in seconds) that the component will
        ignore changes in state after an initial change. This defaults to
        `None` which indicates that no bounce compensation will be performed.
    """

    def __init__(self, pin=None, pull_up=False, bounce_time=None):
        super(DigitalInputDevice, self).__init__(pin, pull_up)
        try:
            if bounce_time is None:
                # -666 really is the default bouncetime in RPi.GPIO
                bounce_ms = -666
            else:
                # Seconds in, whole milliseconds out
                bounce_ms = int(bounce_time * 1000)
            GPIO.add_event_detect(
                self.pin, GPIO.BOTH, callback=self._fire_events,
                bouncetime=bounce_ms
            )
            # Run the parent implementation once so the wait events reflect
            # the state of the pin at construction time
            super(DigitalInputDevice, self)._fire_events()
        except:
            self.close()
            raise

    def _fire_events(self, channel):
        # Edge-detection callback; the channel argument is not used
        super(DigitalInputDevice, self)._fire_events()
class SmoothedInputDevice(WaitableInputDevice):
    """
    A generic input device whose value is the mean of a queue of historical
    readings.

    A `GPIOQueue` is attached to the device; it is filled by a background
    thread which continually polls the state of the underlying device. The
    mean of the queued values is compared to a threshold to decide what the
    `is_active` property reports.

    This class is intended for use with devices which either exhibit analog
    behaviour (such as the charging time of a capacitor with an LDR), or
    those which exhibit "twitchy" behaviour (such as certain motion sensors).

    threshold: `0.5`
        The value above which the device will be considered "on".

    queue_len: `5`
        The length of the internal queue which is filled by the background
        thread.

    sample_wait: `0.0`
        The length of time to wait between retrieving the state of the
        underlying device. Defaults to 0.0 indicating that values are
        retrieved as fast as possible.

    partial: `False`
        If `False` (the default), attempts to read the state of the device
        (from the `is_active` property) will block until the queue has
        filled. If `True`, a value will be returned immediately, but be
        aware that this value is likely to fluctuate excessively.
    """

    def __init__(
            self, pin=None, pull_up=False, threshold=0.5,
            queue_len=5, sample_wait=0.0, partial=False):
        # Must exist before the parent constructor runs so that close() can
        # cope with a failure part-way through initialization
        self._queue = None
        super(SmoothedInputDevice, self).__init__(pin, pull_up)
        try:
            self._queue = GPIOQueue(self, queue_len, sample_wait, partial)
            self.threshold = float(threshold)
        except:
            self.close()
            raise

    def close(self):
        pending = self._queue
        if pending is not None:
            try:
                pending.stop()
            except RuntimeError:
                # Cannot join thread before it starts; we don't care about
                # this because we're trying to close the thread anyway
                pass
            else:
                self._queue = None
        super(SmoothedInputDevice, self).close()

    def __repr__(self):
        try:
            self._check_open()
        except GPIODeviceClosed:
            return super(SmoothedInputDevice, self).__repr__()
        # The parent's repr reads is_active; only use it when the queue is
        # readable right now (partial mode, or the queue has filled)
        readable = self.partial or self._queue.full.wait(0)
        if not readable:
            return "<gpiozero.%s object on pin=%d, pull_up=%s>" % (
                self.__class__.__name__, self.pin, self.pull_up)
        return super(SmoothedInputDevice, self).__repr__()

    @property
    def queue_len(self):
        """
        The maximum number of values held in the internal queue, which is
        averaged to determine the overall state of the device. This defaults
        to `5`.
        """
        self._check_open()
        return self._queue.queue.maxlen

    @property
    def partial(self):
        """
        If `False` (the default), attempts to read the `value` or `is_active`
        properties will block until the queue has filled.
        """
        self._check_open()
        return self._queue.partial

    @property
    def value(self):
        """
        The mean of the values in the internal queue. This is compared to
        `threshold` to determine whether `is_active` is `True`.
        """
        self._check_open()
        return self._queue.value

    @property
    def threshold(self):
        """
        If `value` exceeds this amount, then `is_active` will return `True`.
        Must lie strictly between zero and one.
        """
        return self._threshold

    @threshold.setter
    def threshold(self, value):
        in_range = 0.0 < value < 1.0
        if not in_range:
            raise InputDeviceError(
                'threshold must be between zero and one exclusive'
            )
        self._threshold = float(value)

    @property
    def is_active(self):
        """
        `True` while `value` is above `threshold`, `False` otherwise.
        """
        return self.value > self.threshold
class Button(DigitalInputDevice):
    """
    A physical push button or switch.

    The usual wiring connects one side of the switch to a GPIO pin and the
    other side to ground, which is why `pull_up` defaults to `True` here.
    """

    def __init__(self, pin=None, pull_up=True, bounce_time=None):
        super(Button, self).__init__(
            pin=pin, pull_up=pull_up, bounce_time=bounce_time
        )
# Button-specific aliases: publish the generic active/inactive state, event
# properties and wait methods under names that describe a push button
Button.is_pressed = Button.is_active
Button.when_pressed = Button.when_activated
Button.when_released = Button.when_deactivated
Button.wait_for_press = Button.wait_for_active
Button.wait_for_release = Button.wait_for_inactive
class LineSensor(DigitalInputDevice):
    """
    A single sensor line detector.

    Takes the same parameters as `DigitalInputDevice`, except that `pull_up`
    defaults to `True`.
    """

    def __init__(self, pin=None, pull_up=True, bounce_time=None):
        super(LineSensor, self).__init__(
            pin=pin, pull_up=pull_up, bounce_time=bounce_time
        )
# LineSensor-specific aliases: publish the generic active/inactive state,
# event properties and wait methods under line-detection names
LineSensor.line_detected = LineSensor.is_active
LineSensor.when_line = LineSensor.when_activated
LineSensor.when_no_line = LineSensor.when_deactivated
LineSensor.wait_for_line = LineSensor.wait_for_active
LineSensor.wait_for_no_line = LineSensor.wait_for_inactive
class MotionSensor(SmoothedInputDevice):
    """
    A PIR (Passive Infra-Red) motion sensor.

    A typical PIR device has a small circuit board with three pins: VCC, OUT,
    and GND. VCC should be connected to the Pi's +5V pin, GND to one of the
    Pi's ground pins, and finally OUT to the GPIO specified as the value of
    the `pin` parameter in the constructor.

    This class defaults `queue_len` to 1, effectively removing the averaging
    of the internal queue. If your PIR sensor has a short fall time and is
    particularly "jittery" you may wish to set this to a higher value (e.g.
    5) to mitigate this.

    sample_rate: `10`
        How many readings to take per second; the wait between readings is
        the reciprocal of this value.
    """

    def __init__(
            self, pin=None, queue_len=1, sample_rate=10, threshold=0.5,
            partial=False):
        # Convert the rate into the delay between consecutive samples
        interval = 1 / sample_rate
        super(MotionSensor, self).__init__(
            pin, pull_up=False, threshold=threshold,
            queue_len=queue_len, sample_wait=interval, partial=partial
        )
        try:
            self._queue.start()
        except:
            # Don't leave a half-initialized device holding the pin
            self.close()
            raise
# MotionSensor-specific aliases: publish the generic active/inactive state,
# event properties and wait methods under motion-detection names
MotionSensor.motion_detected = MotionSensor.is_active
MotionSensor.when_motion = MotionSensor.when_activated
MotionSensor.when_no_motion = MotionSensor.when_deactivated
MotionSensor.wait_for_motion = MotionSensor.wait_for_active
MotionSensor.wait_for_no_motion = MotionSensor.wait_for_inactive
class LightSensor(SmoothedInputDevice):
    """
    An LDR (Light Dependent Resistor) Light Sensor.

    A typical LDR circuit connects one side of the LDR to the 3v3 line from
    the Pi, and the other side to a GPIO pin, and a capacitor tied to ground.
    This class repeatedly discharges the capacitor, then times the duration
    it takes to charge (which will vary according to the light falling on the
    LDR).

    charge_time_limit: `0.01`
        The longest time (in seconds) to wait for the capacitor to charge;
        a reading that takes this long or longer yields a value of 0.0.
    """

    def __init__(
            self, pin=None, queue_len=5, charge_time_limit=0.01,
            threshold=0.1, partial=False):
        super(LightSensor, self).__init__(
            pin, pull_up=False, threshold=threshold,
            queue_len=queue_len, sample_wait=0.0, partial=partial
        )
        try:
            self._charge_time_limit = charge_time_limit
            self._charged = Event()

            def on_rising_edge(channel):
                # A rising edge on the pin signals the capacitor has charged
                self._charged.set()

            GPIO.add_event_detect(self.pin, GPIO.RISING, on_rising_edge)
            self._queue.start()
        except:
            self.close()
            raise

    @property
    def charge_time_limit(self):
        """
        The maximum charging time (in seconds) given to the constructor.
        """
        return self._charge_time_limit

    def _read(self):
        # Drain charge from the capacitor by driving the pin low
        GPIO.setup(self.pin, GPIO.OUT)
        GPIO.output(self.pin, GPIO.LOW)
        sleep(0.1)
        # Switch back to an input and time how long the capacitor takes to
        # charge, giving up once the limit is reached
        started = time()
        self._charged.clear()
        GPIO.setup(self.pin, GPIO.IN)
        self._charged.wait(self.charge_time_limit)
        elapsed = time() - started
        # Scale so a fast charge approaches 1.0 and a timeout gives 0.0
        capped = min(self.charge_time_limit, elapsed)
        return 1.0 - capped / self.charge_time_limit
# LightSensor-specific aliases: publish the generic active/inactive state,
# event properties and wait methods under light/dark names
LightSensor.light_detected = LightSensor.is_active
LightSensor.when_light = LightSensor.when_activated
LightSensor.when_dark = LightSensor.when_deactivated
LightSensor.wait_for_light = LightSensor.wait_for_active
LightSensor.wait_for_dark = LightSensor.wait_for_inactive
class AnalogInputDevice(CompositeDevice):
    """
    Represents an analog input device connected to SPI (serial interface).

    Concrete subclasses must override `_read` to return the raw reading.

    device: `0`
        The SPI select pin the device is attached to; must be 0 or 1.

    bits: `None`
        The bit resolution of the device. This must be specified; leaving it
        as `None` raises InputDeviceError.
    """

    def __init__(self, device=0, bits=None):
        if bits is None:
            raise InputDeviceError('you must specify the bit resolution of the device')
        if device not in (0, 1):
            raise InputDeviceError('device must be 0 or 1')
        self._device = device
        self._bits = bits
        # Bus 0 is used unconditionally (see the `bus` property)
        self._spi = SpiDev()
        self._spi.open(0, self.device)
        super(AnalogInputDevice, self).__init__()

    def close(self):
        """
        Shut down the device and release all associated resources.
        """
        spi = self._spi
        if spi:
            # Drop our reference before closing the SPI handle
            self._spi = None
            spi.close()
        super(AnalogInputDevice, self).close()

    @property
    def bits(self):
        """
        The bit-resolution of the device/channel.
        """
        return self._bits

    @property
    def bus(self):
        """
        The SPI bus that the device is connected to. As the Pi only has a
        single (user accessible) SPI bus, this always returns 0.
        """
        return 0

    @property
    def device(self):
        """
        The select pin that the device is connected to. The Pi has two select
        pins so this will be 0 or 1.
        """
        return self._device

    def _read(self):
        # To be provided by subclasses
        raise NotImplementedError

    @property
    def value(self):
        """
        The current value read from the device, divided by the largest value
        representable in `bits` bits.
        """
        reading = self._read()
        return reading / (2 ** self.bits - 1)

    @property
    def raw_value(self):
        """
        The raw value as read from the device.
        """
        return self._read()
class MCP3xxx(AnalogInputDevice):
    """
    Common implementation for the MCP3xxx analog to digital converters.

    channel: `0`
        The channel to read data from.

    device: `0`
        The SPI select pin the chip is attached to.

    bits: `10`
        The bit resolution of the chip.

    differential: `False`
        If true, the chip is read in pseudo-differential mode.
    """

    def __init__(self, channel=0, device=0, bits=10, differential=False):
        self._channel = channel
        self._bits = bits
        self._differential = bool(differential)
        super(MCP3xxx, self).__init__(device, bits)

    @property
    def channel(self):
        """
        The channel to read data from. The MCP3008/3208/3304 have 8 channels
        (0-7), while the MCP3004/3204/3302 have 4 channels (0-3), and the
        MCP3301 only has 2 channels.
        """
        return self._channel

    @property
    def differential(self):
        """
        If True, the device is operated in pseudo-differential mode. In this
        mode one channel (specified by the channel attribute) is read relative
        to the value of a second channel (implied by the chip's design).

        Please refer to the device data-sheet to determine which channel is
        used as the relative base value (for example, when using an MCP3008
        in differential mode, channel 0 is read relative to channel 1).
        """
        return self._differential

    def _read(self):
        # MCP3004/3008 and MCP3204/3208 protocol looks like the following:
        #
        #     Byte        0        1        2
        #     ==== ======== ======== ========
        #     Tx   0001MCCC xxxxxxxx xxxxxxxx
        #     Rx   xxxxxxxx x0RRRRRR RRRRxxxx for the 3004/3008
        #     Rx   xxxxxxxx x0RRRRRR RRRRRRxx for the 3204/3208
        #
        # The transmit bits start with 3 preamble bits "000" (to warm up), a
        # start bit "1" followed by the mode bit (M) which is 1 for
        # single-ended read, and 0 for differential read, followed by 3 bits
        # for the channel (C). The remainder of the transmission are "don't
        # care" bits (x).
        #
        # The first byte received and the top bit of the second byte are
        # don't care bits (x). These are followed by a null bit (0), and then
        # the result bits (R): 10 bits for the MCP300x, 12 bits for the
        # MCP320x.
        #
        # XXX Differential mode still requires testing
        mode_bit = 0 if self.differential else 8
        command = [16 + mode_bit + self.channel, 0, 0]
        data = self._spi.xfer2(command)
        # The low 6 bits of byte 1 are the most significant result bits; the
        # remaining (bits - 6) result bits sit at the top of byte 2
        upper = (data[1] & 63) << (self.bits - 6)
        lower = data[2] >> (14 - self.bits)
        return upper | lower
class MCP33xx(MCP3xxx):
    """
    Common implementation for the MCP33xx analog to digital converters,
    which report a sign bit followed by 12 result bits.

    channel: `0`
        The channel to read data from.

    device: `0`
        The SPI select pin the chip is attached to.

    differential: `False`
        If true, the chip is read in differential mode and `_read` returns a
        signed value.
    """

    def __init__(self, channel=0, device=0, differential=False):
        super(MCP33xx, self).__init__(channel, device, 12, differential)

    def _read(self):
        # MCP3302/3304 protocol looks like the following:
        #
        #     Byte        0        1        2
        #     ==== ======== ======== ========
        #     Tx   0001MCCC xxxxxxxx xxxxxxxx
        #     Rx   xxxxxxxx x0SRRRRR RRRRRRRx
        #
        # The transmit bits start with 3 preamble bits "000" (to warm up), a
        # start bit "1" followed by the mode bit (M) which is 1 for
        # single-ended read, and 0 for differential read, followed by 3 bits
        # for the channel (C). The remainder of the transmission are "don't
        # care" bits (x).
        #
        # The first byte received and the top bit of the second byte are
        # don't care bits (x). These are followed by a null bit (0), then the
        # sign bit (S), and then the 12 result bits (R).
        #
        # In single read mode (the default) the sign bit is always zero and
        # the result is effectively 12 bits. In differential mode, the sign
        # bit is significant and the result is a two's-complement 13-bit
        # value.
        #
        # The MCP3301 variant of the chip always operates in differential
        # mode and effectively only has one channel (composed of an IN+ and
        # IN-). As such it requires no input, just output. This is the reason
        # _send() is split out below; so that MCP3301 can override it.
        data = self._spi.xfer2(self._send())
        # Extract the last two bytes (again, for MCP3301)
        data = data[-2:]
        result = ((data[0] & 63) << 7) | (data[1] >> 1)
        # Account for the sign bit: a raw 13-bit value above 4095 has the
        # sign bit set and stands for a negative two's-complement number
        if self.differential and result > 4095:
            result = -(8192 - result)
        assert -4096 <= result < 4096
        return result

    def _send(self):
        # Command bytes to transmit: start bit (16), mode bit (8 for
        # single-ended, 0 for differential) and channel, then two padding
        # bytes to clock the result out
        return [16 + [8, 0][self.differential] + self.channel, 0, 0]
class MCP3004(MCP3xxx):
    """
    The MCP3004 is a 10-bit analog to digital converter with 4 channels
    (0-3). A channel outside that range raises InputDeviceError.
    """

    def __init__(self, channel=0, device=0, differential=False):
        channel_ok = 0 <= channel < 4
        if not channel_ok:
            raise InputDeviceError('channel must be between 0 and 3')
        super(MCP3004, self).__init__(
            channel=channel, device=device, bits=10,
            differential=differential
        )
class MCP3008(MCP3xxx):
    """
    The MCP3008 is a 10-bit analog to digital converter with 8 channels
    (0-7). A channel outside that range raises InputDeviceError.
    """

    def __init__(self, channel=0, device=0, differential=False):
        channel_ok = 0 <= channel < 8
        if not channel_ok:
            raise InputDeviceError('channel must be between 0 and 7')
        super(MCP3008, self).__init__(
            channel=channel, device=device, bits=10,
            differential=differential
        )
class MCP3204(MCP3xxx):
    """
    The MCP3204 is a 12-bit analog to digital converter with 4 channels
    (0-3). A channel outside that range raises InputDeviceError.
    """

    def __init__(self, channel=0, device=0, differential=False):
        channel_ok = 0 <= channel < 4
        if not channel_ok:
            raise InputDeviceError('channel must be between 0 and 3')
        super(MCP3204, self).__init__(
            channel=channel, device=device, bits=12,
            differential=differential
        )
class MCP3208(MCP3xxx):
    """
    The MCP3208 is a 12-bit analog to digital converter with 8 channels
    (0-7). A channel outside that range raises InputDeviceError.
    """

    def __init__(self, channel=0, device=0, differential=False):
        channel_ok = 0 <= channel < 8
        if not channel_ok:
            raise InputDeviceError('channel must be between 0 and 7')
        super(MCP3208, self).__init__(
            channel=channel, device=device, bits=12,
            differential=differential
        )
class MCP3301(MCP33xx):
    """
    The MCP3301 is a signed 13-bit analog to digital converter. Please note
    that the MCP3301 always operates in differential mode between its two
    channels and the output value is scaled from -1 to +1.
    """

    def __init__(self, device=0):
        # Channel is fixed at 0 and differential mode is always on
        super(MCP3301, self).__init__(
            channel=0, device=device, differential=True
        )

    def _send(self):
        # Only two bytes are transmitted, both zero: no start, mode or
        # channel bits are sent for this chip
        return [0, 0]
class MCP3302(MCP33xx):
    """
    The MCP3302 is a 12/13-bit analog to digital converter with 4 channels
    (0-3). When operated in differential mode, the device outputs a signed
    13-bit value which is scaled from -1 to +1. When operated in single-ended
    mode (the default), the device outputs an unsigned 12-bit value scaled
    from 0 to 1.

    A channel outside the range 0-3 raises InputDeviceError.
    """

    def __init__(self, channel=0, device=0, differential=False):
        if not 0 <= channel < 4:
            # The valid channels are 0, 1, 2 and 3; the message states the
            # inclusive range that the check above actually enforces
            raise InputDeviceError('channel must be between 0 and 3')
        super(MCP3302, self).__init__(channel, device, differential)
class MCP3304(MCP33xx):
    """
    The MCP3304 is a 12/13-bit analog to digital converter with 8 channels
    (0-7). In differential mode the device outputs a signed 13-bit value
    which is scaled from -1 to +1; in single-ended mode (the default) it
    outputs an unsigned 12-bit value scaled from 0 to 1.

    A channel outside the range 0-7 raises InputDeviceError.
    """

    def __init__(self, channel=0, device=0, differential=False):
        channel_ok = 0 <= channel < 8
        if not channel_ok:
            raise InputDeviceError('channel must be between 0 and 7')
        super(MCP3304, self).__init__(
            channel=channel, device=device, differential=differential
        )