mirror of
https://github.com/KevinMidboe/python-gpiozero.git
synced 2025-12-08 20:39:01 +00:00
@@ -63,18 +63,22 @@ class InputDevice(GPIODevice):
|
||||
self._inactive_state = GPIO.HIGH if pull_up else GPIO.LOW
|
||||
pull = GPIO.PUD_UP if pull_up else GPIO.PUD_DOWN
|
||||
|
||||
# NOTE: catch_warnings isn't thread-safe but hopefully no-one's messing
|
||||
# around with GPIO init within background threads...
|
||||
try:
|
||||
# NOTE: catch_warnings isn't thread-safe but hopefully no-one's
|
||||
# messing around with GPIO init within background threads...
|
||||
with warnings.catch_warnings(record=True) as w:
|
||||
GPIO.setup(pin, GPIO.IN, pull)
|
||||
# The only warning we want to squash is a RuntimeWarning that is thrown
|
||||
# when setting pins 2 or 3. Anything else should be replayed
|
||||
# The only warning we want to squash is a RuntimeWarning that is
|
||||
# thrown when setting pins 2 or 3. Anything else should be replayed
|
||||
for warning in w:
|
||||
if warning.category != RuntimeWarning or pin not in (2, 3):
|
||||
warnings.showwarning(
|
||||
warning.message, warning.category, warning.filename,
|
||||
warning.lineno, warning.file, warning.line
|
||||
)
|
||||
except:
|
||||
self.close()
|
||||
raise
|
||||
|
||||
@property
|
||||
def pull_up(self):
|
||||
@@ -238,6 +242,7 @@ class DigitalInputDevice(WaitableInputDevice):
|
||||
"""
|
||||
def __init__(self, pin=None, pull_up=False, bounce_time=None):
|
||||
super(DigitalInputDevice, self).__init__(pin, pull_up)
|
||||
try:
|
||||
# Yes, that's really the default bouncetime in RPi.GPIO...
|
||||
GPIO.add_event_detect(
|
||||
self.pin, GPIO.BOTH, callback=self._fire_events,
|
||||
@@ -245,6 +250,9 @@ class DigitalInputDevice(WaitableInputDevice):
|
||||
)
|
||||
# Call _fire_events once to set initial state of events
|
||||
super(DigitalInputDevice, self)._fire_events()
|
||||
except:
|
||||
self.close()
|
||||
raise
|
||||
|
||||
def _fire_events(self, channel):
|
||||
super(DigitalInputDevice, self)._fire_events()
|
||||
@@ -287,13 +295,19 @@ class SmoothedInputDevice(WaitableInputDevice):
|
||||
queue_len=5, sample_wait=0.0, partial=False):
|
||||
self._queue = None
|
||||
super(SmoothedInputDevice, self).__init__(pin, pull_up)
|
||||
try:
|
||||
self._queue = GPIOQueue(self, queue_len, sample_wait, partial)
|
||||
self.threshold = float(threshold)
|
||||
except:
|
||||
self.close()
|
||||
raise
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
self._queue.stop()
|
||||
except AttributeError:
|
||||
# If the queue isn't initialized (it's None) ignore the error
|
||||
# because we're trying to close anyway
|
||||
if self._queue is not None:
|
||||
raise
|
||||
except RuntimeError:
|
||||
@@ -398,7 +412,11 @@ class MotionSensor(SmoothedInputDevice):
|
||||
pin, pull_up=False, threshold=threshold,
|
||||
queue_len=queue_len, sample_wait=1 / sample_rate, partial=partial
|
||||
)
|
||||
try:
|
||||
self._queue.start()
|
||||
except:
|
||||
self.close()
|
||||
raise
|
||||
|
||||
motion_detected = _alias('is_active')
|
||||
|
||||
@@ -425,12 +443,16 @@ class LightSensor(SmoothedInputDevice):
|
||||
pin, pull_up=False, threshold=threshold,
|
||||
queue_len=queue_len, sample_wait=0.0, partial=partial
|
||||
)
|
||||
try:
|
||||
self._charge_time_limit = charge_time_limit
|
||||
self._charged = Event()
|
||||
GPIO.add_event_detect(
|
||||
self.pin, GPIO.RISING, lambda channel: self._charged.set()
|
||||
)
|
||||
self._queue.start()
|
||||
except:
|
||||
self.close()
|
||||
raise
|
||||
|
||||
@property
|
||||
def charge_time_limit(self):
|
||||
|
||||
@@ -22,18 +22,22 @@ class OutputDevice(GPIODevice):
|
||||
"""
|
||||
def __init__(self, pin=None):
|
||||
super(OutputDevice, self).__init__(pin)
|
||||
# NOTE: catch_warnings isn't thread-safe but hopefully no-one's messing
|
||||
# around with GPIO init within background threads...
|
||||
try:
|
||||
# NOTE: catch_warnings isn't thread-safe but hopefully no-one's
|
||||
# messing around with GPIO init within background threads...
|
||||
with warnings.catch_warnings(record=True) as w:
|
||||
GPIO.setup(pin, GPIO.OUT)
|
||||
# The only warning we want to squash is a RuntimeWarning that is thrown
|
||||
# when setting pins 2 or 3. Anything else should be replayed
|
||||
# The only warning we want to squash is a RuntimeWarning that is
|
||||
# thrown when setting pins 2 or 3. Anything else should be replayed
|
||||
for warning in w:
|
||||
if warning.category != RuntimeWarning or pin not in (2, 3):
|
||||
warnings.showwarning(
|
||||
warning.message, warning.category, warning.filename,
|
||||
warning.lineno, warning.file, warning.line
|
||||
)
|
||||
except:
|
||||
self.close()
|
||||
raise
|
||||
|
||||
def on(self):
|
||||
"""
|
||||
@@ -158,12 +162,16 @@ class PWMOutputDevice(DigitalOutputDevice):
|
||||
"""
|
||||
def __init__(self, pin=None):
|
||||
super(PWMOutputDevice, self).__init__(pin)
|
||||
try:
|
||||
self._frequency = 100
|
||||
self._pwm = GPIO.PWM(self._pin, self._frequency)
|
||||
self._pwm.start(0)
|
||||
self._min_pwm = 0
|
||||
self._max_pwm = 1
|
||||
self.value = 0
|
||||
except:
|
||||
self.close()
|
||||
raise
|
||||
|
||||
def on(self):
|
||||
"""
|
||||
@@ -195,13 +203,11 @@ class PWMOutputDevice(DigitalOutputDevice):
|
||||
def value(self, n):
|
||||
_min = self._min_pwm
|
||||
_max = self._max_pwm
|
||||
if _min <= n <= _max:
|
||||
dc = n * 100
|
||||
else:
|
||||
raise GPIODeviceError(
|
||||
if not _min <= n <= _max:
|
||||
raise OutputDeviceError(
|
||||
"Value must be between %s and %s" % (_min, _max)
|
||||
)
|
||||
self._pwm.ChangeDutyCycle(dc)
|
||||
self._pwm.ChangeDutyCycle(n * 100)
|
||||
self._value = n
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user