mirror of
				https://github.com/KevinMidboe/python-gpiozero.git
				synced 2025-10-29 17:50:37 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			602 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			602 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # vim: set fileencoding=utf-8:
 | |
| 
 | |
| from __future__ import (
 | |
|     unicode_literals,
 | |
|     print_function,
 | |
|     absolute_import,
 | |
|     division,
 | |
| )
 | |
| str = type('')
 | |
| 
 | |
| 
 | |
| from random import random
 | |
| from time import sleep
 | |
| try:
 | |
|     from itertools import izip as zip
 | |
| except ImportError:
 | |
|     pass
 | |
| from itertools import cycle
 | |
| from math import sin, cos, pi
 | |
| try:
 | |
|     from statistics import mean
 | |
| except ImportError:
 | |
|     from .compat import mean
 | |
| 
 | |
| 
 | |
| def negated(values):
 | |
|     """
 | |
|     Returns the negation of the supplied values (``True`` becomes ``False``,
 | |
|     and ``False`` becomes ``True``). For example::
 | |
| 
 | |
|         from gpiozero import Button, LED
 | |
|         from gpiozero.tools import negated
 | |
|         from signal import pause
 | |
| 
 | |
|         led = LED(4)
 | |
|         btn = Button(17)
 | |
|         led.source = negated(btn.values)
 | |
|         pause()
 | |
|     """
 | |
|     for v in values:
 | |
|         yield not v
 | |
| 
 | |
| 
 | |
| def inverted(values, input_min=0, input_max=1):
 | |
|     """
 | |
|     Returns the inversion of the supplied values (*input_min* becomes
 | |
|     *input_max*, *input_max* becomes *input_min*, `input_min + 0.1` becomes
 | |
|     `input_max - 0.1`, etc.). All items in *values* are assumed to be between
 | |
|     *input_min* and *input_max* (which default to 0 and 1 respectively), and
 | |
|     the output will be in the same range. For example::
 | |
| 
 | |
|         from gpiozero import MCP3008, PWMLED
 | |
|         from gpiozero.tools import inverted
 | |
|         from signal import pause
 | |
| 
 | |
|         led = PWMLED(4)
 | |
|         pot = MCP3008(channel=0)
 | |
|         led.source = inverted(pot.values)
 | |
|         pause()
 | |
|     """
 | |
|     if input_min >= input_max:
 | |
|         raise ValueError('input_min must be smaller than input_max')
 | |
|     for v in values:
 | |
|         yield input_min + input_max - v
 | |
| 
 | |
| 
 | |
| def scaled(values, output_min, output_max, input_min=0, input_max=1):
 | |
|     """
 | |
|     Returns *values* scaled from *output_min* to *output_max*, assuming that
 | |
|     all items in *values* lie between *input_min* and *input_max* (which
 | |
|     default to 0 and 1 respectively). For example, to control the direction of
 | |
|     a motor (which is represented as a value between -1 and 1) using a
 | |
|     potentiometer (which typically provides values between 0 and 1)::
 | |
| 
 | |
|         from gpiozero import Motor, MCP3008
 | |
|         from gpiozero.tools import scaled
 | |
|         from signal import pause
 | |
| 
 | |
|         motor = Motor(20, 21)
 | |
|         pot = MCP3008(channel=0)
 | |
|         motor.source = scaled(pot.values, -1, 1)
 | |
|         pause()
 | |
| 
 | |
|     .. warning::
 | |
| 
 | |
|         If *values* contains elements that lie outside *input_min* to
 | |
|         *input_max* (inclusive) then the function will not produce values that
 | |
|         lie within *output_min* to *output_max* (inclusive).
 | |
|     """
 | |
|     if input_min >= input_max:
 | |
|         raise ValueError('input_min must be smaller than input_max')
 | |
|     input_size = input_max - input_min
 | |
|     output_size = output_max - output_min
 | |
|     for v in values:
 | |
|         yield (((v - input_min) / input_size) * output_size) + output_min
 | |
| 
 | |
| 
 | |
| def clamped(values, output_min=0, output_max=1):
 | |
|     """
 | |
|     Returns *values* clamped from *output_min* to *output_max*, i.e. any items
 | |
|     less than *output_min* will be returned as *output_min* and any items
 | |
|     larger than *output_max* will be returned as *output_max* (these default to
 | |
|     0 and 1 respectively). For example::
 | |
| 
 | |
|         from gpiozero import PWMLED, MCP3008
 | |
|         from gpiozero.tools import clamped
 | |
|         from signal import pause
 | |
| 
 | |
|         led = PWMLED(4)
 | |
|         pot = MCP3008(channel=0)
 | |
|         
 | |
|         led.source = clamped(pot.values, 0.5, 1.0)
 | |
|         
 | |
|         pause()
 | |
|     """
 | |
|     if output_min >= output_max:
 | |
|         raise ValueError('output_min must be smaller than output_max')
 | |
|     for v in values:
 | |
|         yield min(max(v, output_min), output_max)
 | |
| 
 | |
| 
 | |
| def absoluted(values):
 | |
|     """
 | |
|     Returns *values* with all negative elements negated (so that they're
 | |
|     positive). For example::
 | |
| 
 | |
|         from gpiozero import PWMLED, Motor, MCP3008
 | |
|         from gpiozero.tools import absoluted, scaled
 | |
|         from signal import pause
 | |
| 
 | |
|         led = PWMLED(4)
 | |
|         motor = Motor(22, 27)
 | |
|         pot = MCP3008(channel=0)
 | |
|         
 | |
|         motor.source = scaled(pot.values, -1, 1)
 | |
|         led.source = absoluted(motor.values)
 | |
|         
 | |
|         pause()
 | |
|     """
 | |
|     for v in values:
 | |
|         yield abs(v)
 | |
| 
 | |
| 
 | |
| def quantized(values, steps, input_min=0, input_max=1):
 | |
|     """
 | |
|     Returns *values* quantized to *steps* increments. All items in *values* are
 | |
|     assumed to be between *input_min* and *input_max* (which default to 0 and
 | |
|     1 respectively), and the output will be in the same range.
 | |
| 
 | |
|     For example, to quantize values between 0 and 1 to 5 "steps" (0.0, 0.25,
 | |
|     0.5, 0.75, 1.0)::
 | |
| 
 | |
|         from gpiozero import PWMLED, MCP3008
 | |
|         from gpiozero.tools import quantized
 | |
|         from signal import pause
 | |
| 
 | |
|         led = PWMLED(4)
 | |
|         pot = MCP3008(channel=0)
 | |
|         led.source = quantized(pot.values, 4)
 | |
|         pause()
 | |
|     """
 | |
|     if steps < 1:
 | |
|         raise ValueError("steps must be 1 or larger")
 | |
|     if input_min >= input_max:
 | |
|         raise ValueError('input_min must be smaller than input_max')
 | |
|     input_size = input_max - input_min
 | |
|     for v in scaled(values, 0, 1, input_min, input_max):
 | |
|         yield ((int(v * steps) / steps) * input_size) + input_min
 | |
| 
 | |
| 
 | |
| def booleanized(values, min_value, max_value, hysteresis=0):
 | |
|     """
 | |
|     Returns True for each item in *values* between *min_value* and
 | |
|     *max_value*, and False otherwise. *hysteresis* can optionally be used to
 | |
|     add `hysteresis`_ which prevents the output value rapidly flipping when
 | |
|     the input value is fluctuating near the *min_value* or *max_value*
 | |
|     thresholds. For example, to light an LED only when a potentiometer is
 | |
|     between 1/4 and 3/4 of its full range::
 | |
| 
 | |
|         from gpiozero import LED, MCP3008
 | |
|         from gpiozero.tools import booleanized
 | |
|         from signal import pause
 | |
| 
 | |
|         led = LED(4)
 | |
|         pot = MCP3008(channel=0)
 | |
|         led.source = booleanized(pot.values, 0.25, 0.75)
 | |
|         pause()
 | |
| 
 | |
|     .. _hysteresis: https://en.wikipedia.org/wiki/Hysteresis
 | |
|     """
 | |
|     if min_value >= max_value:
 | |
|         raise ValueError('min_value must be smaller than max_value')
 | |
|     min_value = float(min_value)
 | |
|     max_value = float(max_value)
 | |
|     if hysteresis < 0:
 | |
|         raise ValueError("hysteresis must be 0 or larger")
 | |
|     else:
 | |
|         hysteresis = float(hysteresis)
 | |
|     if (max_value - min_value) <= hysteresis:
 | |
|         raise ValueError('The gap between min_value and max_value must be larger than hysteresis')
 | |
|     last_state = None
 | |
|     for v in values:
 | |
|         if v < min_value:
 | |
|             new_state = 'below'
 | |
|         elif v > max_value:
 | |
|             new_state = 'above'
 | |
|         else:
 | |
|             new_state = 'in'
 | |
|         switch = False
 | |
|         if last_state == None or not hysteresis:
 | |
|             switch = True
 | |
|         elif new_state == last_state:
 | |
|             pass
 | |
|         else: # new_state != last_state
 | |
|             if last_state == 'below' and new_state == 'in':
 | |
|                 switch = v >= min_value + hysteresis
 | |
|             elif last_state == 'in' and new_state == 'below':
 | |
|                 switch = v < min_value - hysteresis
 | |
|             elif last_state == 'in' and new_state == 'above':
 | |
|                 switch = v > max_value + hysteresis
 | |
|             elif last_state == 'above' and new_state == 'in':
 | |
|                 switch = v <= max_value - hysteresis
 | |
|             else: # above->below or below->above
 | |
|                 switch = True
 | |
|         if switch:
 | |
|             last_state = new_state
 | |
|         yield last_state == 'in'
 | |
| 
 | |
| 
 | |
| def all_values(*values):
 | |
|     """
 | |
|     Returns the `logical conjunction`_ of all supplied values (the result is
 | |
|     only ``True`` if and only if all input values are simultaneously ``True``).
 | |
|     One or more *values* can be specified. For example, to light an
 | |
|     :class:`LED` only when *both* buttons are pressed::
 | |
| 
 | |
|         from gpiozero import LED, Button
 | |
|         from gpiozero.tools import all_values
 | |
|         from signal import pause
 | |
| 
 | |
|         led = LED(4)
 | |
|         btn1 = Button(20)
 | |
|         btn2 = Button(21)
 | |
|         led.source = all_values(btn1.values, btn2.values)
 | |
|         pause()
 | |
| 
 | |
|     .. _logical conjunction: https://en.wikipedia.org/wiki/Logical_conjunction
 | |
|     """
 | |
|     for v in zip(*values):
 | |
|         yield all(v)
 | |
| 
 | |
| 
 | |
| def any_values(*values):
 | |
|     """
 | |
|     Returns the `logical disjunction`_ of all supplied values (the result is
 | |
|     ``True`` if any of the input values are currently ``True``). One or more
 | |
|     *values* can be specified. For example, to light an :class:`LED` when
 | |
|     *any* button is pressed::
 | |
| 
 | |
|         from gpiozero import LED, Button
 | |
|         from gpiozero.tools import any_values
 | |
|         from signal import pause
 | |
| 
 | |
|         led = LED(4)
 | |
|         btn1 = Button(20)
 | |
|         btn2 = Button(21)
 | |
|         led.source = any_values(btn1.values, btn2.values)
 | |
|         pause()
 | |
| 
 | |
|     .. _logical disjunction: https://en.wikipedia.org/wiki/Logical_disjunction
 | |
|     """
 | |
|     for v in zip(*values):
 | |
|         yield any(v)
 | |
| 
 | |
| 
 | |
| def averaged(*values):
 | |
|     """
 | |
|     Returns the mean of all supplied values. One or more *values* can be
 | |
|     specified. For example, to light a :class:`PWMLED` as the average of
 | |
|     several potentiometers connected to an :class:`MCP3008` ADC::
 | |
| 
 | |
|         from gpiozero import MCP3008, PWMLED
 | |
|         from gpiozero.tools import averaged
 | |
|         from signal import pause
 | |
| 
 | |
|         pot1 = MCP3008(channel=0)
 | |
|         pot2 = MCP3008(channel=1)
 | |
|         pot3 = MCP3008(channel=2)
 | |
|         led = PWMLED(4)
 | |
|         
 | |
|         led.source = averaged(pot1.values, pot2.values, pot3.values)
 | |
|         
 | |
|         pause()
 | |
|     """
 | |
|     for v in zip(*values):
 | |
|         yield mean(v)
 | |
| 
 | |
| 
 | |
| def summed(*values):
 | |
|     """
 | |
|     Returns the sum of all supplied values. One or more *values* can be
 | |
|     specified. For example, to light a :class:`PWMLED` as the (scaled) sum of
 | |
|     several potentiometers connected to an :class:`MCP3008` ADC::
 | |
| 
 | |
|         from gpiozero import MCP3008, PWMLED
 | |
|         from gpiozero.tools import summed, scaled
 | |
|         from signal import pause
 | |
| 
 | |
|         pot1 = MCP3008(channel=0)
 | |
|         pot2 = MCP3008(channel=1)
 | |
|         pot3 = MCP3008(channel=2)
 | |
|         led = PWMLED(4)
 | |
| 
 | |
|         led.source = scaled(summed(pot1.values, pot2.values, pot3.values), 0, 1, 0, 3)
 | |
|         
 | |
|         pause()
 | |
|     """
 | |
|     for v in zip(*values):
 | |
|         yield sum(v)
 | |
| 
 | |
| 
 | |
| def multiplied(*values):
 | |
|     """
 | |
|     Returns the product of all supplied values. One or more *values* can be
 | |
|     specified. For example, to light a :class:`PWMLED` as the product (i.e.
 | |
|     multiplication) of several potentiometers connected to an :class:`MCP3008`
 | |
|     ADC::
 | |
| 
 | |
|         from gpiozero import MCP3008, PWMLED
 | |
|         from gpiozero.tools import multiplied
 | |
|         from signal import pause
 | |
| 
 | |
|         pot1 = MCP3008(channel=0)
 | |
|         pot2 = MCP3008(channel=1)
 | |
|         pot3 = MCP3008(channel=2)
 | |
|         led = PWMLED(4)
 | |
|         
 | |
|         led.source = multiplied(pot1.values, pot2.values, pot3.values)
 | |
|         
 | |
|         pause()
 | |
|     """
 | |
|     def _product(it):
 | |
|         p = 1
 | |
|         for n in it:
 | |
|             p *= n
 | |
|         return p
 | |
|     for v in zip(*values):
 | |
|         yield _product(v)
 | |
| 
 | |
| 
 | |
| def queued(values, qsize):
 | |
|     """
 | |
|     Queues up readings from *values* (the number of readings queued is
 | |
|     determined by *qsize*) and begins yielding values only when the queue is
 | |
|     full. For example, to "cascade" values along a sequence of LEDs::
 | |
| 
 | |
|         from gpiozero import LEDBoard, Button
 | |
|         from gpiozero.tools import queued
 | |
|         from signal import pause
 | |
| 
 | |
|         leds = LEDBoard(5, 6, 13, 19, 26)
 | |
|         btn = Button(17)
 | |
|         
 | |
|         for i in range(4):
 | |
|             leds[i].source = queued(leds[i + 1].values, 5)
 | |
|             leds[i].source_delay = 0.01
 | |
|             
 | |
|         leds[4].source = btn.values
 | |
|         
 | |
|         pause()
 | |
|     """
 | |
|     if qsize < 1:
 | |
|         raise ValueError("qsize must be 1 or larger")
 | |
|     q = []
 | |
|     it = iter(values)
 | |
|     for i in range(qsize):
 | |
|         q.append(next(it))
 | |
|     for i in cycle(range(qsize)):
 | |
|         yield q[i]
 | |
|         try:
 | |
|             q[i] = next(it)
 | |
|         except StopIteration:
 | |
|             break
 | |
| 
 | |
| 
 | |
| def smoothed(values, qsize, average=mean):
 | |
|     """
 | |
|     Queues up readings from *values* (the number of readings queued is
 | |
|     determined by *qsize*) and begins yielding the *average* of the last
 | |
|     *qsize* values when the queue is full. The larger the *qsize*, the more the
 | |
|     values are smoothed. For example, to smooth the analog values read from an
 | |
|     ADC::
 | |
| 
 | |
|         from gpiozero import MCP3008
 | |
|         from gpiozero.tools import smoothed
 | |
| 
 | |
|         adc = MCP3008(channel=0)
 | |
|         
 | |
|         for value in smoothed(adc.values, 5):
 | |
|             print(value)
 | |
|     """
 | |
|     if qsize < 1:
 | |
|         raise ValueError("qsize must be 1 or larger")
 | |
|     q = []
 | |
|     it = iter(values)
 | |
|     for i in range(qsize):
 | |
|         q.append(next(it))
 | |
|     for i in cycle(range(qsize)):
 | |
|         yield average(q)
 | |
|         try:
 | |
|             q[i] = next(it)
 | |
|         except StopIteration:
 | |
|             break
 | |
| 
 | |
| 
 | |
| def pre_delayed(values, delay):
 | |
|     """
 | |
|     Waits for *delay* seconds before returning each item from *values*.
 | |
|     """
 | |
|     if delay < 0:
 | |
|         raise ValueError("delay must be 0 or larger")
 | |
|     for v in values:
 | |
|         sleep(delay)
 | |
|         yield v
 | |
| 
 | |
| 
 | |
| def post_delayed(values, delay):
 | |
|     """
 | |
|     Waits for *delay* seconds after returning each item from *values*.
 | |
|     """
 | |
|     if delay < 0:
 | |
|         raise ValueError("delay must be 0 or larger")
 | |
|     for v in values:
 | |
|         yield v
 | |
|         sleep(delay)
 | |
| 
 | |
| 
 | |
| def pre_periodic_filtered(values, block, repeat_after):
 | |
|     """
 | |
|     Blocks the first *block* items from *values*, repeating the block after
 | |
|     every *repeat_after* items, if *repeat_after* is non-zero. For example, to
 | |
|     discard the first 50 values read from an ADC::
 | |
| 
 | |
|         from gpiozero import MCP3008
 | |
|         from gpiozero.tools import pre_periodic_filtered
 | |
| 
 | |
|         adc = MCP3008(channel=0)
 | |
|         
 | |
|         for value in pre_periodic_filtered(adc.values, 50, 0):
 | |
|             print(value)
 | |
| 
 | |
|     Or to only display every even item read from an ADC::
 | |
| 
 | |
|         from gpiozero import MCP3008
 | |
|         from gpiozero.tools import pre_periodic_filtered
 | |
| 
 | |
|         adc = MCP3008(channel=0)
 | |
|         
 | |
|         for value in pre_periodic_filtered(adc.values, 1, 1):
 | |
|             print(value)
 | |
|     """
 | |
|     if block < 1:
 | |
|         raise ValueError("block must be 1 or larger")
 | |
|     if repeat_after < 0:
 | |
|         raise ValueError("repeat_after must be 0 or larger")
 | |
|     it = iter(values)
 | |
|     if repeat_after == 0:
 | |
|         for _ in range(block):
 | |
|             next(it)
 | |
|         while True:
 | |
|             yield next(it)
 | |
|     else:
 | |
|         while True:
 | |
|             for _ in range(block):
 | |
|                 next(it)
 | |
|             for _ in range(repeat_after):
 | |
|                 yield next(it)
 | |
| 
 | |
| 
 | |
| def post_periodic_filtered(values, repeat_after, block):
 | |
|     """
 | |
|     After every *repeat_after* items, blocks the next *block* items from
 | |
|     *values*. Note that unlike :func:`pre_periodic_filtered`, *repeat_after*
 | |
|     can't be 0. For example, to block every tenth item read from an ADC::
 | |
| 
 | |
|         from gpiozero import MCP3008
 | |
|         from gpiozero.tools import post_periodic_filtered
 | |
| 
 | |
|         adc = MCP3008(channel=0)
 | |
|         
 | |
|         for value in post_periodic_filtered(adc.values, 9, 1):
 | |
|             print(value)
 | |
|     """
 | |
|     if repeat_after < 1:
 | |
|         raise ValueError("repeat_after must be 1 or larger")
 | |
|     if block < 1:
 | |
|         raise ValueError("block must be 1 or larger")
 | |
|     it = iter(values)
 | |
|     while True:
 | |
|         for _ in range(repeat_after):
 | |
|             yield next(it)
 | |
|         for _ in range(block):
 | |
|             next(it)
 | |
| 
 | |
| 
 | |
| def random_values():
 | |
|     """
 | |
|     Provides an infinite source of random values between 0 and 1. For example,
 | |
|     to produce a "flickering candle" effect with an LED::
 | |
| 
 | |
|         from gpiozero import PWMLED
 | |
|         from gpiozero.tools import random_values
 | |
|         from signal import pause
 | |
| 
 | |
|         led = PWMLED(4)
 | |
|         
 | |
|         led.source = random_values()
 | |
|         
 | |
|         pause()
 | |
| 
 | |
|     If you require a wider range than 0 to 1, see :func:`scaled`.
 | |
|     """
 | |
|     while True:
 | |
|         yield random()
 | |
| 
 | |
| 
 | |
| def sin_values(period=360):
 | |
|     """
 | |
|     Provides an infinite source of values representing a sine wave (from -1 to
 | |
|     +1) which repeats every *period* values. For example, to produce a "siren"
 | |
|     effect with a couple of LEDs that repeats once a second::
 | |
| 
 | |
|         from gpiozero import PWMLED
 | |
|         from gpiozero.tools import sin_values, scaled, inverted
 | |
|         from signal import pause
 | |
| 
 | |
|         red = PWMLED(2)
 | |
|         blue = PWMLED(3)
 | |
|         
 | |
|         red.source_delay = 0.01
 | |
|         blue.source_delay = red.source_delay
 | |
|         red.source = scaled(sin_values(100), 0, 1, -1, 1)
 | |
|         blue.source = inverted(red.values)
 | |
|         
 | |
|         pause()
 | |
| 
 | |
|     If you require a different range than -1 to +1, see :func:`scaled`.
 | |
|     """
 | |
|     angles = (2 * pi * i / period for i in range(period))
 | |
|     for a in cycle(angles):
 | |
|         yield sin(a)
 | |
| 
 | |
| 
 | |
| def cos_values(period=360):
 | |
|     """
 | |
|     Provides an infinite source of values representing a cosine wave (from -1
 | |
|     to +1) which repeats every *period* values. For example, to produce a
 | |
|     "siren" effect with a couple of LEDs that repeats once a second::
 | |
| 
 | |
|         from gpiozero import PWMLED
 | |
|         from gpiozero.tools import cos_values, scaled, inverted
 | |
|         from signal import pause
 | |
| 
 | |
|         red = PWMLED(2)
 | |
|         blue = PWMLED(3)
 | |
|         
 | |
|         red.source_delay = 0.01
 | |
|         blue.source_delay = red.source_delay
 | |
|         red.source = scaled(cos_values(100), 0, 1, -1, 1)
 | |
|         blue.source = inverted(red.values)
 | |
|         
 | |
|         pause()
 | |
| 
 | |
|     If you require a different range than -1 to +1, see :func:`scaled`.
 | |
|     """
 | |
|     angles = (2 * pi * i / period for i in range(period))
 | |
|     for a in cycle(angles):
 | |
|         yield cos(a)
 | |
| 
 | |
| 
 | |
| def alternating_values(initial_value=False):
 | |
|     """
 | |
|     Provides an infinite source of values alternating between ``True`` and
 | |
|     ``False``, starting wth *initial_value* (which defaults to ``False``). For
 | |
|     example, to produce a flashing LED::
 | |
| 
 | |
|         from gpiozero import LED
 | |
|         from gpiozero.tools import alternating_values
 | |
|         from signal import pause
 | |
| 
 | |
|         red = LED(2)
 | |
| 
 | |
|         red.source_delay = 0.5
 | |
|         red.source = alternating_values()
 | |
| 
 | |
|         pause()
 | |
|     """
 | |
|     value = initial_value
 | |
|     while True:
 | |
|         yield value
 | |
|         value = not value
 |