mirror of
				https://github.com/KevinMidboe/python-gpiozero.git
				synced 2025-10-29 17:50:37 +00:00 
			
		
		
		
	Add a nice __repr__ to the GPIODevice base class. This isn't much but generally I think `__repr__` implementations should be deliberately simple: firstly, they're frequently used for debugging so if they're at all complex you risk making a debugging tool buggy (very annoying!). Secondly, if you pour too much info into them you risk making the debugging output cluttered, so I tend to prefer keeping it to straight-forward simple to retrieve/calculate info without excessive detail (if the user wants more, they can always query it directly). There is one refinement here: in SmoothedInputDevice, `__repr__` is tweaked to ensure that when partial is False (the default), and the queue isn't filled, `__repr__` doesn't block (because it should *never* block).
		
			
				
	
	
		
			266 lines
		
	
	
		
			8.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			266 lines
		
	
	
		
			8.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from __future__ import division
 | |
| 
 | |
| import inspect
 | |
| from functools import wraps
 | |
| from time import sleep, time
 | |
| from threading import Event
 | |
| 
 | |
| from RPi import GPIO
 | |
| from w1thermsensor import W1ThermSensor
 | |
| 
 | |
| from .devices import GPIODeviceError, GPIODevice, GPIOQueue
 | |
| 
 | |
| 
 | |
| def _alias(key):
 | |
|     return property(
 | |
|         lambda self: getattr(self, key),
 | |
|         lambda self, val: setattr(self, key, val))
 | |
| 
 | |
| 
 | |
| class InputDeviceError(GPIODeviceError):
 | |
|     pass
 | |
| 
 | |
| 
 | |
| class InputDevice(GPIODevice):
 | |
|     def __init__(self, pin=None, pull_up=False):
 | |
|         super(InputDevice, self).__init__(pin)
 | |
|         self._pull_up = pull_up
 | |
|         self._active_edge = (GPIO.RISING, GPIO.FALLING)[pull_up]
 | |
|         self._inactive_edge = (GPIO.FALLING, GPIO.RISING)[pull_up]
 | |
|         if pull_up:
 | |
|             self._active_state = GPIO.LOW
 | |
|             self._inactive_state = GPIO.HIGH
 | |
|         GPIO.setup(pin, GPIO.IN, (GPIO.PUD_DOWN, GPIO.PUD_UP)[pull_up])
 | |
| 
 | |
|     @property
 | |
|     def pull_up(self):
 | |
|         return self._pull_up
 | |
| 
 | |
|     def __repr__(self):
 | |
|         return "<gpiozero.%s object on pin=%d, pull_up=%s, is_active=%s>" % (
 | |
|             self.__class__.__name__, self.pin, self.pull_up, self.is_active)
 | |
| 
 | |
| 
 | |
| class WaitableInputDevice(InputDevice):
 | |
|     def __init__(self, pin=None, pull_up=False):
 | |
|         super(WaitableInputDevice, self).__init__(pin, pull_up)
 | |
|         self._active_event = Event()
 | |
|         self._inactive_event = Event()
 | |
|         self._when_activated = None
 | |
|         self._when_deactivated = None
 | |
|         self._last_state = None
 | |
| 
 | |
|     def wait_for_active(self, timeout=None):
 | |
|         return self._active_event.wait(timeout)
 | |
| 
 | |
|     def wait_for_inactive(self, timeout=None):
 | |
|         return self._inactive_event.wait(timeout)
 | |
| 
 | |
|     def _get_when_activated(self):
 | |
|         return self._when_activated
 | |
| 
 | |
|     def _set_when_activated(self, value):
 | |
|         self._when_activated = self._wrap_callback(value)
 | |
| 
 | |
|     when_activated = property(_get_when_activated, _set_when_activated)
 | |
| 
 | |
|     def _get_when_deactivated(self):
 | |
|         return self._when_deactivated
 | |
| 
 | |
|     def _set_when_deactivated(self, value):
 | |
|         self._when_deactivated = self._wrap_callback(value)
 | |
| 
 | |
|     when_deactivated = property(_get_when_deactivated, _set_when_deactivated)
 | |
| 
 | |
|     def _wrap_callback(self, fn):
 | |
|         if fn is None:
 | |
|             return None
 | |
|         elif not callable(fn):
 | |
|             raise InputDeviceError('value must be None or a callable')
 | |
|         else:
 | |
|             # Try binding ourselves to the argspec of the provided callable.
 | |
|             # If this works, assume the function is capable of accepting no
 | |
|             # parameters
 | |
|             try:
 | |
|                 inspect.getcallargs(fn)
 | |
|                 return fn
 | |
|             except TypeError:
 | |
|                 try:
 | |
|                     # If the above fails, try binding with a single parameter
 | |
|                     # (ourselves). If this works, wrap the specified callback
 | |
|                     inspect.getcallargs(fn, self)
 | |
|                     @wraps(fn)
 | |
|                     def wrapper():
 | |
|                         return fn(self)
 | |
|                     return wrapper
 | |
|                 except TypeError:
 | |
|                     raise InputDeviceError(
 | |
|                         'value must be a callable which accepts up to one '
 | |
|                         'mandatory parameter')
 | |
| 
 | |
|     def _fire_events(self):
 | |
|         old_state = self._last_state
 | |
|         new_state = self._last_state = self.is_active
 | |
|         if old_state is None:
 | |
|             # Initial "indeterminate" state; set events but don't fire
 | |
|             # callbacks as there's not necessarily an edge
 | |
|             if new_state:
 | |
|                 self._active_event.set()
 | |
|             else:
 | |
|                 self._inactive_event.set()
 | |
|         else:
 | |
|             if not old_state and new_state:
 | |
|                 self._inactive_event.clear()
 | |
|                 self._active_event.set()
 | |
|                 if self.when_activated:
 | |
|                     self.when_activated()
 | |
|             elif old_state and not new_state:
 | |
|                 self._active_event.clear()
 | |
|                 self._inactive_event.set()
 | |
|                 if self.when_deactivated:
 | |
|                     self.when_deactivated()
 | |
| 
 | |
| 
 | |
| class DigitalInputDevice(WaitableInputDevice):
 | |
|     def __init__(self, pin=None, pull_up=False, bouncetime=None):
 | |
|         super(DigitalInputDevice, self).__init__(pin, pull_up)
 | |
|         # Yes, that's really the default bouncetime in RPi.GPIO...
 | |
|         GPIO.add_event_detect(
 | |
|             self.pin, GPIO.BOTH, callback=self._fire_events,
 | |
|             bouncetime=-666 if bouncetime is None else bouncetime
 | |
|         )
 | |
|         # Call _fire_events once to set initial state of events
 | |
|         super(DigitalInputDevice, self)._fire_events()
 | |
| 
 | |
|     def __del__(self):
 | |
|         GPIO.remove_event_detect(self.pin)
 | |
| 
 | |
|     def _fire_events(self, channel):
 | |
|         super(DigitalInputDevice, self)._fire_events()
 | |
| 
 | |
| 
 | |
| class SmoothedInputDevice(WaitableInputDevice):
 | |
|     def __init__(
 | |
|             self, pin=None, pull_up=False, threshold=0.5,
 | |
|             queue_len=5, sample_wait=0.0, partial=False):
 | |
|         super(SmoothedInputDevice, self).__init__(pin, pull_up)
 | |
|         self._queue = GPIOQueue(self, queue_len, sample_wait, partial)
 | |
|         self.threshold = float(threshold)
 | |
| 
 | |
|     def __repr__(self):
 | |
|         if self.partial or self._queue.full.wait(0):
 | |
|             return super(SmoothedInputDevice, self).__repr__()
 | |
|         else:
 | |
|             return "<gpiozero.%s object on pin=%d, pull_up=%s>" % (
 | |
|                 self.__class__.__name__, self.pin, self.pull_up)
 | |
| 
 | |
|     @property
 | |
|     def queue_len(self):
 | |
|         return self._queue.queue.maxlen
 | |
| 
 | |
|     @property
 | |
|     def partial(self):
 | |
|         return self._queue.partial
 | |
| 
 | |
|     @property
 | |
|     def value(self):
 | |
|         return self._queue.value
 | |
| 
 | |
|     def _get_threshold(self):
 | |
|         return self._threshold
 | |
| 
 | |
|     def _set_threshold(self, value):
 | |
|         if not (0.0 < value < 1.0):
 | |
|             raise InputDeviceError(
 | |
|                 'threshold must be between zero and one exclusive'
 | |
|             )
 | |
|         self._threshold = float(value)
 | |
| 
 | |
|     threshold = property(_get_threshold, _set_threshold)
 | |
| 
 | |
|     @property
 | |
|     def is_active(self):
 | |
|         return self.value > self.threshold
 | |
| 
 | |
| 
 | |
| class Button(DigitalInputDevice):
 | |
|     def __init__(self, pin=None, pull_up=True, bouncetime=None):
 | |
|         super(Button, self).__init__(pin, pull_up, bouncetime)
 | |
| 
 | |
|     when_pressed = _alias('when_activated')
 | |
|     when_released = _alias('when_deactivated')
 | |
| 
 | |
|     wait_for_press = _alias('wait_for_active')
 | |
|     wait_for_release = _alias('wait_for_inactive')
 | |
| 
 | |
| 
 | |
| class MotionSensor(SmoothedInputDevice):
 | |
|     def __init__(
 | |
|             self, pin=None, queue_len=5, sample_rate=10, threshold=0.5,
 | |
|             partial=False):
 | |
|         super(MotionSensor, self).__init__(
 | |
|             pin, pull_up=False, threshold=threshold,
 | |
|             queue_len=queue_len, sample_wait=1 / sample_rate, partial=partial
 | |
|         )
 | |
|         self._queue.start()
 | |
| 
 | |
|     motion_detected = _alias('is_active')
 | |
| 
 | |
|     when_motion = _alias('when_activated')
 | |
|     when_no_motion = _alias('when_deactivated')
 | |
| 
 | |
|     wait_for_motion = _alias('wait_for_active')
 | |
|     wait_for_no_motion = _alias('wait_for_inactive')
 | |
| 
 | |
| 
 | |
| class LightSensor(SmoothedInputDevice):
 | |
|     def __init__(
 | |
|             self, pin=None, queue_len=5, charge_time_limit=0.01,
 | |
|             threshold=0.1, partial=False):
 | |
|         super(LightSensor, self).__init__(
 | |
|             pin, pull_up=False, threshold=threshold,
 | |
|             queue_len=queue_len, sample_wait=0.0, partial=partial
 | |
|         )
 | |
|         self._charge_time_limit = charge_time_limit
 | |
|         self._charged = Event()
 | |
|         GPIO.add_event_detect(
 | |
|             self.pin, GPIO.RISING, lambda channel: self._charged.set()
 | |
|         )
 | |
|         self._queue.start()
 | |
| 
 | |
|     def __del__(self):
 | |
|         GPIO.remove_event_detect(self.pin)
 | |
| 
 | |
|     @property
 | |
|     def charge_time_limit(self):
 | |
|         return self._charge_time_limit
 | |
| 
 | |
|     def _read(self):
 | |
|         # Drain charge from the capacitor
 | |
|         GPIO.setup(self.pin, GPIO.OUT)
 | |
|         GPIO.output(self.pin, GPIO.LOW)
 | |
|         sleep(0.1)
 | |
|         # Time the charging of the capacitor
 | |
|         start = time()
 | |
|         self._charged.clear()
 | |
|         GPIO.setup(self.pin, GPIO.IN)
 | |
|         self._charged.wait(self.charge_time_limit)
 | |
|         return (
 | |
|             1.0 - min(self.charge_time_limit, time() - start) /
 | |
|             self.charge_time_limit
 | |
|         )
 | |
| 
 | |
|     light_detected = _alias('is_active')
 | |
| 
 | |
|     when_light = _alias('when_activated')
 | |
|     when_dark = _alias('when_deactivated')
 | |
| 
 | |
|     wait_for_light = _alias('wait_for_active')
 | |
|     wait_for_dark = _alias('wait_for_inactive')
 | |
| 
 | |
| 
 | |
| class TemperatureSensor(W1ThermSensor):
 | |
|     @property
 | |
|     def value(self):
 | |
|         return self.get_temperature()
 |