mirror of
				https://github.com/KevinMidboe/python-gpiozero.git
				synced 2025-10-29 17:50:37 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			218 lines
		
	
	
		
			7.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			218 lines
		
	
	
		
			7.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from __future__ import division
 | |
| 
 | |
| from time import sleep, time
 | |
| from threading import Event
 | |
| 
 | |
| from RPi import GPIO
 | |
| from w1thermsensor import W1ThermSensor
 | |
| 
 | |
| from .devices import GPIODeviceError, GPIODevice, GPIOQueue
 | |
| 
 | |
| 
 | |
def _alias(key):
    """Return a property that forwards to the attribute named *key*.

    Reading the property returns ``getattr(self, key)`` and assigning to it
    performs ``setattr(self, key, value)``, so a class can expose one of its
    attributes under a second name.
    """
    def _forward_get(self):
        return getattr(self, key)

    def _forward_set(self, val):
        setattr(self, key, val)

    return property(_forward_get, _forward_set)
 | |
| 
 | |
| 
 | |
class InputDeviceError(GPIODeviceError):
    """Error raised by the input device classes in this module."""
 | |
| 
 | |
| 
 | |
class InputDevice(GPIODevice):
    """A GPIO device whose pin is configured as an input.

    :param pin: GPIO pin number; passed unchanged to ``GPIODevice.__init__``
        and to ``GPIO.setup``.
    :param pull_up: any truthy value selects the internal pull-up resistor,
        records ``GPIO.LOW`` as the active state and ``GPIO.FALLING`` as the
        active edge. A falsy value selects the pull-down resistor, records
        ``GPIO.RISING`` as the active edge and leaves the active/inactive
        states as set by the base class.
    """

    def __init__(self, pin=None, pull_up=False):
        super(InputDevice, self).__init__(pin)
        # Normalise once so every decision below agrees on truthiness; the
        # previous tuple-indexing form raised IndexError/TypeError for truthy
        # values other than True/1.
        pull_up = bool(pull_up)
        self._pull_up = pull_up
        if pull_up:
            self._active_edge = GPIO.FALLING
            self._inactive_edge = GPIO.RISING
            self._active_state = GPIO.LOW
            self._inactive_state = GPIO.HIGH
            pull = GPIO.PUD_UP
        else:
            self._active_edge = GPIO.RISING
            self._inactive_edge = GPIO.FALLING
            pull = GPIO.PUD_DOWN
        GPIO.setup(pin, GPIO.IN, pull)

    @property
    def pull_up(self):
        """Whether the device was configured with the pull-up resistor."""
        return self._pull_up
 | |
| 
 | |
| 
 | |
class WaitableInputDevice(InputDevice):
    """Input device that can be waited on and can run state-change callbacks.

    Two ``threading.Event`` objects mirror the device state (one is set while
    the device is active, the other while it is inactive) and two optional
    callables, ``when_activated`` and ``when_deactivated``, are invoked on
    the corresponding transitions detected by ``_fire_events``.
    """

    def __init__(self, pin=None, pull_up=False):
        super(WaitableInputDevice, self).__init__(pin, pull_up)
        self._active_event = Event()
        self._inactive_event = Event()
        self._when_activated = None
        self._when_deactivated = None
        # None means no state has been observed yet.
        self._last_state = None

    def wait_for_active(self, timeout=None):
        """Block until the active event is set or *timeout* seconds elapse.

        Returns whatever ``Event.wait`` returns.
        """
        return self._active_event.wait(timeout)

    def wait_for_inactive(self, timeout=None):
        """Block until the inactive event is set or *timeout* seconds elapse.

        Returns whatever ``Event.wait`` returns.
        """
        return self._inactive_event.wait(timeout)

    def _get_when_activated(self):
        return self._when_activated

    def _set_when_activated(self, value):
        if value is not None and not callable(value):
            raise InputDeviceError('value must be None or a function')
        self._when_activated = value

    # Callable run on an inactive -> active transition, or None.
    when_activated = property(_get_when_activated, _set_when_activated)

    def _get_when_deactivated(self):
        return self._when_deactivated

    def _set_when_deactivated(self, value):
        if value is not None and not callable(value):
            raise InputDeviceError('value must be None or a function')
        self._when_deactivated = value

    # Callable run on an active -> inactive transition, or None.
    when_deactivated = property(_get_when_deactivated, _set_when_deactivated)

    def _fire_events(self):
        """Sample ``is_active``, update the events and run any callback.

        The first call only initialises the events; callbacks run only on a
        genuine change between two successive samples.
        """
        old_state = self._last_state
        new_state = self._last_state = self.is_active
        if old_state is None:
            # Initial "indeterminate" state; set events but don't fire
            # callbacks as there's not necessarily an edge
            if new_state:
                self._active_event.set()
            else:
                self._inactive_event.set()
            return
        if not old_state and new_state:
            self._inactive_event.clear()
            self._active_event.set()
            callback = self.when_activated
        elif old_state and not new_state:
            self._active_event.clear()
            self._inactive_event.set()
            callback = self.when_deactivated
        else:
            # No change since the previous sample.
            return
        # The callback is read exactly once: this method may run on another
        # thread (e.g. as a GPIO event callback), so re-reading the property
        # between the check and the call could hit a concurrent reset to
        # None and raise TypeError.
        if callback:
            callback()
 | |
| 
 | |
| 
 | |
class DigitalInputDevice(WaitableInputDevice):
    """Waitable input device driven by RPi.GPIO edge detection.

    Both edges of the pin are watched; every detected edge re-evaluates the
    device state through the inherited ``_fire_events``.

    :param bouncetime: forwarded to ``GPIO.add_event_detect``; ``None``
        selects RPi.GPIO's own default.
    """

    def __init__(self, pin=None, pull_up=False, bouncetime=None):
        super(DigitalInputDevice, self).__init__(pin, pull_up)
        if bouncetime is None:
            # Yes, that's really the default bouncetime in RPi.GPIO...
            bouncetime = -666
        GPIO.add_event_detect(
            self.pin,
            GPIO.BOTH,
            callback=self._fire_events,
            bouncetime=bouncetime,
        )
        # Prime the events with the current state (no callbacks fire on the
        # very first evaluation).
        super(DigitalInputDevice, self)._fire_events()

    def __del__(self):
        """Remove the edge detection registered in ``__init__``."""
        GPIO.remove_event_detect(self.pin)

    def _fire_events(self, channel):
        """GPIO callback adapter: *channel* is accepted but not used."""
        super(DigitalInputDevice, self)._fire_events()
 | |
| 
 | |
| 
 | |
class SmoothedInputDevice(WaitableInputDevice):
    """Waitable input device whose value comes from a ``GPIOQueue``.

    The device counts as active while the queue's value is strictly greater
    than ``threshold``. The queue is created here but not started.

    :param threshold: number strictly between 0 and 1.
    :param queue_len: forwarded to ``GPIOQueue``.
    :param sample_wait: forwarded to ``GPIOQueue``.
    :param partial: forwarded to ``GPIOQueue``.
    :raises InputDeviceError: if *threshold* is outside the open interval
        (0, 1).
    """

    def __init__(
            self, pin=None, pull_up=False, threshold=0.5,
            queue_len=5, sample_wait=0.0, partial=False):
        super(SmoothedInputDevice, self).__init__(pin, pull_up)
        self._queue = GPIOQueue(self, queue_len, sample_wait, partial)
        # Goes through the validating property setter below.
        self.threshold = float(threshold)

    @property
    def queue_len(self):
        """Maximum length of the underlying queue."""
        return self._queue.queue.maxlen

    @property
    def partial(self):
        """The queue's ``partial`` setting."""
        return self._queue.partial

    @property
    def value(self):
        """Current value reported by the queue."""
        return self._queue.value

    def _get_threshold(self):
        return self._threshold

    def _set_threshold(self, value):
        in_range = 0.0 < value < 1.0
        if not in_range:
            raise InputDeviceError(
                'threshold must be between zero and one exclusive')
        self._threshold = float(value)

    # Value above which the device is considered active.
    threshold = property(_get_threshold, _set_threshold)

    @property
    def is_active(self):
        """True while ``value`` exceeds ``threshold``."""
        return self.value > self.threshold
 | |
| 
 | |
| 
 | |
class Button(DigitalInputDevice):
    """Digital input device with button-flavoured names.

    Identical to ``DigitalInputDevice`` except that *pull_up* defaults to
    ``True`` and the activation API is also available as press/release.
    """

    def __init__(self, pin=None, pull_up=True, bouncetime=None):
        super(Button, self).__init__(
            pin=pin, pull_up=pull_up, bouncetime=bouncetime)

    # Callback aliases
    when_pressed = _alias('when_activated')
    when_released = _alias('when_deactivated')

    # Blocking-wait aliases
    wait_for_press = _alias('wait_for_active')
    wait_for_release = _alias('wait_for_inactive')
 | |
| 
 | |
| 
 | |
class MotionSensor(SmoothedInputDevice):
    """Smoothed input device with motion-flavoured names.

    The pin is always configured without pull-up, and the sampling queue is
    started as soon as the object is constructed.

    :param sample_rate: its reciprocal is passed to the base class as
        ``sample_wait``.
    """

    def __init__(
            self, pin=None, queue_len=5, sample_rate=10, threshold=0.5,
            partial=False):
        sample_wait = 1 / sample_rate
        super(MotionSensor, self).__init__(
            pin,
            pull_up=False,
            threshold=threshold,
            queue_len=queue_len,
            sample_wait=sample_wait,
            partial=partial,
        )
        self._queue.start()

    # State alias
    motion_detected = _alias('is_active')

    # Callback aliases
    when_motion = _alias('when_activated')
    when_no_motion = _alias('when_deactivated')

    # Blocking-wait aliases
    wait_for_motion = _alias('wait_for_active')
    wait_for_no_motion = _alias('wait_for_inactive')
 | |
| 
 | |
| 
 | |
class LightSensor(SmoothedInputDevice):
    """Smoothed input device that times the charging of a capacitor.

    Each reading drives the pin low, switches it back to input and measures
    how long it takes for a rising edge to appear; a shorter time gives a
    reading closer to 1.0.

    :param charge_time_limit: longest time, in seconds, to wait for the
        rising edge; a wait of at least this long produces a reading of 0.0.
    """

    def __init__(
            self, pin=None, queue_len=5, charge_time_limit=0.01,
            threshold=0.1, partial=False):
        super(LightSensor, self).__init__(
            pin,
            pull_up=False,
            threshold=threshold,
            queue_len=queue_len,
            sample_wait=0.0,
            partial=partial,
        )
        self._charge_time_limit = charge_time_limit
        self._charged = Event()

        def _on_rising(channel):
            # Rising edge seen: flag the capacitor as charged.
            self._charged.set()

        GPIO.add_event_detect(self.pin, GPIO.RISING, _on_rising)
        self._queue.start()

    def __del__(self):
        """Remove the edge detection registered in ``__init__``."""
        GPIO.remove_event_detect(self.pin)

    @property
    def charge_time_limit(self):
        """The charge time limit given at construction."""
        return self._charge_time_limit

    def _read(self):
        """Take one reading in the range 0.0 to 1.0."""
        pin = self.pin
        limit = self.charge_time_limit
        # Drain charge from the capacitor
        GPIO.setup(pin, GPIO.OUT)
        GPIO.output(pin, GPIO.LOW)
        sleep(0.1)
        # Time the charging of the capacitor
        start = time()
        self._charged.clear()
        GPIO.setup(pin, GPIO.IN)
        self._charged.wait(limit)
        elapsed = time() - start
        # Clamp to the limit so the result never drops below zero.
        return 1.0 - min(limit, elapsed) / limit

    # State alias
    light_detected = _alias('is_active')

    # Callback aliases
    when_light = _alias('when_activated')
    when_dark = _alias('when_deactivated')

    # Blocking-wait aliases
    wait_for_light = _alias('wait_for_active')
    wait_for_dark = _alias('wait_for_inactive')
 | |
| 
 | |
| 
 | |
| 
 | |
class TemperatureSensor(W1ThermSensor):
    """``W1ThermSensor`` that also exposes its reading as ``value``."""

    @property
    def value(self):
        """The result of calling ``get_temperature()`` with no arguments."""
        reading = self.get_temperature()
        return reading
 | |
| 
 |