from __future__ import division import inspect from functools import wraps from time import sleep, time from threading import Event from RPi import GPIO from w1thermsensor import W1ThermSensor from .devices import GPIODeviceError, GPIODevice, GPIOQueue def _alias(key): return property( lambda self: getattr(self, key), lambda self, val: setattr(self, key, val)) class InputDeviceError(GPIODeviceError): pass class InputDevice(GPIODevice): def __init__(self, pin=None, pull_up=False): super(InputDevice, self).__init__(pin) self._pull_up = pull_up self._active_edge = (GPIO.RISING, GPIO.FALLING)[pull_up] self._inactive_edge = (GPIO.FALLING, GPIO.RISING)[pull_up] if pull_up: self._active_state = GPIO.LOW self._inactive_state = GPIO.HIGH GPIO.setup(pin, GPIO.IN, (GPIO.PUD_DOWN, GPIO.PUD_UP)[pull_up]) @property def pull_up(self): return self._pull_up class WaitableInputDevice(InputDevice): def __init__(self, pin=None, pull_up=False): super(WaitableInputDevice, self).__init__(pin, pull_up) self._active_event = Event() self._inactive_event = Event() self._when_activated = None self._when_deactivated = None self._last_state = None def wait_for_active(self, timeout=None): return self._active_event.wait(timeout) def wait_for_inactive(self, timeout=None): return self._inactive_event.wait(timeout) def _get_when_activated(self): return self._when_activated def _set_when_activated(self, value): self._when_activated = self._wrap_callback(value) when_activated = property(_get_when_activated, _set_when_activated) def _get_when_deactivated(self): return self._when_deactivated def _set_when_deactivated(self, value): self._when_deactivated = self._wrap_callback(value) when_deactivated = property(_get_when_deactivated, _set_when_deactivated) def _wrap_callback(self, fn): if fn is None: return None elif not callable(fn): raise InputDeviceError('value must be None or a callable') else: # Try binding ourselves to the argspec of the provided callable. # If this works, assume the function is capable of accepting no # parameters try: inspect.getcallargs(fn) return fn except TypeError: try: # If the above fails, try binding with a single parameter # (ourselves). If this works, wrap the specified callback inspect.getcallargs(fn, self) @wraps(fn) def wrapper(): return fn(self) return wrapper except TypeError: raise InputDeviceError( 'value must be a callable which accepts up to one ' 'mandatory parameter') def _fire_events(self): old_state = self._last_state new_state = self._last_state = self.is_active if old_state is None: # Initial "indeterminate" state; set events but don't fire # callbacks as there's not necessarily an edge if new_state: self._active_event.set() else: self._inactive_event.set() else: if not old_state and new_state: self._inactive_event.clear() self._active_event.set() if self.when_activated: self.when_activated() elif old_state and not new_state: self._active_event.clear() self._inactive_event.set() if self.when_deactivated: self.when_deactivated() class DigitalInputDevice(WaitableInputDevice): def __init__(self, pin=None, pull_up=False, bouncetime=None): super(DigitalInputDevice, self).__init__(pin, pull_up) # Yes, that's really the default bouncetime in RPi.GPIO... GPIO.add_event_detect( self.pin, GPIO.BOTH, callback=self._fire_events, bouncetime=-666 if bouncetime is None else bouncetime ) # Call _fire_events once to set initial state of events super(DigitalInputDevice, self)._fire_events() def __del__(self): GPIO.remove_event_detect(self.pin) def _fire_events(self, channel): super(DigitalInputDevice, self)._fire_events() class SmoothedInputDevice(WaitableInputDevice): def __init__( self, pin=None, pull_up=False, threshold=0.5, queue_len=5, sample_wait=0.0, partial=False): super(SmoothedInputDevice, self).__init__(pin, pull_up) self._queue = GPIOQueue(self, queue_len, sample_wait, partial) self.threshold = float(threshold) @property def queue_len(self): return self._queue.queue.maxlen @property def partial(self): return self._queue.partial @property def value(self): return self._queue.value def _get_threshold(self): return self._threshold def _set_threshold(self, value): if not (0.0 < value < 1.0): raise InputDeviceError( 'threshold must be between zero and one exclusive' ) self._threshold = float(value) threshold = property(_get_threshold, _set_threshold) @property def is_active(self): return self.value > self.threshold class Button(DigitalInputDevice): def __init__(self, pin=None, pull_up=True, bouncetime=None): super(Button, self).__init__(pin, pull_up, bouncetime) when_pressed = _alias('when_activated') when_released = _alias('when_deactivated') wait_for_press = _alias('wait_for_active') wait_for_release = _alias('wait_for_inactive') class MotionSensor(SmoothedInputDevice): def __init__( self, pin=None, queue_len=5, sample_rate=10, threshold=0.5, partial=False): super(MotionSensor, self).__init__( pin, pull_up=False, threshold=threshold, queue_len=queue_len, sample_wait=1 / sample_rate, partial=partial ) self._queue.start() motion_detected = _alias('is_active') when_motion = _alias('when_activated') when_no_motion = _alias('when_deactivated') wait_for_motion = _alias('wait_for_active') wait_for_no_motion = _alias('wait_for_inactive') class LightSensor(SmoothedInputDevice): def __init__( self, pin=None, queue_len=5, charge_time_limit=0.01, threshold=0.1, partial=False): super(LightSensor, self).__init__( pin, pull_up=False, threshold=threshold, queue_len=queue_len, sample_wait=0.0, partial=partial ) self._charge_time_limit = charge_time_limit self._charged = Event() GPIO.add_event_detect( self.pin, GPIO.RISING, lambda channel: self._charged.set() ) self._queue.start() def __del__(self): GPIO.remove_event_detect(self.pin) @property def charge_time_limit(self): return self._charge_time_limit def _read(self): # Drain charge from the capacitor GPIO.setup(self.pin, GPIO.OUT) GPIO.output(self.pin, GPIO.LOW) sleep(0.1) # Time the charging of the capacitor start = time() self._charged.clear() GPIO.setup(self.pin, GPIO.IN) self._charged.wait(self.charge_time_limit) return ( 1.0 - min(self.charge_time_limit, time() - start) / self.charge_time_limit ) light_detected = _alias('is_active') when_light = _alias('when_activated') when_dark = _alias('when_deactivated') wait_for_light = _alias('wait_for_active') wait_for_dark = _alias('wait_for_inactive') class TemperatureSensor(W1ThermSensor): @property def value(self): return self.get_temperature()