Merge pull request #298 from lurch/new_source_tools

New source tools: booleanized, pre_periodic_filtered & post_periodic_filtered
This commit is contained in:
Dave Jones
2016-05-08 11:01:01 +01:00
3 changed files with 330 additions and 19 deletions

View File

@@ -10,7 +10,7 @@ the :attr:`~gpiozero.SourceMixin.source` and
library. These utility routines are in the ``tools`` module of GPIO Zero and
are typically imported as follows::
from gpiozero.tools import scaled, negated, conjunction
from gpiozero.tools import scaled, negated, all_values
Given that :attr:`~gpiozero.SourceMixin.source` and
:attr:`~gpiozero.ValuesMixin.values` deal with infinite iterators, another
@@ -29,6 +29,8 @@ Single source conversions
.. autofunction:: absoluted
.. autofunction:: booleanized
.. autofunction:: clamped
.. autofunction:: inverted
@@ -37,8 +39,12 @@ Single source conversions
.. autofunction:: post_delayed
.. autofunction:: post_periodic_filtered
.. autofunction:: pre_delayed
.. autofunction:: pre_periodic_filtered
.. autofunction:: quantized
.. autofunction:: queued
@@ -56,10 +62,10 @@ Combining sources
.. autofunction:: averaged
.. autofunction:: summed
.. autofunction:: multiplied
.. autofunction:: summed
Artificial sources
==================

View File

@@ -41,10 +41,13 @@ def negated(values):
yield not v
def inverted(values):
def inverted(values, input_min=0, input_max=1):
"""
Returns the inversion of the supplied values (1 becomes 0, 0 becomes 1,
0.1 becomes 0.9, etc.). For example::
Returns the inversion of the supplied values (*input_min* becomes
*input_max*, *input_max* becomes *input_min*, `input_min + 0.1` becomes
`input_max - 0.1`, etc.). All items in *values* are assumed to be between
*input_min* and *input_max* (which default to 0 and 1 respectively), and
the output will be in the same range. For example::
from gpiozero import MCP3008, PWMLED
from gpiozero.tools import inverted
@@ -55,8 +58,10 @@ def inverted(values):
led.source = inverted(pot.values)
pause()
"""
if input_min >= input_max:
raise ValueError('input_min must be smaller than input_max')
for v in values:
yield 1 - v
yield input_min + input_max - v
def scaled(values, output_min, output_max, input_min=0, input_max=1):
@@ -82,6 +87,8 @@ def scaled(values, output_min, output_max, input_min=0, input_max=1):
*input_max* (inclusive) then the function will not produce values that
lie within *output_min* to *output_max* (inclusive).
"""
if input_min >= input_max:
raise ValueError('input_min must be smaller than input_max')
input_size = input_max - input_min
output_size = output_max - output_min
for v in values:
@@ -104,6 +111,8 @@ def clamped(values, output_min=0, output_max=1):
led.source = clamped(pot.values, 0.5, 1.0)
pause()
"""
if output_min >= output_max:
raise ValueError('output_min must be smaller than output_max')
for v in values:
yield min(max(v, output_min), output_max)
@@ -128,11 +137,11 @@ def absoluted(values):
yield abs(v)
def quantized(values, steps, output_min=0, output_max=1):
def quantized(values, steps, input_min=0, input_max=1):
"""
Returns *values* quantized to *steps* increments. All items in *values* are
assumed to be between *output_min* and *output_max* (use :func:`scaled` to
ensure this if necessary).
assumed to be between *input_min* and *input_max* (which default to 0 and
1 respectively), and the output will be in the same range.
For example, to quantize values between 0 and 1 to 5 "steps" (0.0, 0.25,
0.5, 0.75, 1.0)::
@@ -146,9 +155,72 @@ def quantized(values, steps, output_min=0, output_max=1):
led.source = quantized(pot.values, 4)
pause()
"""
output_size = output_max - output_min
for v in scaled(values, 0, 1, output_min, output_max):
yield ((int(v * steps) / steps) * output_size) + output_min
if steps < 1:
raise ValueError("steps must be 1 or larger")
if input_min >= input_max:
raise ValueError('input_min must be smaller than input_max')
input_size = input_max - input_min
for v in scaled(values, 0, 1, input_min, input_max):
yield ((int(v * steps) / steps) * input_size) + input_min
def booleanized(values, min_value, max_value, hysteresis=0):
"""
Returns True for each item in *values* between *min_value* and
*max_value*, and False otherwise. *hysteresis* can optionally be used to
add `hysteresis`_ which prevents the output value rapidly flipping when
the input value is fluctuating near the *min_value* or *max_value*
thresholds. For example, to light an LED only when a potentiometer is
between 1/4 and 3/4 of its full range::
from gpiozero import LED, MCP3008
from gpiozero.tools import booleanized
from signal import pause
led = LED(4)
pot = MCP3008(channel=0)
led.source = booleanized(pot.values, 0.25, 0.75)
pause()
.. _hysteresis: https://en.wikipedia.org/wiki/Hysteresis
"""
if min_value >= max_value:
raise ValueError('min_value must be smaller than max_value')
min_value = float(min_value)
max_value = float(max_value)
if hysteresis < 0:
raise ValueError("hysteresis must be 0 or larger")
else:
hysteresis = float(hysteresis)
if (max_value - min_value) <= hysteresis:
raise ValueError('The gap between min_value and max_value must be larger than hysteresis')
last_state = None
for v in values:
if v < min_value:
new_state = 'below'
elif v > max_value:
new_state = 'above'
else:
new_state = 'in'
switch = False
if last_state == None or not hysteresis:
switch = True
elif new_state == last_state:
pass
else: # new_state != last_state
if last_state == 'below' and new_state == 'in':
switch = v >= min_value + hysteresis
elif last_state == 'in' and new_state == 'below':
switch = v < min_value - hysteresis
elif last_state == 'in' and new_state == 'above':
switch = v > max_value + hysteresis
elif last_state == 'above' and new_state == 'in':
switch = v <= max_value - hysteresis
else: # above->below or below->above
switch = True
if switch:
last_state = new_state
yield last_state == 'in'
def all_values(*values):
@@ -284,6 +356,8 @@ def queued(values, qsize):
leds[4].source = btn.values
pause()
"""
if qsize < 1:
raise ValueError("qsize must be 1 or larger")
q = []
it = iter(values)
for i in range(qsize):
@@ -308,9 +382,11 @@ def smoothed(values, qsize, average=mean):
from gpiozero.tools import smoothed
with MCP3008(channel=0) as adc:
for smoothvalue in smoothed(adc.values, 5):
print smoothvalue
for value in smoothed(adc.values, 5):
print value
"""
if qsize < 1:
raise ValueError("qsize must be 1 or larger")
q = []
it = iter(values)
for i in range(qsize):
@@ -327,6 +403,8 @@ def pre_delayed(values, delay):
"""
Waits for *delay* seconds before returning each item from *values*.
"""
if delay < 0:
raise ValueError("delay must be 0 or larger")
for v in values:
sleep(delay)
yield v
@@ -336,11 +414,78 @@ def post_delayed(values, delay):
"""
Waits for *delay* seconds after returning each item from *values*.
"""
if delay < 0:
raise ValueError("delay must be 0 or larger")
for v in values:
yield v
sleep(delay)
def pre_periodic_filtered(values, block, repeat_after):
"""
Blocks the first *block* items from *values*, repeating the block after
every *repeat_after* items, if *repeat_after* is non-zero. For example, to
discard the first 50 values read from an ADC::
from gpiozero import MCP3008
from gpiozero.tools import pre_periodic_filtered
with MCP3008(channel=0) as adc:
for value in pre_periodic_filtered(adc.values, 50, 0):
print value
Or to only display every even item read from an ADC::
from gpiozero import MCP3008
from gpiozero.tools import pre_periodic_filtered
with MCP3008(channel=0) as adc:
for value in pre_periodic_filtered(adc.values, 1, 1):
print value
"""
if block < 1:
raise ValueError("block must be 1 or larger")
if repeat_after < 0:
raise ValueError("repeat_after must be 0 or larger")
it = iter(values)
if repeat_after == 0:
for _ in range(block):
next(it)
while True:
yield next(it)
else:
while True:
for _ in range(block):
next(it)
for _ in range(repeat_after):
yield next(it)
def post_periodic_filtered(values, repeat_after, block):
"""
After every *repeat_after* items, blocks the next *block* items from
*values*. Note that unlike :func:`pre_periodic_filtered`, *repeat_after*
can't be 0. For example, to block every tenth item read from an ADC::
from gpiozero import MCP3008
from gpiozero.tools import post_periodic_filtered
with MCP3008(channel=0) as adc:
for value in post_periodic_filtered(adc.values, 9, 1):
print value
"""
if repeat_after < 1:
raise ValueError("repeat_after must be 1 or larger")
if block < 1:
raise ValueError("block must be 1 or larger")
it = iter(values)
while True:
for _ in range(repeat_after):
yield next(it)
for _ in range(block):
next(it)
def random_values():
"""
Provides an infinite source of random values between 0 and 1. For example,

View File

@@ -31,24 +31,108 @@ def test_negated():
assert list(negated((True, True, False, False))) == [False, False, True, True]
def test_inverted():
with pytest.raises(ValueError):
list(inverted((), 0, 0))
with pytest.raises(ValueError):
list(inverted((), 1, 1))
with pytest.raises(ValueError):
list(inverted((), 1, 0))
assert list(inverted(())) == []
assert list(inverted((1, 0, 0.1, 0.5))) == [0, 1, 0.9, 0.5]
assert list(inverted((1, 0, 0.1, 0.5), 0, 1)) == [0, 1, 0.9, 0.5]
assert list(inverted((-1, 0, -0.1, -0.5), -1, 0)) == [0, -1, -0.9, -0.5]
assert list(inverted((1, 0, 0.1, 0.5, -1, -0.1, -0.5), -1, 1)) == [-1, 0, -0.1, -0.5, 1, 0.1, 0.5]
assert list(inverted((2, 1, 1.1, 1.5), 1, 2)) == [1, 2, 1.9, 1.5]
def test_scaled():
with pytest.raises(ValueError):
list(scaled((), 0, 1, 0, 0))
with pytest.raises(ValueError):
list(scaled((), 0, 1, 1, 1))
with pytest.raises(ValueError):
list(scaled((), 0, 1, 1, 0))
assert list(scaled((), 0, 1)) == []
# no scale
assert list(scaled((0, 1, 0.5, 0.1), 0, 1)) == [0, 1, 0.5, 0.1]
assert list(scaled((0, 1, 0.5, 0.1), 0, 1, 0, 1)) == [0, 1, 0.5, 0.1]
# multiply by 2
assert list(scaled((0, 1, 0.5, 0.1), 0, 2, 0, 1)) == [0, 2, 1, 0.2]
# add 1
assert list(scaled((0, 1, 0.5, 0.1), 1, 2, 0, 1)) == [1, 2, 1.5, 1.1]
# multiply by 2 then add 1
assert list(scaled((0, 1, 0.5, 0.1), 1, 3, 0, 1)) == [1, 3, 2, 1.2]
# add 1 then multiply by 2
assert list(scaled((0, 1, 0.5, 0.1), 2, 4, 0, 1)) == [2, 4, 3, 2.2]
# invert
assert list(scaled((0, 1, 0.5, 0.1), 1, 0, 0, 1)) == [1, 0, 0.5, 0.9]
# multiply by -1 then subtract 1
assert list(scaled((0, 1, 0.5, 0.1), -1, -2, 0, 1)) == [-1, -2, -1.5, -1.1]
# scale 0->1 to -1->+1
assert list(scaled((0, 1, 0.5, 0.1), -1, 1)) == [-1, 1, 0.0, -0.8]
assert list(scaled((0, 1, 0.5, 0.1), -1, 1, 0, 1)) == [-1, 1, 0.0, -0.8]
# scale -1->+1 to 0->1
assert list(scaled((-1, 1, 0.0, -0.5), 0, 1, -1, 1)) == [0, 1, 0.5, 0.25]
def test_clamped():
assert list(clamped((-1, 0, 1, 2))) == [0, 0, 1, 1]
with pytest.raises(ValueError):
list(clamped((), 0, 0))
with pytest.raises(ValueError):
list(clamped((), 1, 1))
with pytest.raises(ValueError):
list(clamped((), 1, 0))
assert list(clamped(())) == []
assert list(clamped((-2, -1, -0.5, 0, 0.5, 1, 2))) == [0, 0, 0, 0, 0.5, 1, 1]
assert list(clamped((-2, -1, -0.5, 0, 0.5, 1, 2), 0, 1)) == [0, 0, 0, 0, 0.5, 1, 1]
assert list(clamped((-2, -1, -0.5, 0, 0.5, 1, 2), -1, 1)) == [-1, -1, -0.5, 0, 0.5, 1, 1]
assert list(clamped((-2, -1, -0.5, 0, 0.5, 1, 2), -2, 2)) == [-2, -1, -0.5, 0, 0.5, 1, 2]
def test_absoluted():
assert list(absoluted(())) == []
assert list(absoluted((-2, -1, 0, 1, 2))) == [2, 1, 0, 1, 2]
def test_quantized():
with pytest.raises(ValueError):
list(quantized((), 0))
with pytest.raises(ValueError):
list(quantized((), 4, 0, 0))
with pytest.raises(ValueError):
list(quantized((), 4, 1, 1))
with pytest.raises(ValueError):
list(quantized((), 4, 1, 0))
assert list(quantized((), 4)) == []
assert list(quantized((0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1), 4)) == [
0.0, 0.0, 0.0, 0.25, 0.25, 0.5, 0.5, 0.5, 0.75, 0.75, 1.0]
assert list(quantized((0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1), 4, 0, 1)) == [
0.0, 0.0, 0.0, 0.25, 0.25, 0.5, 0.5, 0.5, 0.75, 0.75, 1.0]
assert list(quantized((0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1), 5)) == [
0.0, 0.0, 0.2, 0.2, 0.4, 0.4, 0.6, 0.6, 0.8, 0.8, 1.0]
assert list(quantized((0, 0.25, 0.5, 0.75, 1.0, 1.5, 1.75, 2.0), 2, 0, 2)) == [
0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 2.0]
assert list(quantized((1, 1.25, 1.5, 1.75, 2.0, 2.5, 2.75, 3.0), 2, 1, 3)) == [
1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0]
def test_booleanized():
with pytest.raises(ValueError):
list(booleanized((), 0, 0))
with pytest.raises(ValueError):
list(booleanized((), 1, 1))
with pytest.raises(ValueError):
list(booleanized((), 1, 0))
with pytest.raises(ValueError):
list(booleanized((), 0, 0.5, -0.2))
with pytest.raises(ValueError):
list(booleanized((), 0, 0.5, 0.5))
assert list(booleanized((), 0, 0.5)) == []
assert list(booleanized((0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1), 0, 0.5)) == [
True, True, True, True, True, True, False, False, False, False, False]
assert list(booleanized((0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1, 0), 0.25, 0.75)) == [
False, False, False, True, True, True, True, True, False, False, False, False]
assert list(booleanized((0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1, 0), 0.25, 0.75, 0.2)) == [
False, False, False, False, False, True, True, True, True, True, False, False]
assert list(booleanized((1, 0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1, 0, 1), 0.25, 0.75)) == [
False, False, False, True, True, True, True, True, False, False, False, False]
assert list(booleanized((1, 0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1, 0, 1), 0.25, 0.75, 0.2)) == [
False, False, False, False, False, True, True, True, True, True, False, False]
def test_all_values():
assert list(all_values(())) == []
@@ -62,52 +146,128 @@ def test_any_values():
def test_averaged():
assert list(averaged(())) == []
assert list(averaged((0, 0.5, 1))) == [0, 0.5, 1]
assert list(averaged((0, 0.5, 1), (1, 1, 1))) == [0.5, 0.75, 1]
def test_summed():
assert list(summed(())) == []
assert list(summed((0, 0.5, 0.5, 1))) == [0, 0.5, 0.5, 1]
assert list(summed((0, 0.5, 0.5, 1), (1, 0.5, 1, 1))) == [1, 1, 1.5, 2]
def test_multiplied():
assert list(multiplied(())) == []
assert list(multiplied((0, 0.5, 0.5, 1))) == [0, 0.5, 0.5, 1]
assert list(multiplied((0, 0.5, 0.5, 1), (1, 0.5, 1, 1))) == [0, 0.25, 0.5, 1]
def test_queued():
with pytest.raises(ValueError):
list(queued((), 0))
assert list(queued((), 5)) == []
assert list(queued((1, 2, 3, 4, 5), 5)) == [1]
assert list(queued((1, 2, 3, 4, 5, 6), 5)) == [1, 2]
def test_smoothed():
with pytest.raises(ValueError):
list(smoothed((), 0))
assert list(smoothed((), 5)) == []
assert list(smoothed((1, 2, 3, 4, 5), 5)) == [3.0]
assert list(smoothed((1, 2, 3, 4, 5, 6), 5)) == [3.0, 4.0]
assert list(smoothed((1, 2, 3, 4, 5, 6), 5, average=mean)) == [3.0, 4.0]
assert list(smoothed((1, 1, 1, 4, 5, 5), 5, average=mean)) == [2.4, 3.2]
assert list(smoothed((1, 1, 1, 4, 5, 5), 5, average=median)) == [1, 4]
def test_pre_delayed():
with pytest.raises(ValueError):
list(pre_delayed((), -1))
assert list(pre_delayed((), 0.01)) == []
count = 0
start = time()
for v in pre_delayed((0, 0, 0), 0.01):
count += 1
assert v == 0
assert time() - start >= 0.01
start = time()
assert count == 3
def test_post_delayed():
with pytest.raises(ValueError):
list(post_delayed((), -1))
assert list(post_delayed((), 0.01)) == []
count = 0
start = time()
for v in post_delayed((1, 2, 2), 0.01):
count += 1
if v == 1:
assert time() - start < 0.01
else:
elif v == 2:
assert time() - start >= 0.01
else:
assert False
start = time()
assert time() - start >= 0.01
assert count == 3
def test_pre_periodic_filtered():
with pytest.raises(ValueError):
list(pre_periodic_filtered((), 2, -1))
with pytest.raises(ValueError):
list(pre_periodic_filtered((), 0, 0))
assert list(pre_periodic_filtered((), 2, 0)) == []
assert list(pre_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2, 0)) == [3, 4, 5, 6, 7, 8, 9, 10]
assert list(pre_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 1, 1)) == [2, 4, 6, 8, 10]
assert list(pre_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 1, 2)) == [2, 3, 5, 6, 8, 9]
assert list(pre_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2, 1)) == [3, 6, 9]
def test_post_periodic_filtered():
with pytest.raises(ValueError):
list(post_periodic_filtered((), 1, 0))
with pytest.raises(ValueError):
list(post_periodic_filtered((), 0, 1))
assert list(pre_periodic_filtered((), 1, 1)) == []
assert list(post_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 1, 1)) == [1, 3, 5, 7, 9]
assert list(post_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 1, 2)) == [1, 4, 7, 10]
assert list(post_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2, 1)) == [1, 2, 4, 5, 7, 8, 10]
def test_random_values():
for i, v in zip(range(1000), random_values()):
for _, v in zip(range(1000), random_values()):
assert 0 <= v <= 1
def test_sin_values():
firstval = None
for i, v in zip(range(1000), sin_values()):
assert -1 <= v <= 1
assert isclose(v, sin(radians(i)), abs_tol=1e-9)
if i == 0:
firstval = v
else:
if i % 360 == 0:
assert v == firstval
for period in (360, 100):
firstval = None
for i, v in zip(range(1000), sin_values(period)):
assert -1 <= v <= 1
if i == 0:
firstval = v
else:
if i % period == 0:
assert v == firstval
def test_cos_values():
firstval = None
for i, v in zip(range(1000), cos_values()):
assert -1 <= v <= 1
assert isclose(v, cos(radians(i)), abs_tol=1e-9)
if i == 0:
firstval = v
else:
if i % 360 == 0:
assert v == firstval
for period in (360, 100):
firstval = None
for i, v in zip(range(1000), cos_values(period)):
assert -1 <= v <= 1
if i == 0:
firstval = v
else:
if i % period == 0:
assert v == firstval