New source tools: booleanized, pre_periodic_filtered & post_periodic_filtered

Also adds extra parameter validation to the existing source tools, adds input
min and max to inverted, and adds many more source tools unit tests.
This commit is contained in:
Andrew Scheller
2016-04-25 10:41:27 +01:00
parent 848d030ac9
commit 01d5cb928f
3 changed files with 330 additions and 19 deletions

View File

@@ -10,7 +10,7 @@ the :attr:`~gpiozero.SourceMixin.source` and
library. These utility routines are in the ``tools`` module of GPIO Zero and
are typically imported as follows::
from gpiozero.tools import scaled, negated, conjunction
from gpiozero.tools import scaled, negated, all_values
Given that :attr:`~gpiozero.SourceMixin.source` and
:attr:`~gpiozero.ValuesMixin.values` deal with infinite iterators, another
@@ -29,6 +29,8 @@ Single source conversions
.. autofunction:: absoluted
.. autofunction:: booleanized
.. autofunction:: clamped
.. autofunction:: inverted
@@ -37,8 +39,12 @@ Single source conversions
.. autofunction:: post_delayed
.. autofunction:: post_periodic_filtered
.. autofunction:: pre_delayed
.. autofunction:: pre_periodic_filtered
.. autofunction:: quantized
.. autofunction:: queued
@@ -56,10 +62,10 @@ Combining sources
.. autofunction:: averaged
.. autofunction:: summed
.. autofunction:: multiplied
.. autofunction:: summed
Artificial sources
==================

View File

@@ -41,10 +41,13 @@ def negated(values):
yield not v
def inverted(values):
def inverted(values, input_min=0, input_max=1):
"""
Returns the inversion of the supplied values (1 becomes 0, 0 becomes 1,
0.1 becomes 0.9, etc.). For example::
Returns the inversion of the supplied values (*input_min* becomes
*input_max*, *input_max* becomes *input_min*, `input_min + 0.1` becomes
`input_max - 0.1`, etc.). All items in *values* are assumed to be between
*input_min* and *input_max* (which default to 0 and 1 respectively), and
the output will be in the same range. For example::
from gpiozero import MCP3008, PWMLED
from gpiozero.tools import inverted
@@ -55,8 +58,10 @@ def inverted(values):
led.source = inverted(pot.values)
pause()
"""
if input_min >= input_max:
raise ValueError('input_min must be smaller than input_max')
for v in values:
yield 1 - v
yield input_min + input_max - v
def scaled(values, output_min, output_max, input_min=0, input_max=1):
@@ -82,6 +87,8 @@ def scaled(values, output_min, output_max, input_min=0, input_max=1):
*input_max* (inclusive) then the function will not produce values that
lie within *output_min* to *output_max* (inclusive).
"""
if input_min >= input_max:
raise ValueError('input_min must be smaller than input_max')
input_size = input_max - input_min
output_size = output_max - output_min
for v in values:
@@ -104,6 +111,8 @@ def clamped(values, output_min=0, output_max=1):
led.source = clamped(pot.values, 0.5, 1.0)
pause()
"""
if output_min >= output_max:
raise ValueError('output_min must be smaller than output_max')
for v in values:
yield min(max(v, output_min), output_max)
@@ -128,11 +137,11 @@ def absoluted(values):
yield abs(v)
def quantized(values, steps, output_min=0, output_max=1):
def quantized(values, steps, input_min=0, input_max=1):
"""
Returns *values* quantized to *steps* increments. All items in *values* are
assumed to be between *output_min* and *output_max* (use :func:`scaled` to
ensure this if necessary).
assumed to be between *input_min* and *input_max* (which default to 0 and
1 respectively), and the output will be in the same range.
For example, to quantize values between 0 and 1 to 5 "steps" (0.0, 0.25,
0.5, 0.75, 1.0)::
@@ -146,9 +155,72 @@ def quantized(values, steps, output_min=0, output_max=1):
led.source = quantized(pot.values, 4)
pause()
"""
output_size = output_max - output_min
for v in scaled(values, 0, 1, output_min, output_max):
yield ((int(v * steps) / steps) * output_size) + output_min
if steps < 1:
raise ValueError("steps must be 1 or larger")
if input_min >= input_max:
raise ValueError('input_min must be smaller than input_max')
input_size = input_max - input_min
for v in scaled(values, 0, 1, input_min, input_max):
yield ((int(v * steps) / steps) * input_size) + input_min
def booleanized(values, min_value, max_value, hysteresis=0):
"""
Returns True for each item in *values* between *min_value* and
*max_value*, and False otherwise. *hysteresis* can optionally be used to
add `hysteresis`_ which prevents the output value rapidly flipping when
the input value is fluctuating near the *min_value* or *max_value*
thresholds. For example, to light an LED only when a potentiometer is
between 1/4 and 3/4 of its full range::
from gpiozero import LED, MCP3008
from gpiozero.tools import booleanized
from signal import pause
led = LED(4)
pot = MCP3008(channel=0)
led.source = booleanized(pot.values, 0.25, 0.75)
pause()
.. _hysteresis: https://en.wikipedia.org/wiki/Hysteresis
"""
if min_value >= max_value:
raise ValueError('min_value must be smaller than max_value')
min_value = float(min_value)
max_value = float(max_value)
if hysteresis < 0:
raise ValueError("hysteresis must be 0 or larger")
else:
hysteresis = float(hysteresis)
if (max_value - min_value) <= hysteresis:
raise ValueError('The gap between min_value and max_value must be larger than hysteresis')
last_state = None
for v in values:
if v < min_value:
new_state = 'below'
elif v > max_value:
new_state = 'above'
else:
new_state = 'in'
switch = False
if last_state == None or not hysteresis:
switch = True
elif new_state == last_state:
pass
else: # new_state != last_state
if last_state == 'below' and new_state == 'in':
switch = v >= min_value + hysteresis
elif last_state == 'in' and new_state == 'below':
switch = v < min_value - hysteresis
elif last_state == 'in' and new_state == 'above':
switch = v > max_value + hysteresis
elif last_state == 'above' and new_state == 'in':
switch = v <= max_value - hysteresis
else: # above->below or below->above
switch = True
if switch:
last_state = new_state
yield last_state == 'in'
def all_values(*values):
@@ -284,6 +356,8 @@ def queued(values, qsize):
leds[4].source = btn.values
pause()
"""
if qsize < 1:
raise ValueError("qsize must be 1 or larger")
q = []
it = iter(values)
for i in range(qsize):
@@ -308,9 +382,11 @@ def smoothed(values, qsize, average=mean):
from gpiozero.tools import smoothed
with MCP3008(channel=0) as adc:
for smoothvalue in smoothed(adc.values, 5):
print smoothvalue
for value in smoothed(adc.values, 5):
print value
"""
if qsize < 1:
raise ValueError("qsize must be 1 or larger")
q = []
it = iter(values)
for i in range(qsize):
@@ -327,6 +403,8 @@ def pre_delayed(values, delay):
"""
Waits for *delay* seconds before returning each item from *values*.
"""
if delay < 0:
raise ValueError("delay must be 0 or larger")
for v in values:
sleep(delay)
yield v
@@ -336,11 +414,78 @@ def post_delayed(values, delay):
"""
Waits for *delay* seconds after returning each item from *values*.
"""
if delay < 0:
raise ValueError("delay must be 0 or larger")
for v in values:
yield v
sleep(delay)
def pre_periodic_filtered(values, block, repeat_after):
"""
Blocks the first *block* items from *values*, repeating the block after
every *repeat_after* items, if *repeat_after* is non-zero. For example, to
discard the first 50 values read from an ADC::
from gpiozero import MCP3008
from gpiozero.tools import pre_periodic_filtered
with MCP3008(channel=0) as adc:
for value in pre_periodic_filtered(adc.values, 50, 0):
print value
Or to only display every even item read from an ADC::
from gpiozero import MCP3008
from gpiozero.tools import pre_periodic_filtered
with MCP3008(channel=0) as adc:
for value in pre_periodic_filtered(adc.values, 1, 1):
print value
"""
if block < 1:
raise ValueError("block must be 1 or larger")
if repeat_after < 0:
raise ValueError("repeat_after must be 0 or larger")
it = iter(values)
if repeat_after == 0:
for _ in range(block):
next(it)
while True:
yield next(it)
else:
while True:
for _ in range(block):
next(it)
for _ in range(repeat_after):
yield next(it)
def post_periodic_filtered(values, repeat_after, block):
"""
After every *repeat_after* items, blocks the next *block* items from
*values*. Note that unlike :func:`pre_periodic_filtered`, *repeat_after*
can't be 0. For example, to block every tenth item read from an ADC::
from gpiozero import MCP3008
from gpiozero.tools import post_periodic_filtered
with MCP3008(channel=0) as adc:
for value in post_periodic_filtered(adc.values, 9, 1):
print value
"""
if repeat_after < 1:
raise ValueError("repeat_after must be 1 or larger")
if block < 1:
raise ValueError("block must be 1 or larger")
it = iter(values)
while True:
for _ in range(repeat_after):
yield next(it)
for _ in range(block):
next(it)
def random_values():
"""
Provides an infinite source of random values between 0 and 1. For example,

View File

@@ -31,24 +31,108 @@ def test_negated():
assert list(negated((True, True, False, False))) == [False, False, True, True]
def test_inverted():
with pytest.raises(ValueError):
list(inverted((), 0, 0))
with pytest.raises(ValueError):
list(inverted((), 1, 1))
with pytest.raises(ValueError):
list(inverted((), 1, 0))
assert list(inverted(())) == []
assert list(inverted((1, 0, 0.1, 0.5))) == [0, 1, 0.9, 0.5]
assert list(inverted((1, 0, 0.1, 0.5), 0, 1)) == [0, 1, 0.9, 0.5]
assert list(inverted((-1, 0, -0.1, -0.5), -1, 0)) == [0, -1, -0.9, -0.5]
assert list(inverted((1, 0, 0.1, 0.5, -1, -0.1, -0.5), -1, 1)) == [-1, 0, -0.1, -0.5, 1, 0.1, 0.5]
assert list(inverted((2, 1, 1.1, 1.5), 1, 2)) == [1, 2, 1.9, 1.5]
def test_scaled():
with pytest.raises(ValueError):
list(scaled((), 0, 1, 0, 0))
with pytest.raises(ValueError):
list(scaled((), 0, 1, 1, 1))
with pytest.raises(ValueError):
list(scaled((), 0, 1, 1, 0))
assert list(scaled((), 0, 1)) == []
# no scale
assert list(scaled((0, 1, 0.5, 0.1), 0, 1)) == [0, 1, 0.5, 0.1]
assert list(scaled((0, 1, 0.5, 0.1), 0, 1, 0, 1)) == [0, 1, 0.5, 0.1]
# multiply by 2
assert list(scaled((0, 1, 0.5, 0.1), 0, 2, 0, 1)) == [0, 2, 1, 0.2]
# add 1
assert list(scaled((0, 1, 0.5, 0.1), 1, 2, 0, 1)) == [1, 2, 1.5, 1.1]
# multiply by 2 then add 1
assert list(scaled((0, 1, 0.5, 0.1), 1, 3, 0, 1)) == [1, 3, 2, 1.2]
# add 1 then multiply by 2
assert list(scaled((0, 1, 0.5, 0.1), 2, 4, 0, 1)) == [2, 4, 3, 2.2]
# invert
assert list(scaled((0, 1, 0.5, 0.1), 1, 0, 0, 1)) == [1, 0, 0.5, 0.9]
# multiply by -1 then subtract 1
assert list(scaled((0, 1, 0.5, 0.1), -1, -2, 0, 1)) == [-1, -2, -1.5, -1.1]
# scale 0->1 to -1->+1
assert list(scaled((0, 1, 0.5, 0.1), -1, 1)) == [-1, 1, 0.0, -0.8]
assert list(scaled((0, 1, 0.5, 0.1), -1, 1, 0, 1)) == [-1, 1, 0.0, -0.8]
# scale -1->+1 to 0->1
assert list(scaled((-1, 1, 0.0, -0.5), 0, 1, -1, 1)) == [0, 1, 0.5, 0.25]
def test_clamped():
assert list(clamped((-1, 0, 1, 2))) == [0, 0, 1, 1]
with pytest.raises(ValueError):
list(clamped((), 0, 0))
with pytest.raises(ValueError):
list(clamped((), 1, 1))
with pytest.raises(ValueError):
list(clamped((), 1, 0))
assert list(clamped(())) == []
assert list(clamped((-2, -1, -0.5, 0, 0.5, 1, 2))) == [0, 0, 0, 0, 0.5, 1, 1]
assert list(clamped((-2, -1, -0.5, 0, 0.5, 1, 2), 0, 1)) == [0, 0, 0, 0, 0.5, 1, 1]
assert list(clamped((-2, -1, -0.5, 0, 0.5, 1, 2), -1, 1)) == [-1, -1, -0.5, 0, 0.5, 1, 1]
assert list(clamped((-2, -1, -0.5, 0, 0.5, 1, 2), -2, 2)) == [-2, -1, -0.5, 0, 0.5, 1, 2]
def test_absoluted():
assert list(absoluted(())) == []
assert list(absoluted((-2, -1, 0, 1, 2))) == [2, 1, 0, 1, 2]
def test_quantized():
with pytest.raises(ValueError):
list(quantized((), 0))
with pytest.raises(ValueError):
list(quantized((), 4, 0, 0))
with pytest.raises(ValueError):
list(quantized((), 4, 1, 1))
with pytest.raises(ValueError):
list(quantized((), 4, 1, 0))
assert list(quantized((), 4)) == []
assert list(quantized((0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1), 4)) == [
0.0, 0.0, 0.0, 0.25, 0.25, 0.5, 0.5, 0.5, 0.75, 0.75, 1.0]
assert list(quantized((0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1), 4, 0, 1)) == [
0.0, 0.0, 0.0, 0.25, 0.25, 0.5, 0.5, 0.5, 0.75, 0.75, 1.0]
assert list(quantized((0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1), 5)) == [
0.0, 0.0, 0.2, 0.2, 0.4, 0.4, 0.6, 0.6, 0.8, 0.8, 1.0]
assert list(quantized((0, 0.25, 0.5, 0.75, 1.0, 1.5, 1.75, 2.0), 2, 0, 2)) == [
0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 2.0]
assert list(quantized((1, 1.25, 1.5, 1.75, 2.0, 2.5, 2.75, 3.0), 2, 1, 3)) == [
1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0]
def test_booleanized():
with pytest.raises(ValueError):
list(booleanized((), 0, 0))
with pytest.raises(ValueError):
list(booleanized((), 1, 1))
with pytest.raises(ValueError):
list(booleanized((), 1, 0))
with pytest.raises(ValueError):
list(booleanized((), 0, 0.5, -0.2))
with pytest.raises(ValueError):
list(booleanized((), 0, 0.5, 0.5))
assert list(booleanized((), 0, 0.5)) == []
assert list(booleanized((0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1), 0, 0.5)) == [
True, True, True, True, True, True, False, False, False, False, False]
assert list(booleanized((0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1, 0), 0.25, 0.75)) == [
False, False, False, True, True, True, True, True, False, False, False, False]
assert list(booleanized((0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1, 0), 0.25, 0.75, 0.2)) == [
False, False, False, False, False, True, True, True, True, True, False, False]
assert list(booleanized((1, 0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1, 0, 1), 0.25, 0.75)) == [
False, False, False, True, True, True, True, True, False, False, False, False]
assert list(booleanized((1, 0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1, 0, 1), 0.25, 0.75, 0.2)) == [
False, False, False, False, False, True, True, True, True, True, False, False]
def test_all_values():
assert list(all_values(())) == []
@@ -62,52 +146,128 @@ def test_any_values():
def test_averaged():
assert list(averaged(())) == []
assert list(averaged((0, 0.5, 1))) == [0, 0.5, 1]
assert list(averaged((0, 0.5, 1), (1, 1, 1))) == [0.5, 0.75, 1]
def test_summed():
assert list(summed(())) == []
assert list(summed((0, 0.5, 0.5, 1))) == [0, 0.5, 0.5, 1]
assert list(summed((0, 0.5, 0.5, 1), (1, 0.5, 1, 1))) == [1, 1, 1.5, 2]
def test_multiplied():
assert list(multiplied(())) == []
assert list(multiplied((0, 0.5, 0.5, 1))) == [0, 0.5, 0.5, 1]
assert list(multiplied((0, 0.5, 0.5, 1), (1, 0.5, 1, 1))) == [0, 0.25, 0.5, 1]
def test_queued():
with pytest.raises(ValueError):
list(queued((), 0))
assert list(queued((), 5)) == []
assert list(queued((1, 2, 3, 4, 5), 5)) == [1]
assert list(queued((1, 2, 3, 4, 5, 6), 5)) == [1, 2]
def test_smoothed():
with pytest.raises(ValueError):
list(smoothed((), 0))
assert list(smoothed((), 5)) == []
assert list(smoothed((1, 2, 3, 4, 5), 5)) == [3.0]
assert list(smoothed((1, 2, 3, 4, 5, 6), 5)) == [3.0, 4.0]
assert list(smoothed((1, 2, 3, 4, 5, 6), 5, average=mean)) == [3.0, 4.0]
assert list(smoothed((1, 1, 1, 4, 5, 5), 5, average=mean)) == [2.4, 3.2]
assert list(smoothed((1, 1, 1, 4, 5, 5), 5, average=median)) == [1, 4]
def test_pre_delayed():
with pytest.raises(ValueError):
list(pre_delayed((), -1))
assert list(pre_delayed((), 0.01)) == []
count = 0
start = time()
for v in pre_delayed((0, 0, 0), 0.01):
count += 1
assert v == 0
assert time() - start >= 0.01
start = time()
assert count == 3
def test_post_delayed():
with pytest.raises(ValueError):
list(post_delayed((), -1))
assert list(post_delayed((), 0.01)) == []
count = 0
start = time()
for v in post_delayed((1, 2, 2), 0.01):
count += 1
if v == 1:
assert time() - start < 0.01
else:
elif v == 2:
assert time() - start >= 0.01
else:
assert False
start = time()
assert time() - start >= 0.01
assert count == 3
def test_pre_periodic_filtered():
with pytest.raises(ValueError):
list(pre_periodic_filtered((), 2, -1))
with pytest.raises(ValueError):
list(pre_periodic_filtered((), 0, 0))
assert list(pre_periodic_filtered((), 2, 0)) == []
assert list(pre_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2, 0)) == [3, 4, 5, 6, 7, 8, 9, 10]
assert list(pre_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 1, 1)) == [2, 4, 6, 8, 10]
assert list(pre_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 1, 2)) == [2, 3, 5, 6, 8, 9]
assert list(pre_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2, 1)) == [3, 6, 9]
def test_post_periodic_filtered():
with pytest.raises(ValueError):
list(post_periodic_filtered((), 1, 0))
with pytest.raises(ValueError):
list(post_periodic_filtered((), 0, 1))
assert list(pre_periodic_filtered((), 1, 1)) == []
assert list(post_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 1, 1)) == [1, 3, 5, 7, 9]
assert list(post_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 1, 2)) == [1, 4, 7, 10]
assert list(post_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2, 1)) == [1, 2, 4, 5, 7, 8, 10]
def test_random_values():
for i, v in zip(range(1000), random_values()):
for _, v in zip(range(1000), random_values()):
assert 0 <= v <= 1
def test_sin_values():
firstval = None
for i, v in zip(range(1000), sin_values()):
assert -1 <= v <= 1
assert isclose(v, sin(radians(i)), abs_tol=1e-9)
if i == 0:
firstval = v
else:
if i % 360 == 0:
assert v == firstval
for period in (360, 100):
firstval = None
for i, v in zip(range(1000), sin_values(period)):
assert -1 <= v <= 1
if i == 0:
firstval = v
else:
if i % period == 0:
assert v == firstval
def test_cos_values():
firstval = None
for i, v in zip(range(1000), cos_values()):
assert -1 <= v <= 1
assert isclose(v, cos(radians(i)), abs_tol=1e-9)
if i == 0:
firstval = v
else:
if i % 360 == 0:
assert v == firstval
for period in (360, 100):
firstval = None
for i, v in zip(range(1000), cos_values(period)):
assert -1 <= v <= 1
if i == 0:
firstval = v
else:
if i % period == 0:
assert v == firstval