mirror of
https://github.com/KevinMidboe/python-gpiozero.git
synced 2025-10-29 17:50:37 +00:00
Add close method and context manager
Permit devices to be explicitly closed and used as context managers. Also deal with cleanup properly at script end and ensure objects don't step on the global cleanup function.
This commit is contained in:
@@ -1,31 +1,96 @@
|
||||
import atexit
|
||||
import weakref
|
||||
from threading import Thread, Event
|
||||
from threading import Thread, Event, RLock
|
||||
from collections import deque
|
||||
|
||||
from RPi import GPIO
|
||||
|
||||
|
||||
_GPIO_THREADS = set()
|
||||
_GPIO_PINS = set()
|
||||
# Due to interactions between RPi.GPIO cleanup and the GPIODevice.close()
|
||||
# method the same thread may attempt to acquire this lock, leading to deadlock
|
||||
# unless the lock is re-entrant
|
||||
_GPIO_PINS_LOCK = RLock()
|
||||
|
||||
def _gpio_threads_shutdown():
|
||||
while _GPIO_THREADS:
|
||||
for t in _GPIO_THREADS.copy():
|
||||
t.stop()
|
||||
with _GPIO_PINS_LOCK:
|
||||
while _GPIO_PINS:
|
||||
GPIO.remove_event_detect(_GPIO_PINS.pop())
|
||||
GPIO.cleanup()
|
||||
|
||||
atexit.register(_gpio_threads_shutdown)
|
||||
GPIO.setmode(GPIO.BCM)
|
||||
GPIO.setwarnings(False)
|
||||
|
||||
|
||||
class GPIODeviceError(Exception):
|
||||
pass
|
||||
|
||||
class GPIODeviceClosed(GPIODeviceError):
|
||||
pass
|
||||
|
||||
|
||||
class GPIODevice(object):
|
||||
"""
|
||||
Generic GPIO Device.
|
||||
"""
|
||||
def __init__(self, pin=None):
|
||||
# self._pin must be set before any possible exceptions can be raised
|
||||
# because it's accessed in __del__. However, it mustn't be given the
|
||||
# value of pin until we've verified that it isn't already allocated
|
||||
self._pin = None
|
||||
if pin is None:
|
||||
raise GPIODeviceError('No GPIO pin number given')
|
||||
with _GPIO_PINS_LOCK:
|
||||
if pin in _GPIO_PINS:
|
||||
raise GPIODeviceError(
|
||||
'pin %d is already in use by another gpiozero object' % pin)
|
||||
_GPIO_PINS.add(pin)
|
||||
self._pin = pin
|
||||
self._active_state = GPIO.HIGH
|
||||
self._inactive_state = GPIO.LOW
|
||||
|
||||
def __del__(self):
|
||||
self.close()
|
||||
|
||||
def _read(self):
|
||||
return GPIO.input(self.pin) == self._active_state
|
||||
try:
|
||||
return GPIO.input(self.pin) == self._active_state
|
||||
except TypeError:
|
||||
self._check_open()
|
||||
raise
|
||||
|
||||
def _fire_events(self):
|
||||
pass
|
||||
|
||||
def _check_open(self):
|
||||
if self.closed:
|
||||
raise GPIODeviceClosed(
|
||||
'%s is closed or uninitialized' % self.__class__.__name__)
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
return self._pin is None
|
||||
|
||||
def close(self):
|
||||
with _GPIO_PINS_LOCK:
|
||||
pin = self._pin
|
||||
self._pin = None
|
||||
if pin in _GPIO_PINS:
|
||||
_GPIO_PINS.remove(pin)
|
||||
GPIO.remove_event_detect(pin)
|
||||
GPIO.cleanup(pin)
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, exc_tb):
|
||||
self.close()
|
||||
|
||||
@property
|
||||
def pin(self):
|
||||
return self._pin
|
||||
@@ -35,17 +100,11 @@ class GPIODevice(object):
|
||||
return self._read()
|
||||
|
||||
def __repr__(self):
|
||||
return "<gpiozero.%s object on pin=%d, is_active=%s>" % (
|
||||
self.__class__.__name__, self.pin, self.is_active)
|
||||
|
||||
|
||||
_GPIO_THREADS = set()
|
||||
|
||||
|
||||
def _gpio_threads_shutdown():
|
||||
while _GPIO_THREADS:
|
||||
for t in _GPIO_THREADS.copy():
|
||||
t.stop()
|
||||
try:
|
||||
return "<gpiozero.%s object on pin=%d, is_active=%s>" % (
|
||||
self.__class__.__name__, self.pin, self.is_active)
|
||||
except GPIODeviceClosed:
|
||||
return "<gpiozero.%s object closed>" % self.__class__.__name__
|
||||
|
||||
|
||||
class GPIOThread(Thread):
|
||||
|
||||
Reference in New Issue
Block a user