mirror of
				https://github.com/KevinMidboe/python-gpiozero.git
				synced 2025-10-29 17:50:37 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			273 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			273 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from __future__ import (
 | |
|     unicode_literals,
 | |
|     absolute_import,
 | |
|     print_function,
 | |
|     division,
 | |
|     )
 | |
| str = type('')
 | |
| 
 | |
| 
 | |
| from collections import namedtuple
 | |
| from time import time, sleep
 | |
| from threading import Thread, Event
 | |
| try:
 | |
|     from math import isclose
 | |
| except ImportError:
 | |
|     from ..compat import isclose
 | |
| 
 | |
| from . import Pin
 | |
| from ..exc import PinSetInput, PinPWMUnsupported, PinFixedPull
 | |
| 
 | |
| 
 | |
| PinState = namedtuple('PinState', ('timestamp', 'state'))
 | |
| 
 | |
class MockPin(Pin):
    """
    A mock pin used primarily for testing. This class does *not* support PWM.

    Pins are cached by number in :attr:`_PINS`: constructing the same number
    again with the same class returns the existing object, while asking for
    it with a different class raises :exc:`ValueError`. Every change of state
    is appended to :attr:`states` as a :class:`PinState` whose ``timestamp``
    is the time elapsed since the previous recorded change.
    """

    # Registry of constructed pins, keyed by pin number
    _PINS = {}

    @classmethod
    def clear_pins(cls):
        """Empty the registry of constructed pins."""
        cls._PINS.clear()

    def __new__(cls, number):
        if not 0 <= number < 54:
            raise ValueError('invalid pin %d specified (must be 0..53)' % number)
        existing = cls._PINS.get(number)
        if existing is not None:
            # The number is already taken; only hand it back if the caller is
            # asking for exactly the class it was created with
            if existing.__class__ != cls:
                raise ValueError('pin %d is already in use as a %s' % (number, existing.__class__.__name__))
            return existing
        # First request for this number: build, register and initialise it
        pin = super(MockPin, cls).__new__(cls)
        cls._PINS[number] = pin
        pin._number = number
        pin._function = 'input'
        pin._state = False
        pin._pull = 'floating'
        pin._bounce = None
        pin._edges = 'both'
        pin._when_changed = None
        pin.clear_states()
        return pin

    def __repr__(self):
        return 'MOCK%d' % self._number

    @property
    def number(self):
        """The pin number given at construction."""
        return self._number

    def close(self):
        # Drop any callback, then return the pin to being an input
        self.when_changed = None
        self.function = 'input'

    def _get_function(self):
        return self._function

    def _set_function(self, value):
        assert value in ('input', 'output')
        self._function = value
        if value != 'input':
            return
        # Drive the input to the pull
        self._set_pull(self._get_pull())

    def _get_state(self):
        return self._state

    def _set_state(self, value):
        if self._function == 'input':
            raise PinSetInput('cannot set state of pin %r' % self)
        assert self._function == 'output'
        assert 0 <= value <= 1
        # Plain mock pins are digital only, so collapse the value to a bool
        self._change_state(bool(value))

    def _change_state(self, value):
        """
        Record a transition to *value*. Returns ``True`` if the state actually
        changed and ``False`` if the pin already held *value*.
        """
        if self._state == value:
            return False
        now = time()
        self._state = value
        # The stored timestamp is relative to the previous change
        self.states.append(PinState(now - self._last_change, value))
        self._last_change = now
        return True

    def _get_frequency(self):
        return None

    def _set_frequency(self, value):
        # Only "no PWM" (None) is acceptable on this class
        if value is not None:
            raise PinPWMUnsupported()

    def _get_pull(self):
        return self._pull

    def _set_pull(self, value):
        assert self._function == 'input'
        assert value in ('floating', 'up', 'down')
        self._pull = value
        # A pulled input settles at the level of its pull; 'floating' leaves
        # the state alone
        if value == 'down':
            self.drive_low()
        elif value == 'up':
            self.drive_high()

    def _get_bounce(self):
        return self._bounce

    def _set_bounce(self, value):
        # XXX Need to implement this
        self._bounce = value

    def _get_edges(self):
        return self._edges

    def _set_edges(self, value):
        assert value in ('none', 'falling', 'rising', 'both')
        self._edges = value

    def _get_when_changed(self):
        return self._when_changed

    def _set_when_changed(self, value):
        self._when_changed = value

    def drive_high(self):
        """
        Drive the input high. The ``when_changed`` callback is called only if
        the state changed and rising edges are being watched.
        """
        assert self._function == 'input'
        if (
                self._change_state(True) and
                self._edges in ('both', 'rising') and
                self._when_changed is not None):
            self._when_changed()

    def drive_low(self):
        """
        Drive the input low. The ``when_changed`` callback is called only if
        the state changed and falling edges are being watched.
        """
        assert self._function == 'input'
        if (
                self._change_state(False) and
                self._edges in ('both', 'falling') and
                self._when_changed is not None):
            self._when_changed()

    def clear_states(self):
        """Restart the history with a single entry for the current state."""
        self._last_change = time()
        self.states = [PinState(0.0, self._state)]

    def assert_states(self, expected_states):
        # Tests that the pin went through the expected states (a list of
        # values). zip() stops at the shorter of the two sequences, so only
        # the overlapping entries are compared.
        for recorded, wanted in zip(self.states, expected_states):
            assert recorded.state == wanted

    def assert_states_and_times(self, expected_states):
        # Tests that the pin went through the expected states at the expected
        # times; each expected entry is indexed as (time, state). Times are
        # compared with a tolerance of tens-of-milliseconds as that's about
        # all we can reasonably expect in a non-realtime environment on a
        # Pi 1. As with assert_states, only overlapping entries are compared.
        for recorded, wanted in zip(self.states, expected_states):
            assert isclose(recorded.timestamp, wanted[0], rel_tol=0.01, abs_tol=0.01)
            assert isclose(recorded.state, wanted[1])
 | |
| 
 | |
| 
 | |
class MockPulledUpPin(MockPin):
    """
    This derivative of :class:`MockPin` emulates a pin with a physical pull-up
    resistor.

    The pull always reads as ``'up'``. Asking for any other pull raises
    :exc:`PinFixedPull`.
    """

    def _get_pull(self):
        # The pull is fixed, so report it unconditionally. This matters
        # because MockPin._set_function re-applies the current pull whenever
        # the pin becomes an input: if this returned the 'floating' value that
        # MockPin.__new__ stores, that re-application would be rejected by
        # _set_pull below and the pin could never be set to input (or closed).
        return 'up'

    def _set_pull(self, value):
        if value != 'up':
            raise PinFixedPull('pin has a physical pull-up resistor')
        # Let MockPin record the pull and drive the input high, exactly as it
        # does for any other pin whose pull is set to 'up'
        super(MockPulledUpPin, self)._set_pull(value)
 | |
| 
 | |
| 
 | |
class MockChargingPin(MockPin):
    """
    This derivative of :class:`MockPin` emulates a pin which, when set to
    input, waits a predetermined length of time and then drives itself high
    (as if attached to, e.g. a typical circuit using an LDR and a capacitor
    to time the charging rate).

    The delay is taken from :attr:`charge_time` (in seconds) and is served by
    a background thread which is stopped whenever the function is set again.
    """
    def __init__(self, number):
        super(MockChargingPin, self).__init__()
        self.charge_time = 0.01 # dark charging time
        # Set to abandon a charge that is still waiting
        self._charge_stop = Event()
        # The thread serving the current charge, if one was ever started
        self._charge_thread = None

    def _set_function(self, value):
        super(MockChargingPin, self)._set_function(value)
        if value not in ('input', 'output'):
            return
        # Either function change cancels a charge that is in progress
        if self._charge_thread:
            self._charge_stop.set()
            self._charge_thread.join()
        if value == 'input':
            # Becoming an input starts a fresh charge
            self._charge_stop.clear()
            self._charge_thread = Thread(target=self._charge)
            self._charge_thread.start()

    def _charge(self):
        # wait() returning true means the charge was cancelled before
        # charge_time elapsed, in which case there is nothing to do
        if self._charge_stop.wait(self.charge_time):
            return
        try:
            self.drive_high()
        except AssertionError:
            # Charging pins are typically flipped between input and output
            # repeatedly; if another thread has already flipped us to
            # output ignore the assertion-error resulting from attempting
            # to drive the pin high
            pass
 | |
| 
 | |
| 
 | |
class MockTriggerPin(MockPin):
    """
    This derivative of :class:`MockPin` is intended to be used with another
    :class:`MockPin` to emulate a distance sensor. Set :attr:`echo_pin` to the
    corresponding pin instance. When this pin is driven high it will trigger
    the echo pin to drive high for the echo time.

    The echo is produced on a background thread, so setting the state returns
    without waiting for the echo to complete.
    """
    def __init__(self, number):
        super(MockTriggerPin, self).__init__()
        self.echo_pin = None
        self.echo_time = 0.04 # longest echo time
        # The thread producing the most recent echo, if any
        self._echo_thread = None

    def _set_state(self, value):
        super(MockTriggerPin, self)._set_state(value)
        if not value:
            return
        # Let any previous echo finish before starting the next one
        if self._echo_thread:
            self._echo_thread.join()
        self._echo_thread = Thread(target=self._echo)
        self._echo_thread.start()

    def _echo(self):
        # Brief pause before the echo begins
        sleep(0.001)
        self.echo_pin.drive_high()
        # Hold the echo pin high for the configured echo time
        sleep(self.echo_time)
        self.echo_pin.drive_low()
 | |
| 
 | |
| 
 | |
class MockPWMPin(MockPin):
    """
    This derivative of :class:`MockPin` adds PWM support.

    States are recorded as floats between 0 and 1 rather than as booleans.
    """

    def __init__(self, number):
        super(MockPWMPin, self).__init__()
        # None means PWM is not active
        self._frequency = None

    def close(self):
        # Switch PWM off before the base class returns the pin to an input
        self.frequency = None
        super(MockPWMPin, self).close()

    def _set_state(self, value):
        if self._function == 'input':
            raise PinSetInput('cannot set state of pin %r' % self)
        assert self._function == 'output'
        assert 0 <= value <= 1
        # Unlike MockPin, keep the fractional value
        self._change_state(float(value))

    def _get_frequency(self):
        return self._frequency

    def _set_frequency(self, value):
        if value is None:
            # Disabling PWM also takes the pin's state back to zero
            self._frequency = None
            self._change_state(0.0)
        else:
            # A frequency may only be set while the pin is an output
            assert self._function == 'output'
            self._frequency = value
 | |
| 
 |