This PR implements SnowPi, adds the ability for LEDBoard's to own other
LEDBoards as well as LEDs, and enhances blink so that manually
controlling a LED automatically stops it from blinking (no matter
whether it's blinking itself or a LEDBoard is blinking it).

It also fixes up RGBLED and Motor which I managed to break with the last
PR ...
This commit is contained in:
Dave Jones
2016-04-01 16:20:54 +01:00
parent a7b7fc8dec
commit d6af02933a
5 changed files with 187 additions and 77 deletions

View File

@@ -95,6 +95,7 @@ from .boards import (
PiLiterBarGraph,
TrafficLights,
PiTraffic,
SnowPi,
TrafficLightsBuzzer,
FishDish,
TrafficHat,

View File

@@ -37,7 +37,7 @@ class CompositeOutputDevice(SourceMixin, CompositeDevice):
Turn all the output devices on.
"""
for device in self.all:
if isinstance(device, OutputDevice):
if isinstance(device, (OutputDevice, CompositeOutputDevice)):
device.on()
def off(self):
@@ -45,7 +45,7 @@ class CompositeOutputDevice(SourceMixin, CompositeDevice):
Turn all the output devices off.
"""
for device in self.all:
if isinstance(device, OutputDevice):
if isinstance(device, (OutputDevice, CompositeOutputDevice)):
device.off()
def toggle(self):
@@ -54,21 +54,21 @@ class CompositeOutputDevice(SourceMixin, CompositeDevice):
off; if it's off, turn it on.
"""
for device in self.all:
if isinstance(device, OutputDevice):
if isinstance(device, (OutputDevice, CompositeOutputDevice)):
device.toggle()
@property
def value(self):
"""
A tuple containing a value for each subordinate device. This property
can also be set to update the state of all output subordinate devices.
can also be set to update the state of all subordinate output devices.
"""
return super(CompositeOutputDevice, self).value
@value.setter
def value(self, value):
for device, v in zip(self.all, value):
if isinstance(device, OutputDevice):
if isinstance(device, (OutputDevice, CompositeOutputDevice)):
device.value = v
# Simply ignore values for non-output devices
@@ -87,17 +87,32 @@ class LEDCollection(CompositeOutputDevice):
order = kwargs.pop('_order', None)
LEDClass = PWMLED if pwm else LED
super(LEDCollection, self).__init__(
*(LEDClass(pin, active_high, initial_value) for pin in args),
*(
pin_or_collection
if isinstance(pin_or_collection, LEDCollection) else
LEDClass(pin_or_collection, active_high, initial_value)
for pin_or_collection in args
),
_order=order,
**{name: LEDClass(pin, active_high, initial_value) for name, pin in kwargs.items()})
**{
name: pin_or_collection
if isinstance(pin_or_collection, LEDCollection) else
LEDClass(pin_or_collection, active_high, initial_value)
for name, pin_or_collection in kwargs.items()
})
@property
def leds(self):
"""
A tuple of all the :class:`LED` or :class:`PWMLED` objects contained by
the instance.
A flat iterator over all LEDs contained in this collection (and all
sub-collections).
"""
return self.all
for item in self:
if isinstance(item, LEDCollection):
for subitem in item.leds:
yield subitem
else:
yield item
class LEDBoard(LEDCollection):
@@ -115,7 +130,8 @@ class LEDBoard(LEDCollection):
:param int \*pins:
Specify the GPIO pins that the LEDs of the board are attached to. You
can designate as many pins as necessary.
can designate as many pins as necessary. You can also specify
:class:`LEDBoard` instances to create trees of LEDs.
:param bool pwm:
If ``True``, construct :class:`PWMLED` instances for each pin. If
@@ -132,7 +148,18 @@ class LEDBoard(LEDCollection):
``None``, each device will be left in whatever state the pin is found
in when configured for output (warning: this can be on). The ``True``,
the device will be switched on initially.
:param \*\*named_pins:
Sepcify GPIO pins that LEDs of the board are attached to, associated
each LED with a property name. You can designate as many pins as
necessary and any name provided it's not already in use by something
else. You can also specify :class:`LEDBoard` instances to create
trees of LEDs.
"""
def __init__(self, *args, **kwargs):
self._blink_leds = []
self._blink_lock = Lock()
super(LEDBoard, self).__init__(*args, **kwargs)
def close(self):
self._stop_blink()
@@ -181,11 +208,12 @@ class LEDBoard(LEDCollection):
finished (warning: the default value of *n* will result in this
method never returning).
"""
if isinstance(self.leds[0], LED):
if fade_in_time:
raise ValueError('fade_in_time must be 0 with non-PWM LEDs')
if fade_out_time:
raise ValueError('fade_out_time must be 0 with non-PWM LEDs')
for led in self.leds:
if isinstance(led, LED):
if fade_in_time:
raise ValueError('fade_in_time must be 0 with non-PWM LEDs')
if fade_out_time:
raise ValueError('fade_out_time must be 0 with non-PWM LEDs')
self._stop_blink()
self._blink_thread = GPIOThread(
target=self._blink_device,
@@ -196,10 +224,14 @@ class LEDBoard(LEDCollection):
self._blink_thread.join()
self._blink_thread = None
def _stop_blink(self):
if self._blink_thread:
self._blink_thread.stop()
self._blink_thread = None
def _stop_blink(self, led=None):
if led is None:
if self._blink_thread:
self._blink_thread.stop()
self._blink_thread = None
else:
with self._blink_lock:
self._blink_leds.remove(led)
def pulse(self, fade_in_time=1, fade_out_time=1, n=None, background=True):
"""
@@ -243,9 +275,18 @@ class LEDBoard(LEDCollection):
cycle(sequence) if n is None else
chain.from_iterable(repeat(sequence, n))
)
with self._blink_lock:
self._blink_leds = list(self.leds)
for led in self._blink_leds:
if led._controller not in (None, self):
led._controller._stop_blink(led)
led._controller = self
for value, delay in sequence:
for led in self.leds:
led.value = value
with self._blink_lock:
if not self._blink_leds:
break
for led in self._blink_leds:
led._write(value)
if self._blink_thread.stopping.wait(delay):
break
@@ -432,7 +473,7 @@ class TrafficLights(LEDBoard):
class PiTraffic(TrafficLights):
"""
Extends :class:`TrafficLights` for the Low Voltage Labs PI-TRAFFIC:
Extends :class:`TrafficLights` for the `Low Voltage Labs PI-TRAFFIC`_:
vertical traffic lights board when attached to GPIO pins 9, 10, and 11.
There's no need to specify the pins if the PI-TRAFFIC is connected to the
@@ -446,12 +487,54 @@ class PiTraffic(TrafficLights):
To use the PI-TRAFFIC board when attached to a non-standard set of pins,
simply use the parent class, :class:`TrafficLights`.
.. _Low Voltage Labs PI-TRAFFIC: http://lowvoltagelabs.com/products/pi-traffic/
"""
def __init__(self):
super(PiTraffic, self).__init__(9, 10, 11)
class SnowPi(LEDBoard):
"""
Extends :class:`LEDBoard` for the `Ryanteck SnowPi`_ board.
The SnowPi pins are fixed and therefore there's no need to specify them
when constructing this class. The following example turns on the eyes, sets
the nose pulsing, and the arms blinking::
from gpiozero import SnowPi
snowman = SnowPi(pwm=True)
snowman.eyes.on()
snowman.nose.pulse()
snowman.arms.blink()
:param bool pwm:
If ``True``, construct :class:`PWMLED` instances to represent each
LED. If ``False`` (the default), construct regular :class:`LED`
instances.
.. _Ryanteck SnowPi: https://ryanteck.uk/raspberry-pi/114-snowpi-the-gpio-snowman-for-raspberry-pi-0635648608303.html
"""
def __init__(self, pwm=False):
super(SnowPi, self).__init__(
arms=LEDBoard(
left=LEDBoard(
top=17, middle=18, bottom=22, pwm=pwm,
_order=('top', 'middle', 'bottom')),
right=LEDBoard(
top=7, middle=8, bottom=9, pwm=pwm,
_order=('top', 'middle', 'bottom'))
),
eyes=LEDBoard(
left=23, right=24, pwm=pwm,
_order=('left', 'right')
),
nose=25, pwm=pwm
)
class TrafficLightsBuzzer(CompositeOutputDevice):
"""
Extends :class:`CompositeDevice` and is a generic class for HATs with

View File

@@ -356,7 +356,7 @@ class CompositeDevice(Device):
raise CompositeDeviceBadName('%s is a reserved name' % name)
self._all = args + tuple(kwargs[v] for v in self._order)
self._named = kwargs
self._tuple = namedtuple('CompositeDeviceValue', chain(
self._tuple = namedtuple('%sValue' % self.__class__.__name__, chain(
(str(i) for i in range(len(args))), self._order),
rename=True)
@@ -375,18 +375,39 @@ class CompositeDevice(Device):
raise AttributeError("can't set attribute %s" % name)
return super(CompositeDevice, self).__setattr__(name, value)
def __repr__(self):
try:
self._check_open()
return "<gpiozero.%s object containing %d devices: %s and %d unnamed>" % (
self.__class__.__name__,
len(self), ','.join(self._named),
len(self) - len(self._named)
)
except DeviceClosed:
return "<gpiozero.%s object closed>"
def __len__(self):
return len(self._all)
def __getitem__(self, index):
return self._all[index]
def __iter__(self):
return iter(self._all)
@property
def all(self):
# XXX Deprecate this in favour of using the instance as a container
return self._all
def close(self):
for device in self._all:
device.close()
self._all = ()
if self._all:
for device in self._all:
device.close()
@property
def closed(self):
return bool(self._all)
return all(device.closed for device in self)
@property
def tuple(self):
@@ -394,7 +415,7 @@ class CompositeDevice(Device):
@property
def value(self):
return self.tuple(*(device.value for device in self._all))
return self.tuple(*(device.value for device in self))
class GPIODevice(Device):

View File

@@ -34,7 +34,7 @@ class SPIBadArgs(SPIError, ValueError):
class GPIODeviceError(GPIOZeroError):
"Base class for errors specific to the GPIODevice hierarchy"
class GPIODeviceClosed(GPIODeviceError):
class GPIODeviceClosed(GPIODeviceError, DeviceClosed):
"Deprecated descendent of :exc:`DeviceClosed`"
class GPIOPinInUse(GPIODeviceError):

View File

@@ -11,7 +11,7 @@ from threading import Lock
from itertools import repeat, cycle, chain
from .exc import OutputDeviceBadValue, GPIOPinMissing
from .devices import GPIODevice, CompositeDevice, SourceMixin
from .devices import GPIODevice, Device, CompositeDevice, SourceMixin
from .threads import GPIOThread
@@ -121,6 +121,16 @@ class DigitalOutputDevice(OutputDevice):
self._blink_thread = None
super(DigitalOutputDevice, self).__init__(pin, active_high, initial_value)
self._lock = Lock()
self._controller = None
@property
def value(self):
return self._read()
@value.setter
def value(self, value):
self._stop_blink()
self._write(value)
def close(self):
self._stop_blink()
@@ -174,6 +184,9 @@ class DigitalOutputDevice(OutputDevice):
self._blink_thread = None
def _stop_blink(self):
if self._controller:
self._controller._stop_blink(self)
self._controller = None
if self._blink_thread:
self._blink_thread.stop()
self._blink_thread = None
@@ -286,6 +299,7 @@ class PWMOutputDevice(OutputDevice):
"""
def __init__(self, pin=None, active_high=True, initial_value=0, frequency=100):
self._blink_thread = None
self._controller = None
if not 0 <= initial_value <= 1:
raise OutputDeviceBadValue("initial_value must be between 0 and 1")
super(PWMOutputDevice, self).__init__(pin, active_high)
@@ -437,6 +451,9 @@ class PWMOutputDevice(OutputDevice):
)
def _stop_blink(self):
if self._controller:
self._controller._stop_blink(self)
self._controller = None
if self._blink_thread:
self._blink_thread.stop()
self._blink_thread = None
@@ -508,10 +525,10 @@ def _led_property(index, doc=None):
return property(getter, setter, doc=doc)
class RGBLED(SourceMixin, CompositeDevice):
class RGBLED(SourceMixin, Device):
"""
Extends :class:`CompositeDevice` and represents a full color LED component
(composed of red, green, and blue LEDs).
Extends :class:`Device` and represents a full color LED component (composed
of red, green, and blue LEDs).
Connect the common cathode (longest leg) to a ground pin; connect each of
the other legs (representing the red, green, and blue anodes) to any GPIO
@@ -556,6 +573,18 @@ class RGBLED(SourceMixin, CompositeDevice):
green = _led_property(1)
blue = _led_property(2)
def close(self):
if self._leds:
self._stop_blink()
for led in self._leds:
led.close()
self._leds = ()
super(RGBLED, self).close()
@property
def closed(self):
return bool(self._leds)
@property
def value(self):
"""
@@ -569,6 +598,7 @@ class RGBLED(SourceMixin, CompositeDevice):
@value.setter
def value(self, value):
self._stop_blink()
self.red, self.green, self.blue = value
@property
@@ -606,11 +636,6 @@ class RGBLED(SourceMixin, CompositeDevice):
r, g, b = self.value
self.value = (1 - r, 1 - g, 1 - b)
def close(self):
self._stop_blink()
for led in self._leds:
led.close()
def blink(
self, on_time=1, off_time=1, fade_in_time=0, fade_out_time=0,
on_color=(1, 1, 1), off_color=(0, 0, 0), n=None, background=True):
@@ -654,7 +679,8 @@ class RGBLED(SourceMixin, CompositeDevice):
self._blink_thread.join()
self._blink_thread = None
def _stop_blink(self):
def _stop_blink(self, led=None):
# If this is called with a single led, we stop all blinking anyway
if self._blink_thread:
self._blink_thread.stop()
self._blink_thread = None
@@ -687,16 +713,19 @@ class RGBLED(SourceMixin, CompositeDevice):
cycle(sequence) if n is None else
chain.from_iterable(repeat(sequence, n))
)
for l in self._leds:
l._controller = self
for value, delay in sequence:
self._leds[0].value, self._leds[1].value, self._leds[2].value = value
for l, v in zip(self._leds, value):
l._write(v)
if self._blink_thread.stopping.wait(delay):
break
class Motor(SourceMixin, CompositeDevice):
"""
Extends :class:`CompositeDevice` and represents a generic motor connected
to a bi-directional motor driver circuit (i.e. an `H-bridge`_).
Extends :class:`CompositeDevice` and represents a generic motor
connected to a bi-directional motor driver circuit (i.e. an `H-bridge`_).
Attach an `H-bridge`_ motor controller to your Pi; connect a power source
(e.g. a battery pack or the 5V pin) to the controller; connect the outputs
@@ -725,33 +754,9 @@ class Motor(SourceMixin, CompositeDevice):
raise GPIOPinMissing(
'forward and backward pins must be provided'
)
super(Motor, self).__init__()
self._forward = PWMOutputDevice(forward)
self._backward = PWMOutputDevice(backward)
def close(self):
self._forward.close()
self._backward.close()
@property
def closed(self):
return self._forward.closed and self._backward.closed
@property
def forward_device(self):
"""
Returns the `PWMOutputDevice` representing the forward pin of the motor
controller.
"""
return self._forward
@property
def backward_device(self):
"""
Returns the `PWMOutputDevice` representing the backward pin of the
motor controller.
"""
return self._backward
super(Motor, self).__init__(
forward_device=PWMOutputDevice(forward),
backward_device=PWMOutputDevice(backward))
@property
def value(self):
@@ -759,7 +764,7 @@ class Motor(SourceMixin, CompositeDevice):
Represents the speed of the motor as a floating point value between -1
(full speed backward) and 1 (full speed forward).
"""
return self._forward.value - self._backward.value
return self.forward_device.value - self.backward_device.value
@value.setter
def value(self, value):
@@ -788,8 +793,8 @@ class Motor(SourceMixin, CompositeDevice):
The speed at which the motor should turn. Can be any value between
0 (stopped) and the default 1 (maximum speed).
"""
self._backward.off()
self._forward.value = speed
self.backward_device.off()
self.forward_device.value = speed
def backward(self, speed=1):
"""
@@ -799,8 +804,8 @@ class Motor(SourceMixin, CompositeDevice):
The speed at which the motor should turn. Can be any value between
0 (stopped) and the default 1 (maximum speed).
"""
self._forward.off()
self._backward.value = speed
self.forward_device.off()
self.backward_device.value = speed
def reverse(self):
"""
@@ -814,5 +819,5 @@ class Motor(SourceMixin, CompositeDevice):
"""
Stop the motor.
"""
self._forward.off()
self._backward.off()
self.forward_device.off()
self.backward_device.off()