diff --git a/docs/api_tools.rst b/docs/api_tools.rst index 4add9c6..69b2a60 100644 --- a/docs/api_tools.rst +++ b/docs/api_tools.rst @@ -10,7 +10,7 @@ the :attr:`~gpiozero.SourceMixin.source` and library. These utility routines are in the ``tools`` module of GPIO Zero and are typically imported as follows:: - from gpiozero.tools import scaled, negated, conjunction + from gpiozero.tools import scaled, negated, all_values Given that :attr:`~gpiozero.SourceMixin.source` and :attr:`~gpiozero.ValuesMixin.values` deal with infinite iterators, another @@ -29,6 +29,8 @@ Single source conversions .. autofunction:: absoluted +.. autofunction:: booleanized + .. autofunction:: clamped .. autofunction:: inverted @@ -37,8 +39,12 @@ Single source conversions .. autofunction:: post_delayed +.. autofunction:: post_periodic_filtered + .. autofunction:: pre_delayed +.. autofunction:: pre_periodic_filtered + .. autofunction:: quantized .. autofunction:: queued @@ -56,10 +62,10 @@ Combining sources .. autofunction:: averaged -.. autofunction:: summed - .. autofunction:: multiplied +.. autofunction:: summed + Artificial sources ================== diff --git a/gpiozero/tools.py b/gpiozero/tools.py index 4268cea..c3611f2 100644 --- a/gpiozero/tools.py +++ b/gpiozero/tools.py @@ -41,10 +41,13 @@ def negated(values): yield not v -def inverted(values): +def inverted(values, input_min=0, input_max=1): """ - Returns the inversion of the supplied values (1 becomes 0, 0 becomes 1, - 0.1 becomes 0.9, etc.). For example:: + Returns the inversion of the supplied values (*input_min* becomes + *input_max*, *input_max* becomes *input_min*, `input_min + 0.1` becomes + `input_max - 0.1`, etc.). All items in *values* are assumed to be between + *input_min* and *input_max* (which default to 0 and 1 respectively), and + the output will be in the same range. For example:: from gpiozero import MCP3008, PWMLED from gpiozero.tools import inverted @@ -55,8 +58,10 @@ def inverted(values): led.source = inverted(pot.values) pause() """ + if input_min >= input_max: + raise ValueError('input_min must be smaller than input_max') for v in values: - yield 1 - v + yield input_min + input_max - v def scaled(values, output_min, output_max, input_min=0, input_max=1): @@ -82,6 +87,8 @@ def scaled(values, output_min, output_max, input_min=0, input_max=1): *input_max* (inclusive) then the function will not produce values that lie within *output_min* to *output_max* (inclusive). """ + if input_min >= input_max: + raise ValueError('input_min must be smaller than input_max') input_size = input_max - input_min output_size = output_max - output_min for v in values: @@ -104,6 +111,8 @@ def clamped(values, output_min=0, output_max=1): led.source = clamped(pot.values, 0.5, 1.0) pause() """ + if output_min >= output_max: + raise ValueError('output_min must be smaller than output_max') for v in values: yield min(max(v, output_min), output_max) @@ -128,11 +137,11 @@ def absoluted(values): yield abs(v) -def quantized(values, steps, output_min=0, output_max=1): +def quantized(values, steps, input_min=0, input_max=1): """ Returns *values* quantized to *steps* increments. All items in *values* are - assumed to be between *output_min* and *output_max* (use :func:`scaled` to - ensure this if necessary). + assumed to be between *input_min* and *input_max* (which default to 0 and + 1 respectively), and the output will be in the same range. For example, to quantize values between 0 and 1 to 5 "steps" (0.0, 0.25, 0.5, 0.75, 1.0):: @@ -146,9 +155,72 @@ def quantized(values, steps, output_min=0, output_max=1): led.source = quantized(pot.values, 4) pause() """ - output_size = output_max - output_min - for v in scaled(values, 0, 1, output_min, output_max): - yield ((int(v * steps) / steps) * output_size) + output_min + if steps < 1: + raise ValueError("steps must be 1 or larger") + if input_min >= input_max: + raise ValueError('input_min must be smaller than input_max') + input_size = input_max - input_min + for v in scaled(values, 0, 1, input_min, input_max): + yield ((int(v * steps) / steps) * input_size) + input_min + + +def booleanized(values, min_value, max_value, hysteresis=0): + """ + Returns True for each item in *values* between *min_value* and + *max_value*, and False otherwise. *hysteresis* can optionally be used to + add `hysteresis`_ which prevents the output value rapidly flipping when + the input value is fluctuating near the *min_value* or *max_value* + thresholds. For example, to light an LED only when a potentiometer is + between 1/4 and 3/4 of its full range:: + + from gpiozero import LED, MCP3008 + from gpiozero.tools import booleanized + from signal import pause + + led = LED(4) + pot = MCP3008(channel=0) + led.source = booleanized(pot.values, 0.25, 0.75) + pause() + + .. _hysteresis: https://en.wikipedia.org/wiki/Hysteresis + """ + if min_value >= max_value: + raise ValueError('min_value must be smaller than max_value') + min_value = float(min_value) + max_value = float(max_value) + if hysteresis < 0: + raise ValueError("hysteresis must be 0 or larger") + else: + hysteresis = float(hysteresis) + if (max_value - min_value) <= hysteresis: + raise ValueError('The gap between min_value and max_value must be larger than hysteresis') + last_state = None + for v in values: + if v < min_value: + new_state = 'below' + elif v > max_value: + new_state = 'above' + else: + new_state = 'in' + switch = False + if last_state == None or not hysteresis: + switch = True + elif new_state == last_state: + pass + else: # new_state != last_state + if last_state == 'below' and new_state == 'in': + switch = v >= min_value + hysteresis + elif last_state == 'in' and new_state == 'below': + switch = v < min_value - hysteresis + elif last_state == 'in' and new_state == 'above': + switch = v > max_value + hysteresis + elif last_state == 'above' and new_state == 'in': + switch = v <= max_value - hysteresis + else: # above->below or below->above + switch = True + if switch: + last_state = new_state + yield last_state == 'in' def all_values(*values): @@ -284,6 +356,8 @@ def queued(values, qsize): leds[4].source = btn.values pause() """ + if qsize < 1: + raise ValueError("qsize must be 1 or larger") q = [] it = iter(values) for i in range(qsize): @@ -308,9 +382,11 @@ def smoothed(values, qsize, average=mean): from gpiozero.tools import smoothed with MCP3008(channel=0) as adc: - for smoothvalue in smoothed(adc.values, 5): - print smoothvalue + for value in smoothed(adc.values, 5): + print value """ + if qsize < 1: + raise ValueError("qsize must be 1 or larger") q = [] it = iter(values) for i in range(qsize): @@ -327,6 +403,8 @@ def pre_delayed(values, delay): """ Waits for *delay* seconds before returning each item from *values*. """ + if delay < 0: + raise ValueError("delay must be 0 or larger") for v in values: sleep(delay) yield v @@ -336,11 +414,78 @@ def post_delayed(values, delay): """ Waits for *delay* seconds after returning each item from *values*. """ + if delay < 0: + raise ValueError("delay must be 0 or larger") for v in values: yield v sleep(delay) +def pre_periodic_filtered(values, block, repeat_after): + """ + Blocks the first *block* items from *values*, repeating the block after + every *repeat_after* items, if *repeat_after* is non-zero. For example, to + discard the first 50 values read from an ADC:: + + from gpiozero import MCP3008 + from gpiozero.tools import pre_periodic_filtered + + with MCP3008(channel=0) as adc: + for value in pre_periodic_filtered(adc.values, 50, 0): + print value + + Or to only display every even item read from an ADC:: + + from gpiozero import MCP3008 + from gpiozero.tools import pre_periodic_filtered + + with MCP3008(channel=0) as adc: + for value in pre_periodic_filtered(adc.values, 1, 1): + print value + """ + if block < 1: + raise ValueError("block must be 1 or larger") + if repeat_after < 0: + raise ValueError("repeat_after must be 0 or larger") + it = iter(values) + if repeat_after == 0: + for _ in range(block): + next(it) + while True: + yield next(it) + else: + while True: + for _ in range(block): + next(it) + for _ in range(repeat_after): + yield next(it) + + +def post_periodic_filtered(values, repeat_after, block): + """ + After every *repeat_after* items, blocks the next *block* items from + *values*. Note that unlike :func:`pre_periodic_filtered`, *repeat_after* + can't be 0. For example, to block every tenth item read from an ADC:: + + from gpiozero import MCP3008 + from gpiozero.tools import post_periodic_filtered + + with MCP3008(channel=0) as adc: + for value in post_periodic_filtered(adc.values, 9, 1): + print value + """ + if repeat_after < 1: + raise ValueError("repeat_after must be 1 or larger") + if block < 1: + raise ValueError("block must be 1 or larger") + it = iter(values) + while True: + for _ in range(repeat_after): + yield next(it) + for _ in range(block): + next(it) + + def random_values(): """ Provides an infinite source of random values between 0 and 1. For example, diff --git a/tests/test_tools.py b/tests/test_tools.py index 422dbec..adeef77 100644 --- a/tests/test_tools.py +++ b/tests/test_tools.py @@ -31,24 +31,108 @@ def test_negated(): assert list(negated((True, True, False, False))) == [False, False, True, True] def test_inverted(): + with pytest.raises(ValueError): + list(inverted((), 0, 0)) + with pytest.raises(ValueError): + list(inverted((), 1, 1)) + with pytest.raises(ValueError): + list(inverted((), 1, 0)) assert list(inverted(())) == [] assert list(inverted((1, 0, 0.1, 0.5))) == [0, 1, 0.9, 0.5] + assert list(inverted((1, 0, 0.1, 0.5), 0, 1)) == [0, 1, 0.9, 0.5] + assert list(inverted((-1, 0, -0.1, -0.5), -1, 0)) == [0, -1, -0.9, -0.5] + assert list(inverted((1, 0, 0.1, 0.5, -1, -0.1, -0.5), -1, 1)) == [-1, 0, -0.1, -0.5, 1, 0.1, 0.5] + assert list(inverted((2, 1, 1.1, 1.5), 1, 2)) == [1, 2, 1.9, 1.5] def test_scaled(): + with pytest.raises(ValueError): + list(scaled((), 0, 1, 0, 0)) + with pytest.raises(ValueError): + list(scaled((), 0, 1, 1, 1)) + with pytest.raises(ValueError): + list(scaled((), 0, 1, 1, 0)) + assert list(scaled((), 0, 1)) == [] + # no scale assert list(scaled((0, 1, 0.5, 0.1), 0, 1)) == [0, 1, 0.5, 0.1] + assert list(scaled((0, 1, 0.5, 0.1), 0, 1, 0, 1)) == [0, 1, 0.5, 0.1] + # multiply by 2 + assert list(scaled((0, 1, 0.5, 0.1), 0, 2, 0, 1)) == [0, 2, 1, 0.2] + # add 1 + assert list(scaled((0, 1, 0.5, 0.1), 1, 2, 0, 1)) == [1, 2, 1.5, 1.1] + # multiply by 2 then add 1 + assert list(scaled((0, 1, 0.5, 0.1), 1, 3, 0, 1)) == [1, 3, 2, 1.2] + # add 1 then multiply by 2 + assert list(scaled((0, 1, 0.5, 0.1), 2, 4, 0, 1)) == [2, 4, 3, 2.2] + # invert + assert list(scaled((0, 1, 0.5, 0.1), 1, 0, 0, 1)) == [1, 0, 0.5, 0.9] + # multiply by -1 then subtract 1 + assert list(scaled((0, 1, 0.5, 0.1), -1, -2, 0, 1)) == [-1, -2, -1.5, -1.1] + # scale 0->1 to -1->+1 assert list(scaled((0, 1, 0.5, 0.1), -1, 1)) == [-1, 1, 0.0, -0.8] + assert list(scaled((0, 1, 0.5, 0.1), -1, 1, 0, 1)) == [-1, 1, 0.0, -0.8] + # scale -1->+1 to 0->1 + assert list(scaled((-1, 1, 0.0, -0.5), 0, 1, -1, 1)) == [0, 1, 0.5, 0.25] def test_clamped(): - assert list(clamped((-1, 0, 1, 2))) == [0, 0, 1, 1] + with pytest.raises(ValueError): + list(clamped((), 0, 0)) + with pytest.raises(ValueError): + list(clamped((), 1, 1)) + with pytest.raises(ValueError): + list(clamped((), 1, 0)) + assert list(clamped(())) == [] + assert list(clamped((-2, -1, -0.5, 0, 0.5, 1, 2))) == [0, 0, 0, 0, 0.5, 1, 1] + assert list(clamped((-2, -1, -0.5, 0, 0.5, 1, 2), 0, 1)) == [0, 0, 0, 0, 0.5, 1, 1] + assert list(clamped((-2, -1, -0.5, 0, 0.5, 1, 2), -1, 1)) == [-1, -1, -0.5, 0, 0.5, 1, 1] + assert list(clamped((-2, -1, -0.5, 0, 0.5, 1, 2), -2, 2)) == [-2, -1, -0.5, 0, 0.5, 1, 2] def test_absoluted(): + assert list(absoluted(())) == [] assert list(absoluted((-2, -1, 0, 1, 2))) == [2, 1, 0, 1, 2] def test_quantized(): + with pytest.raises(ValueError): + list(quantized((), 0)) + with pytest.raises(ValueError): + list(quantized((), 4, 0, 0)) + with pytest.raises(ValueError): + list(quantized((), 4, 1, 1)) + with pytest.raises(ValueError): + list(quantized((), 4, 1, 0)) + assert list(quantized((), 4)) == [] assert list(quantized((0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1), 4)) == [ 0.0, 0.0, 0.0, 0.25, 0.25, 0.5, 0.5, 0.5, 0.75, 0.75, 1.0] + assert list(quantized((0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1), 4, 0, 1)) == [ + 0.0, 0.0, 0.0, 0.25, 0.25, 0.5, 0.5, 0.5, 0.75, 0.75, 1.0] assert list(quantized((0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1), 5)) == [ 0.0, 0.0, 0.2, 0.2, 0.4, 0.4, 0.6, 0.6, 0.8, 0.8, 1.0] + assert list(quantized((0, 0.25, 0.5, 0.75, 1.0, 1.5, 1.75, 2.0), 2, 0, 2)) == [ + 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 2.0] + assert list(quantized((1, 1.25, 1.5, 1.75, 2.0, 2.5, 2.75, 3.0), 2, 1, 3)) == [ + 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0] + +def test_booleanized(): + with pytest.raises(ValueError): + list(booleanized((), 0, 0)) + with pytest.raises(ValueError): + list(booleanized((), 1, 1)) + with pytest.raises(ValueError): + list(booleanized((), 1, 0)) + with pytest.raises(ValueError): + list(booleanized((), 0, 0.5, -0.2)) + with pytest.raises(ValueError): + list(booleanized((), 0, 0.5, 0.5)) + assert list(booleanized((), 0, 0.5)) == [] + assert list(booleanized((0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1), 0, 0.5)) == [ + True, True, True, True, True, True, False, False, False, False, False] + assert list(booleanized((0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1, 0), 0.25, 0.75)) == [ + False, False, False, True, True, True, True, True, False, False, False, False] + assert list(booleanized((0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1, 0), 0.25, 0.75, 0.2)) == [ + False, False, False, False, False, True, True, True, True, True, False, False] + assert list(booleanized((1, 0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1, 0, 1), 0.25, 0.75)) == [ + False, False, False, True, True, True, True, True, False, False, False, False] + assert list(booleanized((1, 0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1, 0, 1), 0.25, 0.75, 0.2)) == [ + False, False, False, False, False, True, True, True, True, True, False, False] def test_all_values(): assert list(all_values(())) == [] @@ -62,52 +146,128 @@ def test_any_values(): def test_averaged(): assert list(averaged(())) == [] + assert list(averaged((0, 0.5, 1))) == [0, 0.5, 1] assert list(averaged((0, 0.5, 1), (1, 1, 1))) == [0.5, 0.75, 1] def test_summed(): assert list(summed(())) == [] + assert list(summed((0, 0.5, 0.5, 1))) == [0, 0.5, 0.5, 1] assert list(summed((0, 0.5, 0.5, 1), (1, 0.5, 1, 1))) == [1, 1, 1.5, 2] def test_multiplied(): assert list(multiplied(())) == [] + assert list(multiplied((0, 0.5, 0.5, 1))) == [0, 0.5, 0.5, 1] assert list(multiplied((0, 0.5, 0.5, 1), (1, 0.5, 1, 1))) == [0, 0.25, 0.5, 1] def test_queued(): + with pytest.raises(ValueError): + list(queued((), 0)) + assert list(queued((), 5)) == [] assert list(queued((1, 2, 3, 4, 5), 5)) == [1] assert list(queued((1, 2, 3, 4, 5, 6), 5)) == [1, 2] def test_smoothed(): + with pytest.raises(ValueError): + list(smoothed((), 0)) + assert list(smoothed((), 5)) == [] assert list(smoothed((1, 2, 3, 4, 5), 5)) == [3.0] assert list(smoothed((1, 2, 3, 4, 5, 6), 5)) == [3.0, 4.0] + assert list(smoothed((1, 2, 3, 4, 5, 6), 5, average=mean)) == [3.0, 4.0] assert list(smoothed((1, 1, 1, 4, 5, 5), 5, average=mean)) == [2.4, 3.2] assert list(smoothed((1, 1, 1, 4, 5, 5), 5, average=median)) == [1, 4] def test_pre_delayed(): + with pytest.raises(ValueError): + list(pre_delayed((), -1)) + assert list(pre_delayed((), 0.01)) == [] + count = 0 start = time() for v in pre_delayed((0, 0, 0), 0.01): + count += 1 assert v == 0 assert time() - start >= 0.01 start = time() + assert count == 3 def test_post_delayed(): + with pytest.raises(ValueError): + list(post_delayed((), -1)) + assert list(post_delayed((), 0.01)) == [] + count = 0 start = time() for v in post_delayed((1, 2, 2), 0.01): + count += 1 if v == 1: assert time() - start < 0.01 - else: + elif v == 2: assert time() - start >= 0.01 + else: + assert False start = time() assert time() - start >= 0.01 + assert count == 3 + +def test_pre_periodic_filtered(): + with pytest.raises(ValueError): + list(pre_periodic_filtered((), 2, -1)) + with pytest.raises(ValueError): + list(pre_periodic_filtered((), 0, 0)) + assert list(pre_periodic_filtered((), 2, 0)) == [] + assert list(pre_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2, 0)) == [3, 4, 5, 6, 7, 8, 9, 10] + assert list(pre_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 1, 1)) == [2, 4, 6, 8, 10] + assert list(pre_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 1, 2)) == [2, 3, 5, 6, 8, 9] + assert list(pre_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2, 1)) == [3, 6, 9] + +def test_post_periodic_filtered(): + with pytest.raises(ValueError): + list(post_periodic_filtered((), 1, 0)) + with pytest.raises(ValueError): + list(post_periodic_filtered((), 0, 1)) + assert list(pre_periodic_filtered((), 1, 1)) == [] + assert list(post_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 1, 1)) == [1, 3, 5, 7, 9] + assert list(post_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 1, 2)) == [1, 4, 7, 10] + assert list(post_periodic_filtered((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2, 1)) == [1, 2, 4, 5, 7, 8, 10] def test_random_values(): - for i, v in zip(range(1000), random_values()): + for _, v in zip(range(1000), random_values()): assert 0 <= v <= 1 def test_sin_values(): + firstval = None for i, v in zip(range(1000), sin_values()): + assert -1 <= v <= 1 assert isclose(v, sin(radians(i)), abs_tol=1e-9) + if i == 0: + firstval = v + else: + if i % 360 == 0: + assert v == firstval + for period in (360, 100): + firstval = None + for i, v in zip(range(1000), sin_values(period)): + assert -1 <= v <= 1 + if i == 0: + firstval = v + else: + if i % period == 0: + assert v == firstval def test_cos_values(): + firstval = None for i, v in zip(range(1000), cos_values()): + assert -1 <= v <= 1 assert isclose(v, cos(radians(i)), abs_tol=1e-9) - + if i == 0: + firstval = v + else: + if i % 360 == 0: + assert v == firstval + for period in (360, 100): + firstval = None + for i, v in zip(range(1000), cos_values(period)): + assert -1 <= v <= 1 + if i == 0: + firstval = v + else: + if i % period == 0: + assert v == firstval