mirror of
https://github.com/KevinMidboe/python-gpiozero.git
synced 2025-10-29 17:50:37 +00:00
Merge pull request #292 from lurch/extra_source_tools
Add extra Source Tools functions: smoothed, summed and multiplied
This commit is contained in:
@@ -43,6 +43,8 @@ Single source conversions
|
||||
|
||||
.. autofunction:: queued
|
||||
|
||||
.. autofunction:: smoothed
|
||||
|
||||
.. autofunction:: scaled
|
||||
|
||||
Combining sources
|
||||
@@ -54,6 +56,10 @@ Combining sources
|
||||
|
||||
.. autofunction:: averaged
|
||||
|
||||
.. autofunction:: summed
|
||||
|
||||
.. autofunction:: multiplied
|
||||
|
||||
Artificial sources
|
||||
==================
|
||||
|
||||
|
||||
@@ -218,6 +218,54 @@ def averaged(*values):
|
||||
yield mean(v)
|
||||
|
||||
|
||||
def summed(*values):
|
||||
"""
|
||||
Returns the sum of all supplied values. One or more *values* can be
|
||||
specified. For example, to light a :class:`PWMLED` as the (scaled) sum of
|
||||
several potentiometers connected to an :class:`MCP3008` ADC::
|
||||
|
||||
from gpiozero import MCP3008, PWMLED
|
||||
from gpiozero.tools import summed, scaled
|
||||
from signal import pause
|
||||
|
||||
pot1 = MCP3008(channel=0)
|
||||
pot2 = MCP3008(channel=1)
|
||||
pot3 = MCP3008(channel=2)
|
||||
led = PWMLED(4)
|
||||
led.source = scaled(summed(pot1.values, pot2.values, pot3.values), 0, 1, 0, 3)
|
||||
pause()
|
||||
"""
|
||||
for v in zip(*values):
|
||||
yield sum(v)
|
||||
|
||||
|
||||
def multiplied(*values):
|
||||
"""
|
||||
Returns the product of all supplied values. One or more *values* can be
|
||||
specified. For example, to light a :class:`PWMLED` as the product (i.e.
|
||||
multiplication) of several potentiometers connected to an :class:`MCP3008`
|
||||
ADC::
|
||||
|
||||
from gpiozero import MCP3008, PWMLED
|
||||
from gpiozero.tools import multiplied
|
||||
from signal import pause
|
||||
|
||||
pot1 = MCP3008(channel=0)
|
||||
pot2 = MCP3008(channel=1)
|
||||
pot3 = MCP3008(channel=2)
|
||||
led = PWMLED(4)
|
||||
led.source = multiplied(pot1.values, pot2.values, pot3.values)
|
||||
pause()
|
||||
"""
|
||||
def _product(it):
|
||||
p = 1
|
||||
for n in it:
|
||||
p *= n
|
||||
return p
|
||||
for v in zip(*values):
|
||||
yield _product(v)
|
||||
|
||||
|
||||
def queued(values, qsize):
|
||||
"""
|
||||
Queues up readings from *values* (the number of readings queued is
|
||||
@@ -248,6 +296,33 @@ def queued(values, qsize):
|
||||
break
|
||||
|
||||
|
||||
def smoothed(values, qsize, average=mean):
|
||||
"""
|
||||
Queues up readings from *values* (the number of readings queued is
|
||||
determined by *qsize*) and begins yielding the *average* of the last
|
||||
*qsize* values when the queue is full. The larger the *qsize*, the more the
|
||||
values are smoothed. For example, to smooth the analog values read from an
|
||||
ADC::
|
||||
|
||||
from gpiozero import MCP3008
|
||||
from gpiozero.tools import smoothed
|
||||
|
||||
with MCP3008(channel=0) as adc:
|
||||
for smoothvalue in smoothed(adc.values, 5):
|
||||
print smoothvalue
|
||||
"""
|
||||
q = []
|
||||
it = iter(values)
|
||||
for i in range(qsize):
|
||||
q.append(next(it))
|
||||
for i in cycle(range(qsize)):
|
||||
yield average(q)
|
||||
try:
|
||||
q[i] = next(it)
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
|
||||
def pre_delayed(values, delay):
|
||||
"""
|
||||
Waits for *delay* seconds before returning each item from *values*.
|
||||
|
||||
@@ -16,6 +16,14 @@ try:
|
||||
from math import isclose
|
||||
except ImportError:
|
||||
from gpiozero.compat import isclose
|
||||
try:
|
||||
from statistics import mean
|
||||
except ImportError:
|
||||
from gpiozero.compat import mean
|
||||
try:
|
||||
from statistics import median
|
||||
except ImportError:
|
||||
from gpiozero.compat import median
|
||||
|
||||
|
||||
def test_negated():
|
||||
@@ -56,10 +64,24 @@ def test_averaged():
|
||||
assert list(averaged(())) == []
|
||||
assert list(averaged((0, 0.5, 1), (1, 1, 1))) == [0.5, 0.75, 1]
|
||||
|
||||
def test_summed():
|
||||
assert list(summed(())) == []
|
||||
assert list(summed((0, 0.5, 0.5, 1), (1, 0.5, 1, 1))) == [1, 1, 1.5, 2]
|
||||
|
||||
def test_multiplied():
|
||||
assert list(multiplied(())) == []
|
||||
assert list(multiplied((0, 0.5, 0.5, 1), (1, 0.5, 1, 1))) == [0, 0.25, 0.5, 1]
|
||||
|
||||
def test_queued():
|
||||
assert list(queued((1, 2, 3, 4, 5), 5)) == [1]
|
||||
assert list(queued((1, 2, 3, 4, 5, 6), 5)) == [1, 2]
|
||||
|
||||
def test_smoothed():
|
||||
assert list(smoothed((1, 2, 3, 4, 5), 5)) == [3.0]
|
||||
assert list(smoothed((1, 2, 3, 4, 5, 6), 5)) == [3.0, 4.0]
|
||||
assert list(smoothed((1, 1, 1, 4, 5, 5), 5, average=mean)) == [2.4, 3.2]
|
||||
assert list(smoothed((1, 1, 1, 4, 5, 5), 5, average=median)) == [1, 4]
|
||||
|
||||
def test_pre_delayed():
|
||||
start = time()
|
||||
for v in pre_delayed((0, 0, 0), 0.01):
|
||||
|
||||
Reference in New Issue
Block a user