mirror of
				https://github.com/KevinMidboe/python-gpiozero.git
				synced 2025-10-29 17:50:37 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			218 lines
		
	
	
		
			7.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			218 lines
		
	
	
		
			7.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from __future__ import division
 | 
						|
 | 
						|
from time import sleep, time
 | 
						|
from threading import Event
 | 
						|
 | 
						|
from RPi import GPIO
 | 
						|
from w1thermsensor import W1ThermSensor
 | 
						|
 | 
						|
from .devices import GPIODeviceError, GPIODevice, GPIOQueue
 | 
						|
 | 
						|
 | 
						|
def _alias(key):
 | 
						|
    return property(
 | 
						|
        lambda self: getattr(self, key),
 | 
						|
        lambda self, val: setattr(self, key, val))
 | 
						|
 | 
						|
 | 
						|
class InputDeviceError(GPIODeviceError):
 | 
						|
    pass
 | 
						|
 | 
						|
 | 
						|
class InputDevice(GPIODevice):
 | 
						|
    def __init__(self, pin=None, pull_up=False):
 | 
						|
        super(InputDevice, self).__init__(pin)
 | 
						|
        self._pull_up = pull_up
 | 
						|
        self._active_edge = (GPIO.RISING, GPIO.FALLING)[pull_up]
 | 
						|
        self._inactive_edge = (GPIO.FALLING, GPIO.RISING)[pull_up]
 | 
						|
        if pull_up:
 | 
						|
            self._active_state = GPIO.LOW
 | 
						|
            self._inactive_state = GPIO.HIGH
 | 
						|
        GPIO.setup(pin, GPIO.IN, (GPIO.PUD_DOWN, GPIO.PUD_UP)[pull_up])
 | 
						|
 | 
						|
    @property
 | 
						|
    def pull_up(self):
 | 
						|
        return self._pull_up
 | 
						|
 | 
						|
 | 
						|
class WaitableInputDevice(InputDevice):
 | 
						|
    def __init__(self, pin=None, pull_up=False):
 | 
						|
        super(WaitableInputDevice, self).__init__(pin, pull_up)
 | 
						|
        self._active_event = Event()
 | 
						|
        self._inactive_event = Event()
 | 
						|
        self._when_activated = None
 | 
						|
        self._when_deactivated = None
 | 
						|
        self._last_state = None
 | 
						|
 | 
						|
    def wait_for_active(self, timeout=None):
 | 
						|
        return self._active_event.wait(timeout)
 | 
						|
 | 
						|
    def wait_for_inactive(self, timeout=None):
 | 
						|
        return self._inactive_event.wait(timeout)
 | 
						|
 | 
						|
    def _get_when_activated(self):
 | 
						|
        return self._when_activated
 | 
						|
    def _set_when_activated(self, value):
 | 
						|
        if not callable(value) and value is not None:
 | 
						|
            raise InputDeviceError('value must be None or a function')
 | 
						|
        self._when_activated = value
 | 
						|
    when_activated = property(_get_when_activated, _set_when_activated)
 | 
						|
 | 
						|
    def _get_when_deactivated(self):
 | 
						|
        return self._when_deactivated
 | 
						|
    def _set_when_deactivated(self, value):
 | 
						|
        if not callable(value) and value is not None:
 | 
						|
            raise InputDeviceError('value must be None or a function')
 | 
						|
        self._when_deactivated = value
 | 
						|
    when_deactivated = property(_get_when_deactivated, _set_when_deactivated)
 | 
						|
 | 
						|
    def _fire_events(self):
 | 
						|
        old_state = self._last_state
 | 
						|
        new_state = self._last_state = self.is_active
 | 
						|
        if old_state is None:
 | 
						|
            # Initial "indeterminate" state; set events but don't fire
 | 
						|
            # callbacks as there's not necessarily an edge
 | 
						|
            if new_state:
 | 
						|
                self._active_event.set()
 | 
						|
            else:
 | 
						|
                self._inactive_event.set()
 | 
						|
        else:
 | 
						|
            if not old_state and new_state:
 | 
						|
                self._inactive_event.clear()
 | 
						|
                self._active_event.set()
 | 
						|
                if self.when_activated:
 | 
						|
                    self.when_activated()
 | 
						|
            elif old_state and not new_state:
 | 
						|
                self._active_event.clear()
 | 
						|
                self._inactive_event.set()
 | 
						|
                if self.when_deactivated:
 | 
						|
                    self.when_deactivated()
 | 
						|
 | 
						|
 | 
						|
class DigitalInputDevice(WaitableInputDevice):
 | 
						|
    def __init__(self, pin=None, pull_up=False, bouncetime=None):
 | 
						|
        super(DigitalInputDevice, self).__init__(pin, pull_up)
 | 
						|
        # Yes, that's really the default bouncetime in RPi.GPIO...
 | 
						|
        GPIO.add_event_detect(
 | 
						|
                self.pin, GPIO.BOTH, callback=self._fire_events,
 | 
						|
                bouncetime=-666 if bouncetime is None else bouncetime)
 | 
						|
        # Call _fire_events once to set initial state of events
 | 
						|
        super(DigitalInputDevice, self)._fire_events()
 | 
						|
 | 
						|
    def __del__(self):
 | 
						|
        GPIO.remove_event_detect(self.pin)
 | 
						|
 | 
						|
    def _fire_events(self, channel):
 | 
						|
        super(DigitalInputDevice, self)._fire_events()
 | 
						|
 | 
						|
 | 
						|
class SmoothedInputDevice(WaitableInputDevice):
 | 
						|
    def __init__(
 | 
						|
            self, pin=None, pull_up=False, threshold=0.5,
 | 
						|
            queue_len=5, sample_wait=0.0, partial=False):
 | 
						|
        super(SmoothedInputDevice, self).__init__(pin, pull_up)
 | 
						|
        self._queue = GPIOQueue(self, queue_len, sample_wait, partial)
 | 
						|
        self.threshold = float(threshold)
 | 
						|
 | 
						|
    @property
 | 
						|
    def queue_len(self):
 | 
						|
        return self._queue.queue.maxlen
 | 
						|
 | 
						|
    @property
 | 
						|
    def partial(self):
 | 
						|
        return self._queue.partial
 | 
						|
 | 
						|
    @property
 | 
						|
    def value(self):
 | 
						|
        return self._queue.value
 | 
						|
 | 
						|
    def _get_threshold(self):
 | 
						|
        return self._threshold
 | 
						|
 | 
						|
    def _set_threshold(self, value):
 | 
						|
        if not (0.0 < value < 1.0):
 | 
						|
            raise InputDeviceError('threshold must be between zero and one exclusive')
 | 
						|
        self._threshold = float(value)
 | 
						|
    threshold = property(_get_threshold, _set_threshold)
 | 
						|
 | 
						|
    @property
 | 
						|
    def is_active(self):
 | 
						|
        return self.value > self.threshold
 | 
						|
 | 
						|
 | 
						|
class Button(DigitalInputDevice):
 | 
						|
    def __init__(self, pin=None, pull_up=True, bouncetime=None):
 | 
						|
        super(Button, self).__init__(pin, pull_up, bouncetime)
 | 
						|
 | 
						|
    when_pressed = _alias('when_activated')
 | 
						|
    when_released = _alias('when_deactivated')
 | 
						|
 | 
						|
    wait_for_press = _alias('wait_for_active')
 | 
						|
    wait_for_release = _alias('wait_for_inactive')
 | 
						|
 | 
						|
 | 
						|
class MotionSensor(SmoothedInputDevice):
 | 
						|
    def __init__(
 | 
						|
            self, pin=None, queue_len=5, sample_rate=10, threshold=0.5,
 | 
						|
            partial=False):
 | 
						|
        super(MotionSensor, self).__init__(
 | 
						|
                pin, pull_up=False, threshold=threshold,
 | 
						|
                queue_len=queue_len, sample_wait=1 / sample_rate, partial=partial)
 | 
						|
        self._queue.start()
 | 
						|
 | 
						|
    motion_detected = _alias('is_active')
 | 
						|
 | 
						|
    when_motion = _alias('when_activated')
 | 
						|
    when_no_motion = _alias('when_deactivated')
 | 
						|
 | 
						|
    wait_for_motion = _alias('wait_for_active')
 | 
						|
    wait_for_no_motion = _alias('wait_for_inactive')
 | 
						|
 | 
						|
 | 
						|
class LightSensor(SmoothedInputDevice):
 | 
						|
    def __init__(
 | 
						|
            self, pin=None, queue_len=5, charge_time_limit=0.01,
 | 
						|
            threshold=0.1, partial=False):
 | 
						|
        super(LightSensor, self).__init__(
 | 
						|
                pin, pull_up=False, threshold=threshold,
 | 
						|
                queue_len=queue_len, sample_wait=0.0, partial=partial)
 | 
						|
        self._charge_time_limit = charge_time_limit
 | 
						|
        self._charged = Event()
 | 
						|
        GPIO.add_event_detect(self.pin, GPIO.RISING, lambda channel: self._charged.set())
 | 
						|
        self._queue.start()
 | 
						|
 | 
						|
    def __del__(self):
 | 
						|
        GPIO.remove_event_detect(self.pin)
 | 
						|
 | 
						|
    @property
 | 
						|
    def charge_time_limit(self):
 | 
						|
        return self._charge_time_limit
 | 
						|
 | 
						|
    def _read(self):
 | 
						|
        # Drain charge from the capacitor
 | 
						|
        GPIO.setup(self.pin, GPIO.OUT)
 | 
						|
        GPIO.output(self.pin, GPIO.LOW)
 | 
						|
        sleep(0.1)
 | 
						|
        # Time the charging of the capacitor
 | 
						|
        start = time()
 | 
						|
        self._charged.clear()
 | 
						|
        GPIO.setup(self.pin, GPIO.IN)
 | 
						|
        self._charged.wait(self.charge_time_limit)
 | 
						|
        return 1.0 - min(self.charge_time_limit, time() - start) / self.charge_time_limit
 | 
						|
 | 
						|
    light_detected = _alias('is_active')
 | 
						|
 | 
						|
    when_light = _alias('when_activated')
 | 
						|
    when_dark = _alias('when_deactivated')
 | 
						|
 | 
						|
    wait_for_light = _alias('wait_for_active')
 | 
						|
    wait_for_dark = _alias('wait_for_inactive')
 | 
						|
 | 
						|
 | 
						|
 | 
						|
class TemperatureSensor(W1ThermSensor):
 | 
						|
    @property
 | 
						|
    def value(self):
 | 
						|
        return self.get_temperature()
 | 
						|
 |