mirror of
				https://github.com/KevinMidboe/python-gpiozero.git
				synced 2025-10-29 17:50:37 +00:00 
			
		
		
		
	Also adds extra parameter validation to the existing source tools, adds input min and max to inverted, and adds many more source tools unit tests.
		
			
				
	
	
		
			557 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			557 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # vim: set fileencoding=utf-8:
 | |
| 
 | |
| from __future__ import (
 | |
|     unicode_literals,
 | |
|     print_function,
 | |
|     absolute_import,
 | |
|     division,
 | |
| )
 | |
| str = type('')
 | |
| 
 | |
| 
 | |
| from random import random
 | |
| from time import sleep
 | |
| try:
 | |
|     from itertools import izip as zip
 | |
| except ImportError:
 | |
|     pass
 | |
| from itertools import cycle
 | |
| from math import sin, cos, pi
 | |
| try:
 | |
|     from statistics import mean
 | |
| except ImportError:
 | |
|     from .compat import mean
 | |
| 
 | |
| 
 | |
def negated(values):
    """
    Yields the logical negation of every item in *values*: truthy items come
    out as ``False`` and falsy items come out as ``True``. For example::

        from gpiozero import Button, LED
        from gpiozero.tools import negated
        from signal import pause

        led = LED(4)
        btn = Button(17)
        led.source = negated(btn.values)
        pause()
    """
    for item in values:
        if item:
            yield False
        else:
            yield True
 | |
| 
 | |
| 
 | |
def inverted(values, input_min=0, input_max=1):
    """
    Yields each item of *values* mirrored within the range *input_min* to
    *input_max*: *input_min* maps to *input_max*, *input_max* maps to
    *input_min*, ``input_min + 0.1`` maps to ``input_max - 0.1``, and so on.
    The bounds default to 0 and 1 respectively; items are assumed to lie
    between them, in which case the output lies in the same range.

    Because this is a generator, :exc:`ValueError` for a bad range is raised
    when iteration starts rather than when the function is called.
    For example::

        from gpiozero import MCP3008, PWMLED
        from gpiozero.tools import inverted
        from signal import pause

        led = PWMLED(4)
        pot = MCP3008(channel=0)
        led.source = inverted(pot.values)
        pause()
    """
    if input_max <= input_min:
        raise ValueError('input_min must be smaller than input_max')
    # Reflecting v about the middle of the range is (min + max) - v.
    pivot = input_min + input_max
    for item in values:
        yield pivot - item
 | |
| 
 | |
| 
 | |
def scaled(values, output_min, output_max, input_min=0, input_max=1):
    """
    Yields each item of *values* linearly mapped from the range *input_min*
    to *input_max* (defaulting to 0 and 1) onto the range *output_min* to
    *output_max*. For example, to drive a motor (whose value runs from -1 to
    1) from a potentiometer (which typically reads between 0 and 1)::

        from gpiozero import Motor, MCP3008
        from gpiozero.tools import scaled
        from signal import pause

        motor = Motor(20, 21)
        pot = MCP3008(channel=0)
        motor.source = scaled(pot.values, -1, 1)
        pause()

    .. warning::

        Items of *values* that fall outside *input_min* to *input_max*
        (inclusive) are mapped to results outside *output_min* to
        *output_max* (inclusive); no clamping is applied.
    """
    if input_max <= input_min:
        raise ValueError('input_min must be smaller than input_max')
    in_span = input_max - input_min
    out_span = output_max - output_min
    for item in values:
        # Position of the item within the input range, 0 at input_min and
        # 1 at input_max.
        fraction = (item - input_min) / in_span
        yield (fraction * out_span) + output_min
 | |
| 
 | |
| 
 | |
def clamped(values, output_min=0, output_max=1):
    """
    Yields each item of *values* limited to the range *output_min* to
    *output_max* (defaulting to 0 and 1): items below *output_min* come out
    as *output_min*, items above *output_max* come out as *output_max*, and
    everything else passes through unchanged. For example::

        from gpiozero import PWMLED, MCP3008
        from gpiozero.tools import clamped
        from signal import pause

        led = PWMLED(4)
        pot = MCP3008(channel=0)
        led.source = clamped(pot.values, 0.5, 1.0)
        pause()
    """
    if output_max <= output_min:
        raise ValueError('output_min must be smaller than output_max')
    for item in values:
        raised = max(item, output_min)
        yield min(raised, output_max)
 | |
| 
 | |
| 
 | |
def absoluted(values):
    """
    Yields the absolute value of each item in *values*, so negative items
    come out positive. For example::

        from gpiozero import PWMLED, Motor, MCP3008
        from gpiozero.tools import absoluted, scaled
        from signal import pause

        led = PWMLED(4)
        motor = Motor(22, 27)
        pot = MCP3008(channel=0)
        motor.source = scaled(pot.values, -1, 1)
        led.source = absoluted(motor.values)
        pause()
    """
    # A generator expression keeps evaluation lazy, one item per request.
    magnitudes = (abs(item) for item in values)
    for magnitude in magnitudes:
        yield magnitude
 | |
| 
 | |
| 
 | |
def quantized(values, steps, input_min=0, input_max=1):
    """
    Yields each item of *values* rounded down onto one of *steps* equal
    increments of the range *input_min* to *input_max* (defaulting to 0 and
    1). Items are assumed to lie within that range, and the output then lies
    in the same range.

    For example, to quantize values between 0 and 1 to 5 "steps" (0.0, 0.25,
    0.5, 0.75, 1.0)::

        from gpiozero import PWMLED, MCP3008
        from gpiozero.tools import quantized
        from signal import pause

        led = PWMLED(4)
        pot = MCP3008(channel=0)
        led.source = quantized(pot.values, 4)
        pause()
    """
    if steps < 1:
        raise ValueError("steps must be 1 or larger")
    if input_max <= input_min:
        raise ValueError('input_min must be smaller than input_max')
    span = input_max - input_min
    # Normalise to 0..1 first so the step arithmetic is range-independent.
    for fraction in scaled(values, 0, 1, input_min, input_max):
        whole_steps = int(fraction * steps)
        yield ((whole_steps / steps) * span) + input_min
 | |
| 
 | |
| 
 | |
def booleanized(values, min_value, max_value, hysteresis=0):
    """
    Returns True for each item in *values* between *min_value* and
    *max_value* (inclusive), and False otherwise. *hysteresis* can optionally
    be used to add `hysteresis`_ which prevents the output value rapidly
    flipping when the input value is fluctuating near the *min_value* or
    *max_value* thresholds. For example, to light an LED only when a
    potentiometer is between 1/4 and 3/4 of its full range::

        from gpiozero import LED, MCP3008
        from gpiozero.tools import booleanized
        from signal import pause

        led = LED(4)
        pot = MCP3008(channel=0)
        led.source = booleanized(pot.values, 0.25, 0.75)
        pause()

    :exc:`ValueError` is raised (when iteration starts) if *min_value* is not
    smaller than *max_value*, if *hysteresis* is negative, or if the gap
    between *min_value* and *max_value* is not larger than *hysteresis*.

    .. _hysteresis: https://en.wikipedia.org/wiki/Hysteresis
    """
    if min_value >= max_value:
        raise ValueError('min_value must be smaller than max_value')
    min_value = float(min_value)
    max_value = float(max_value)
    if hysteresis < 0:
        raise ValueError("hysteresis must be 0 or larger")
    hysteresis = float(hysteresis)
    if (max_value - min_value) <= hysteresis:
        raise ValueError('The gap between min_value and max_value must be larger than hysteresis')
    # The band the input was last *accepted* as being in: 'below', 'in' or
    # 'above'. None until the first item has been seen.
    last_state = None
    for v in values:
        if v < min_value:
            new_state = 'below'
        elif v > max_value:
            new_state = 'above'
        else:
            new_state = 'in'
        if last_state is None or not hysteresis:
            # First reading, or hysteresis disabled: always follow the input.
            switch = True
        elif new_state == last_state:
            switch = False
        elif last_state == 'below' and new_state == 'in':
            # Crossing a threshold only counts once the input has moved at
            # least *hysteresis* past it.
            switch = v >= min_value + hysteresis
        elif last_state == 'in' and new_state == 'below':
            switch = v < min_value - hysteresis
        elif last_state == 'in' and new_state == 'above':
            switch = v > max_value + hysteresis
        elif last_state == 'above' and new_state == 'in':
            switch = v <= max_value - hysteresis
        else:
            # Jumped straight across the whole band (above <-> below).
            switch = True
        if switch:
            last_state = new_state
        yield last_state == 'in'
 | |
| 
 | |
| 
 | |
def all_values(*values):
    """
    Yields the `logical conjunction`_ of the supplied sources: each result is
    ``True`` only when the corresponding items of every source are ``True``.
    One or more *values* can be specified, and they are read in step with
    each other. For example, to light an :class:`LED` only when *both*
    buttons are pressed::

        from gpiozero import LED, Button
        from gpiozero.tools import all_values
        from signal import pause

        led = LED(4)
        btn1 = Button(20)
        btn2 = Button(21)
        led.source = all_values(btn1.values, btn2.values)
        pause()

    .. _logical conjunction: https://en.wikipedia.org/wiki/Logical_conjunction
    """
    # Each row holds one simultaneous reading from every source.
    rows = zip(*values)
    for row in rows:
        yield all(row)
 | |
| 
 | |
| 
 | |
def any_values(*values):
    """
    Yields the `logical disjunction`_ of the supplied sources: each result is
    ``True`` when the corresponding item of at least one source is ``True``.
    One or more *values* can be specified, and they are read in step with
    each other. For example, to light an :class:`LED` when *any* button is
    pressed::

        from gpiozero import LED, Button
        from gpiozero.tools import any_values
        from signal import pause

        led = LED(4)
        btn1 = Button(20)
        btn2 = Button(21)
        led.source = any_values(btn1.values, btn2.values)
        pause()

    .. _logical disjunction: https://en.wikipedia.org/wiki/Logical_disjunction
    """
    # Each row holds one simultaneous reading from every source.
    rows = zip(*values)
    for row in rows:
        yield any(row)
 | |
| 
 | |
| 
 | |
def averaged(*values):
    """
    Yields the mean of the corresponding items of all supplied sources. One
    or more *values* can be specified, and they are read in step with each
    other. For example, to light a :class:`PWMLED` as the average of several
    potentiometers connected to an :class:`MCP3008` ADC::

        from gpiozero import MCP3008, PWMLED
        from gpiozero.tools import averaged
        from signal import pause

        pot1 = MCP3008(channel=0)
        pot2 = MCP3008(channel=1)
        pot3 = MCP3008(channel=2)
        led = PWMLED(4)
        led.source = averaged(pot1.values, pot2.values, pot3.values)
        pause()
    """
    # Each row holds one simultaneous reading from every source.
    rows = zip(*values)
    for row in rows:
        yield mean(row)
 | |
| 
 | |
| 
 | |
def summed(*values):
    """
    Yields the sum of the corresponding items of all supplied sources. One or
    more *values* can be specified, and they are read in step with each
    other. For example, to light a :class:`PWMLED` as the (scaled) sum of
    several potentiometers connected to an :class:`MCP3008` ADC::

        from gpiozero import MCP3008, PWMLED
        from gpiozero.tools import summed, scaled
        from signal import pause

        pot1 = MCP3008(channel=0)
        pot2 = MCP3008(channel=1)
        pot3 = MCP3008(channel=2)
        led = PWMLED(4)
        led.source = scaled(summed(pot1.values, pot2.values, pot3.values), 0, 1, 0, 3)
        pause()
    """
    # Each row holds one simultaneous reading from every source.
    rows = zip(*values)
    for row in rows:
        yield sum(row)
 | |
| 
 | |
| 
 | |
def multiplied(*values):
    """
    Yields the product of the corresponding items of all supplied sources.
    One or more *values* can be specified, and they are read in step with
    each other. For example, to light a :class:`PWMLED` as the product (i.e.
    multiplication) of several potentiometers connected to an
    :class:`MCP3008` ADC::

        from gpiozero import MCP3008, PWMLED
        from gpiozero.tools import multiplied
        from signal import pause

        pot1 = MCP3008(channel=0)
        pot2 = MCP3008(channel=1)
        pot3 = MCP3008(channel=2)
        led = PWMLED(4)
        led.source = multiplied(pot1.values, pot2.values, pot3.values)
        pause()
    """
    for factors in zip(*values):
        # Start from the multiplicative identity and fold in each reading.
        product = 1
        for factor in factors:
            product *= factor
        yield product
 | |
| 
 | |
| 
 | |
def queued(values, qsize):
    """
    Queues up readings from *values* (the number of readings queued is
    determined by *qsize*) and begins yielding values only when the queue is
    full, so each item is delayed by *qsize* readings. For example, to
    "cascade" values along a sequence of LEDs::

        from gpiozero import LEDBoard, Button
        from gpiozero.tools import queued
        from signal import pause

        leds = LEDBoard(5, 6, 13, 19, 26)
        btn = Button(17)
        for i in range(4):
            leds[i].source = queued(leds[i + 1].values, 5)
            leds[i].source_delay = 0.01
        leds[4].source = btn.values
        pause()

    If *values* runs out before the queue has filled, nothing is yielded.
    When *values* runs out later on, iteration simply stops; readings still
    waiting in the queue at that point are not yielded.

    :exc:`ValueError` is raised (when iteration starts) if *qsize* is less
    than 1.
    """
    if qsize < 1:
        raise ValueError("qsize must be 1 or larger")
    q = []
    it = iter(values)
    # next() signals exhaustion with StopIteration. It must be caught here:
    # letting it escape a generator body is turned into RuntimeError by
    # PEP 479 (Python 3.7+), instead of ending the iteration quietly.
    try:
        for _ in range(qsize):
            q.append(next(it))
        # The list is used as a ring buffer: emit the oldest slot, then
        # overwrite it with the newest reading.
        for i in cycle(range(qsize)):
            yield q[i]
            q[i] = next(it)
    except StopIteration:
        pass
 | |
| 
 | |
| 
 | |
def smoothed(values, qsize, average=mean):
    """
    Queues up readings from *values* (the number of readings queued is
    determined by *qsize*) and begins yielding the *average* of the last
    *qsize* values when the queue is full. The larger the *qsize*, the more the
    values are smoothed. *average* is any callable that accepts the list of
    queued readings and returns a single value; it defaults to the mean. For
    example, to smooth the analog values read from an ADC::

        from gpiozero import MCP3008
        from gpiozero.tools import smoothed

        with MCP3008(channel=0) as adc:
            for value in smoothed(adc.values, 5):
                print(value)

    If *values* runs out before the queue has filled, nothing is yielded.
    When *values* runs out later on, iteration simply stops.

    :exc:`ValueError` is raised (when iteration starts) if *qsize* is less
    than 1.
    """
    if qsize < 1:
        raise ValueError("qsize must be 1 or larger")
    q = []
    it = iter(values)
    # next() signals exhaustion with StopIteration. It must be caught here:
    # letting it escape a generator body is turned into RuntimeError by
    # PEP 479 (Python 3.7+), instead of ending the iteration quietly.
    try:
        for _ in range(qsize):
            q.append(next(it))
        # The list is used as a ring buffer: after each average is emitted
        # the oldest slot is overwritten with the newest reading.
        for i in cycle(range(qsize)):
            yield average(q)
            q[i] = next(it)
    except StopIteration:
        pass
 | |
| 
 | |
| 
 | |
def pre_delayed(values, delay):
    """
    Yields every item of *values* unchanged, pausing for *delay* seconds
    before each one is handed over.
    """
    if delay < 0:
        raise ValueError("delay must be 0 or larger")
    for item in values:
        # Pause first, then release the item to the consumer.
        sleep(delay)
        yield item
 | |
| 
 | |
| 
 | |
def post_delayed(values, delay):
    """
    Yields every item of *values* unchanged, pausing for *delay* seconds
    after each one has been handed over (that is, when the consumer asks for
    the following item).
    """
    if delay < 0:
        raise ValueError("delay must be 0 or larger")
    for item in values:
        # Release the item first; the pause happens once the consumer
        # resumes the generator.
        yield item
        sleep(delay)
 | |
| 
 | |
| 
 | |
def pre_periodic_filtered(values, block, repeat_after):
    """
    Blocks the first *block* items from *values*, repeating the block after
    every *repeat_after* items, if *repeat_after* is non-zero. For example, to
    discard the first 50 values read from an ADC::

        from gpiozero import MCP3008
        from gpiozero.tools import pre_periodic_filtered

        with MCP3008(channel=0) as adc:
            for value in pre_periodic_filtered(adc.values, 50, 0):
                print(value)

    Or to only display every even item read from an ADC::

        from gpiozero import MCP3008
        from gpiozero.tools import pre_periodic_filtered

        with MCP3008(channel=0) as adc:
            for value in pre_periodic_filtered(adc.values, 1, 1):
                print(value)

    Iteration stops quietly as soon as *values* runs out, whether that
    happens while blocking or while yielding.

    :exc:`ValueError` is raised (when iteration starts) if *block* is less
    than 1 or *repeat_after* is negative.
    """
    if block < 1:
        raise ValueError("block must be 1 or larger")
    if repeat_after < 0:
        raise ValueError("repeat_after must be 0 or larger")
    it = iter(values)
    # next() signals exhaustion with StopIteration. It must be caught here:
    # letting it escape a generator body is turned into RuntimeError by
    # PEP 479 (Python 3.7+), instead of ending the iteration quietly.
    try:
        if repeat_after == 0:
            # Block once, then pass everything else through.
            for _ in range(block):
                next(it)
            for v in it:
                yield v
        else:
            while True:
                for _ in range(block):
                    next(it)
                for _ in range(repeat_after):
                    yield next(it)
    except StopIteration:
        pass
 | |
| 
 | |
| 
 | |
def post_periodic_filtered(values, repeat_after, block):
    """
    After every *repeat_after* items, blocks the next *block* items from
    *values*. Note that unlike :func:`pre_periodic_filtered`, *repeat_after*
    can't be 0. For example, to block every tenth item read from an ADC::

        from gpiozero import MCP3008
        from gpiozero.tools import post_periodic_filtered

        with MCP3008(channel=0) as adc:
            for value in post_periodic_filtered(adc.values, 9, 1):
                print(value)

    Iteration stops quietly as soon as *values* runs out, whether that
    happens while yielding or while blocking.

    :exc:`ValueError` is raised (when iteration starts) if *repeat_after* or
    *block* is less than 1.
    """
    if repeat_after < 1:
        raise ValueError("repeat_after must be 1 or larger")
    if block < 1:
        raise ValueError("block must be 1 or larger")
    it = iter(values)
    # next() signals exhaustion with StopIteration. It must be caught here:
    # letting it escape a generator body is turned into RuntimeError by
    # PEP 479 (Python 3.7+), instead of ending the iteration quietly.
    try:
        while True:
            for _ in range(repeat_after):
                yield next(it)
            for _ in range(block):
                next(it)
    except StopIteration:
        pass
 | |
| 
 | |
| 
 | |
def random_values():
    """
    Yields an endless stream of random values, each produced by
    :func:`random.random` (so between 0 and 1). For example, to produce a
    "flickering candle" effect with an LED::

        from gpiozero import PWMLED
        from gpiozero.tools import random_values
        from signal import pause

        led = PWMLED(4)
        led.source = random_values()
        pause()

    If you require a wider range than 0 to 1, see :func:`scaled`.
    """
    while True:
        sample = random()
        yield sample
 | |
| 
 | |
| 
 | |
def sin_values(period=360):
    """
    Yields an endless sine wave (values from -1 to +1) that repeats every
    *period* values. For example, to produce a "siren" effect with a couple
    of LEDs that repeats once a second::

        from gpiozero import PWMLED
        from gpiozero.tools import sin_values, scaled, inverted
        from signal import pause

        red = PWMLED(2)
        blue = PWMLED(3)
        red.source_delay = 0.01
        blue.source_delay = 0.01
        red.source = scaled(sin_values(100), 0, 1, -1, 1)
        blue.source = inverted(red.values)
        pause()

    If you require a different range than -1 to +1, see :func:`scaled`.
    """
    def _one_turn():
        # Angles in radians for a single full revolution, split into
        # *period* equal steps.
        for step in range(period):
            yield 2 * pi * step / period

    for angle in cycle(_one_turn()):
        yield sin(angle)
 | |
| 
 | |
| 
 | |
def cos_values(period=360):
    """
    Yields an endless cosine wave (values from -1 to +1) that repeats every
    *period* values. For example, to produce a "siren" effect with a couple
    of LEDs that repeats once a second::

        from gpiozero import PWMLED
        from gpiozero.tools import cos_values, scaled, inverted
        from signal import pause

        red = PWMLED(2)
        blue = PWMLED(3)
        red.source_delay = 0.01
        blue.source_delay = 0.01
        red.source = scaled(cos_values(100), 0, 1, -1, 1)
        blue.source = inverted(red.values)
        pause()

    If you require a different range than -1 to +1, see :func:`scaled`.
    """
    def _one_turn():
        # Angles in radians for a single full revolution, split into
        # *period* equal steps.
        for step in range(period):
            yield 2 * pi * step / period

    for angle in cycle(_one_turn()):
        yield cos(angle)
 | |
| 
 |