diff --git a/docs/api_tools.rst b/docs/api_tools.rst index 8b46d2b..4add9c6 100644 --- a/docs/api_tools.rst +++ b/docs/api_tools.rst @@ -43,6 +43,8 @@ Single source conversions .. autofunction:: queued +.. autofunction:: smoothed + .. autofunction:: scaled Combining sources @@ -54,6 +56,10 @@ Combining sources .. autofunction:: averaged +.. autofunction:: summed + +.. autofunction:: multiplied + Artificial sources ================== diff --git a/gpiozero/tools.py b/gpiozero/tools.py index 11a079a..4268cea 100644 --- a/gpiozero/tools.py +++ b/gpiozero/tools.py @@ -218,6 +218,54 @@ def averaged(*values): yield mean(v) +def summed(*values): + """ + Returns the sum of all supplied values. One or more *values* can be + specified. For example, to light a :class:`PWMLED` as the (scaled) sum of + several potentiometers connected to an :class:`MCP3008` ADC:: + + from gpiozero import MCP3008, PWMLED + from gpiozero.tools import summed, scaled + from signal import pause + + pot1 = MCP3008(channel=0) + pot2 = MCP3008(channel=1) + pot3 = MCP3008(channel=2) + led = PWMLED(4) + led.source = scaled(summed(pot1.values, pot2.values, pot3.values), 0, 1, 0, 3) + pause() + """ + for v in zip(*values): + yield sum(v) + + +def multiplied(*values): + """ + Returns the product of all supplied values. One or more *values* can be + specified. For example, to light a :class:`PWMLED` as the product (i.e. + multiplication) of several potentiometers connected to an :class:`MCP3008` + ADC:: + + from gpiozero import MCP3008, PWMLED + from gpiozero.tools import multiplied + from signal import pause + + pot1 = MCP3008(channel=0) + pot2 = MCP3008(channel=1) + pot3 = MCP3008(channel=2) + led = PWMLED(4) + led.source = multiplied(pot1.values, pot2.values, pot3.values) + pause() + """ + def _product(it): + p = 1 + for n in it: + p *= n + return p + for v in zip(*values): + yield _product(v) + + def queued(values, qsize): """ Queues up readings from *values* (the number of readings queued is @@ -248,6 +296,33 @@ def queued(values, qsize): break +def smoothed(values, qsize, average=mean): + """ + Queues up readings from *values* (the number of readings queued is + determined by *qsize*) and begins yielding the *average* of the last + *qsize* values when the queue is full. The larger the *qsize*, the more the + values are smoothed. For example, to smooth the analog values read from an + ADC:: + + from gpiozero import MCP3008 + from gpiozero.tools import smoothed + + with MCP3008(channel=0) as adc: + for smoothvalue in smoothed(adc.values, 5): + print smoothvalue + """ + q = [] + it = iter(values) + for i in range(qsize): + q.append(next(it)) + for i in cycle(range(qsize)): + yield average(q) + try: + q[i] = next(it) + except StopIteration: + break + + def pre_delayed(values, delay): """ Waits for *delay* seconds before returning each item from *values*. diff --git a/tests/test_tools.py b/tests/test_tools.py index 9749b80..422dbec 100644 --- a/tests/test_tools.py +++ b/tests/test_tools.py @@ -16,6 +16,14 @@ try: from math import isclose except ImportError: from gpiozero.compat import isclose +try: + from statistics import mean +except ImportError: + from gpiozero.compat import mean +try: + from statistics import median +except ImportError: + from gpiozero.compat import median def test_negated(): @@ -56,10 +64,24 @@ def test_averaged(): assert list(averaged(())) == [] assert list(averaged((0, 0.5, 1), (1, 1, 1))) == [0.5, 0.75, 1] +def test_summed(): + assert list(summed(())) == [] + assert list(summed((0, 0.5, 0.5, 1), (1, 0.5, 1, 1))) == [1, 1, 1.5, 2] + +def test_multiplied(): + assert list(multiplied(())) == [] + assert list(multiplied((0, 0.5, 0.5, 1), (1, 0.5, 1, 1))) == [0, 0.25, 0.5, 1] + def test_queued(): assert list(queued((1, 2, 3, 4, 5), 5)) == [1] assert list(queued((1, 2, 3, 4, 5, 6), 5)) == [1, 2] +def test_smoothed(): + assert list(smoothed((1, 2, 3, 4, 5), 5)) == [3.0] + assert list(smoothed((1, 2, 3, 4, 5, 6), 5)) == [3.0, 4.0] + assert list(smoothed((1, 1, 1, 4, 5, 5), 5, average=mean)) == [2.4, 3.2] + assert list(smoothed((1, 1, 1, 4, 5, 5), 5, average=median)) == [1, 4] + def test_pre_delayed(): start = time() for v in pre_delayed((0, 0, 0), 0.01):