mirror of
https://github.com/KevinMidboe/python-gpiozero.git
synced 2025-10-29 17:50:37 +00:00
Merge pull request #141 from waveform80/pins
Refactor pins implementation
This commit is contained in:
84
docs/api_pins.rst
Normal file
84
docs/api_pins.rst
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
====
|
||||||
|
Pins
|
||||||
|
====
|
||||||
|
|
||||||
|
.. currentmodule:: gpiozero
|
||||||
|
|
||||||
|
As of release 1.1, the GPIO Zero library can be roughly divided into two
|
||||||
|
things: pins and the devices that are connected to them. The majority of the
|
||||||
|
documentation focuses on devices as pins are below the level that most users
|
||||||
|
are concerned with. However, some users may wish to take advantage of the
|
||||||
|
capabilities of alternative GPIO implementations or (in future) use GPIO
|
||||||
|
extender chips. This is the purpose of the pins portion of the library.
|
||||||
|
|
||||||
|
When you construct a device, you pass in a GPIO pin number. However, what the
|
||||||
|
library actually expects is a :class:`Pin` implementation. If it finds a simple
|
||||||
|
integer number instead, it uses one of the following classes to provide the
|
||||||
|
:class:`Pin` implementation (classes are listed in favoured order):
|
||||||
|
|
||||||
|
1. :class:`gpiozero.pins.rpigpio.RPiGPIOPin`
|
||||||
|
|
||||||
|
2. :class:`gpiozero.pins.rpio.RPIOPin`
|
||||||
|
|
||||||
|
3. :class:`gpiozero.pins.native.NativePin`
|
||||||
|
|
||||||
|
You can change the default pin implementation by over-writing the
|
||||||
|
``DefaultPin`` global in devices like so::
|
||||||
|
|
||||||
|
from gpiozero.pins.native import NativePin
|
||||||
|
import gpiozero.devices
|
||||||
|
# Force the default pin implementation to be NativePin
|
||||||
|
gpiozero.devices.DefaultPin = NativePin
|
||||||
|
|
||||||
|
from gpiozero import LED
|
||||||
|
|
||||||
|
# This will now use NativePin instead of RPiGPIOPin
|
||||||
|
led = LED(16)
|
||||||
|
|
||||||
|
In future, this separation should allow the library to utilize pins that are
|
||||||
|
part of IO extender chips. For example::
|
||||||
|
|
||||||
|
from gpiozero import IOExtender, LED
|
||||||
|
|
||||||
|
ext = IOExtender()
|
||||||
|
led = LED(ext.pins[0])
|
||||||
|
led.on()
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
|
||||||
|
While the devices API is now considered stable and won't change in
|
||||||
|
backwards incompatible ways, the pins API is *not* yet considered stable.
|
||||||
|
It is potentially subject to change in future versions. We welcome any
|
||||||
|
comments from testers!
|
||||||
|
|
||||||
|
|
||||||
|
Abstract Pin
|
||||||
|
============
|
||||||
|
|
||||||
|
.. autoclass:: Pin
|
||||||
|
:members:
|
||||||
|
|
||||||
|
|
||||||
|
RPiGPIOPin
|
||||||
|
==========
|
||||||
|
|
||||||
|
.. currentmodule:: gpiozero.pins.rpigpio
|
||||||
|
|
||||||
|
.. autoclass:: RPiGPIOPin
|
||||||
|
|
||||||
|
|
||||||
|
RPIOPin
|
||||||
|
=======
|
||||||
|
|
||||||
|
.. currentmodule:: gpiozero.pins.rpio
|
||||||
|
|
||||||
|
.. autoclass:: RPIOPin
|
||||||
|
|
||||||
|
|
||||||
|
NativePin
|
||||||
|
=========
|
||||||
|
|
||||||
|
.. currentmodule:: gpiozero.pins.native
|
||||||
|
|
||||||
|
.. autoclass:: NativePin
|
||||||
|
|
||||||
@@ -37,6 +37,8 @@ class Mock(object):
|
|||||||
|
|
||||||
sys.modules['RPi'] = Mock()
|
sys.modules['RPi'] = Mock()
|
||||||
sys.modules['RPi.GPIO'] = sys.modules['RPi'].GPIO
|
sys.modules['RPi.GPIO'] = sys.modules['RPi'].GPIO
|
||||||
|
sys.modules['RPIO'] = Mock()
|
||||||
|
sys.modules['RPIO.PWM'] = sys.modules['RPIO'].PWM
|
||||||
sys.modules['w1thermsensor'] = Mock()
|
sys.modules['w1thermsensor'] = Mock()
|
||||||
sys.modules['spidev'] = Mock()
|
sys.modules['spidev'] = Mock()
|
||||||
|
|
||||||
|
|||||||
@@ -12,5 +12,6 @@ Table of Contents
|
|||||||
api_output
|
api_output
|
||||||
api_boards
|
api_boards
|
||||||
api_generic
|
api_generic
|
||||||
|
api_pins
|
||||||
changelog
|
changelog
|
||||||
license
|
license
|
||||||
|
|||||||
@@ -281,7 +281,7 @@ Each button plays a different sound!
|
|||||||
|
|
||||||
buttons = [Button(pin) for pin in sound_pins]
|
buttons = [Button(pin) for pin in sound_pins]
|
||||||
for button in buttons:
|
for button in buttons:
|
||||||
sound = sound_pins[button.pin]
|
sound = sound_pins[button.pin.number]
|
||||||
button.when_pressed = sound.play
|
button.when_pressed = sound.play
|
||||||
|
|
||||||
pause()
|
pause()
|
||||||
|
|||||||
@@ -5,6 +5,23 @@ from __future__ import (
|
|||||||
division,
|
division,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from .pins.exc import (
|
||||||
|
PinError,
|
||||||
|
PinFixedFunction,
|
||||||
|
PinInvalidFunction,
|
||||||
|
PinInvalidState,
|
||||||
|
PinInvalidPull,
|
||||||
|
PinInvalidEdges,
|
||||||
|
PinSetInput,
|
||||||
|
PinFixedPull,
|
||||||
|
PinEdgeDetectUnsupported,
|
||||||
|
PinPWMError,
|
||||||
|
PinPWMUnsupported,
|
||||||
|
PinPWMFixedValue,
|
||||||
|
)
|
||||||
|
from .pins import (
|
||||||
|
Pin,
|
||||||
|
)
|
||||||
from .exc import (
|
from .exc import (
|
||||||
GPIODeviceClosed,
|
GPIODeviceClosed,
|
||||||
GPIODeviceError,
|
GPIODeviceError,
|
||||||
|
|||||||
@@ -13,29 +13,45 @@ from threading import Thread, Event, RLock
|
|||||||
from collections import deque
|
from collections import deque
|
||||||
from types import FunctionType
|
from types import FunctionType
|
||||||
|
|
||||||
from RPi import GPIO
|
|
||||||
|
|
||||||
from .exc import GPIODeviceError, GPIODeviceClosed, InputDeviceError
|
from .exc import GPIODeviceError, GPIODeviceClosed, InputDeviceError
|
||||||
|
|
||||||
_GPIO_THREADS = set()
|
# Get a pin implementation to use as the default; we prefer RPi.GPIO's here
|
||||||
_GPIO_PINS = set()
|
# as it supports PWM, and all Pi revisions. If no third-party libraries are
|
||||||
|
# available, however, we fall back to a pure Python implementation which
|
||||||
|
# supports platforms like PyPy
|
||||||
|
from .pins import PINS_CLEANUP
|
||||||
|
try:
|
||||||
|
from .pins.rpigpio import RPiGPIOPin
|
||||||
|
DefaultPin = RPiGPIOPin
|
||||||
|
except ImportError:
|
||||||
|
try:
|
||||||
|
from .pins.rpio import RPIOPin
|
||||||
|
DefaultPin = RPIOPin
|
||||||
|
except ImportError:
|
||||||
|
from .pins.native import NativePin
|
||||||
|
DefaultPin = NativePin
|
||||||
|
|
||||||
|
|
||||||
|
_THREADS = set()
|
||||||
|
_PINS = set()
|
||||||
# Due to interactions between RPi.GPIO cleanup and the GPIODevice.close()
|
# Due to interactions between RPi.GPIO cleanup and the GPIODevice.close()
|
||||||
# method the same thread may attempt to acquire this lock, leading to deadlock
|
# method the same thread may attempt to acquire this lock, leading to deadlock
|
||||||
# unless the lock is re-entrant
|
# unless the lock is re-entrant
|
||||||
_GPIO_PINS_LOCK = RLock()
|
_PINS_LOCK = RLock()
|
||||||
|
|
||||||
def _gpio_threads_shutdown():
|
def _shutdown():
|
||||||
while _GPIO_THREADS:
|
while _THREADS:
|
||||||
for t in _GPIO_THREADS.copy():
|
for t in _THREADS.copy():
|
||||||
t.stop()
|
t.stop()
|
||||||
with _GPIO_PINS_LOCK:
|
with _PINS_LOCK:
|
||||||
while _GPIO_PINS:
|
while _PINS:
|
||||||
GPIO.remove_event_detect(_GPIO_PINS.pop())
|
_PINS.pop().close()
|
||||||
GPIO.cleanup()
|
# Any cleanup routines registered by pins libraries must be called *after*
|
||||||
|
# cleanup of pin objects used by devices
|
||||||
|
for routine in PINS_CLEANUP:
|
||||||
|
routine()
|
||||||
|
|
||||||
atexit.register(_gpio_threads_shutdown)
|
atexit.register(_shutdown)
|
||||||
GPIO.setmode(GPIO.BCM)
|
|
||||||
GPIO.setwarnings(False)
|
|
||||||
|
|
||||||
|
|
||||||
class GPIOMeta(type):
|
class GPIOMeta(type):
|
||||||
@@ -196,20 +212,22 @@ class GPIODevice(ValuesMixin, GPIOBase):
|
|||||||
# value of pin until we've verified that it isn't already allocated
|
# value of pin until we've verified that it isn't already allocated
|
||||||
self._pin = None
|
self._pin = None
|
||||||
if pin is None:
|
if pin is None:
|
||||||
raise GPIODeviceError('No GPIO pin number given')
|
raise GPIODeviceError('No pin given')
|
||||||
with _GPIO_PINS_LOCK:
|
if isinstance(pin, int):
|
||||||
if pin in _GPIO_PINS:
|
pin = DefaultPin(pin)
|
||||||
|
with _PINS_LOCK:
|
||||||
|
if pin in _PINS:
|
||||||
raise GPIODeviceError(
|
raise GPIODeviceError(
|
||||||
'pin %d is already in use by another gpiozero object' % pin
|
'pin %r is already in use by another gpiozero object' % pin
|
||||||
)
|
)
|
||||||
_GPIO_PINS.add(pin)
|
_PINS.add(pin)
|
||||||
self._pin = pin
|
self._pin = pin
|
||||||
self._active_state = GPIO.HIGH
|
self._active_state = True
|
||||||
self._inactive_state = GPIO.LOW
|
self._inactive_state = False
|
||||||
|
|
||||||
def _read(self):
|
def _read(self):
|
||||||
try:
|
try:
|
||||||
return GPIO.input(self.pin) == self._active_state
|
return self.pin.state == self._active_state
|
||||||
except TypeError:
|
except TypeError:
|
||||||
self._check_open()
|
self._check_open()
|
||||||
raise
|
raise
|
||||||
@@ -260,13 +278,12 @@ class GPIODevice(ValuesMixin, GPIOBase):
|
|||||||
...
|
...
|
||||||
"""
|
"""
|
||||||
super(GPIODevice, self).close()
|
super(GPIODevice, self).close()
|
||||||
with _GPIO_PINS_LOCK:
|
with _PINS_LOCK:
|
||||||
pin = self._pin
|
pin = self._pin
|
||||||
self._pin = None
|
self._pin = None
|
||||||
if pin in _GPIO_PINS:
|
if pin in _PINS:
|
||||||
_GPIO_PINS.remove(pin)
|
_PINS.remove(pin)
|
||||||
GPIO.remove_event_detect(pin)
|
pin.close()
|
||||||
GPIO.cleanup(pin)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def closed(self):
|
def closed(self):
|
||||||
@@ -275,9 +292,10 @@ class GPIODevice(ValuesMixin, GPIOBase):
|
|||||||
@property
|
@property
|
||||||
def pin(self):
|
def pin(self):
|
||||||
"""
|
"""
|
||||||
The pin (in BCM numbering) that the device is connected to. This will
|
The :class:`Pin` that the device is connected to. This will be ``None``
|
||||||
be ``None`` if the device has been closed (see the :meth:`close`
|
if the device has been closed (see the :meth:`close` method). When
|
||||||
method).
|
dealing with GPIO pins, query ``pin.number`` to discover the GPIO
|
||||||
|
pin (in BCM numbering) that the device is connected to.
|
||||||
"""
|
"""
|
||||||
return self._pin
|
return self._pin
|
||||||
|
|
||||||
@@ -293,7 +311,7 @@ class GPIODevice(ValuesMixin, GPIOBase):
|
|||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
try:
|
try:
|
||||||
return "<gpiozero.%s object on pin=%d, is_active=%s>" % (
|
return "<gpiozero.%s object on pin %r, is_active=%s>" % (
|
||||||
self.__class__.__name__, self.pin, self.is_active)
|
self.__class__.__name__, self.pin, self.is_active)
|
||||||
except GPIODeviceClosed:
|
except GPIODeviceClosed:
|
||||||
return "<gpiozero.%s object closed>" % self.__class__.__name__
|
return "<gpiozero.%s object closed>" % self.__class__.__name__
|
||||||
@@ -307,7 +325,7 @@ class GPIOThread(Thread):
|
|||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
self.stopping.clear()
|
self.stopping.clear()
|
||||||
_GPIO_THREADS.add(self)
|
_THREADS.add(self)
|
||||||
super(GPIOThread, self).start()
|
super(GPIOThread, self).start()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
@@ -316,7 +334,7 @@ class GPIOThread(Thread):
|
|||||||
|
|
||||||
def join(self):
|
def join(self):
|
||||||
super(GPIOThread, self).join()
|
super(GPIOThread, self).join()
|
||||||
_GPIO_THREADS.discard(self)
|
_THREADS.discard(self)
|
||||||
|
|
||||||
|
|
||||||
class GPIOQueue(GPIOThread):
|
class GPIOQueue(GPIOThread):
|
||||||
|
|||||||
@@ -13,7 +13,6 @@ from functools import wraps
|
|||||||
from time import sleep, time
|
from time import sleep, time
|
||||||
from threading import Event
|
from threading import Event
|
||||||
|
|
||||||
from RPi import GPIO
|
|
||||||
from spidev import SpiDev
|
from spidev import SpiDev
|
||||||
|
|
||||||
from .exc import InputDeviceError, GPIODeviceError, GPIODeviceClosed
|
from .exc import InputDeviceError, GPIODeviceError, GPIODeviceClosed
|
||||||
@@ -39,39 +38,18 @@ class InputDevice(GPIODevice):
|
|||||||
``False`` (the default), the pin will be pulled low.
|
``False`` (the default), the pin will be pulled low.
|
||||||
"""
|
"""
|
||||||
def __init__(self, pin=None, pull_up=False):
|
def __init__(self, pin=None, pull_up=False):
|
||||||
if pin in (2, 3) and not pull_up:
|
|
||||||
raise InputDeviceError(
|
|
||||||
'GPIO pins 2 and 3 are fitted with physical pull up '
|
|
||||||
'resistors; you cannot initialize them with pull_up=False'
|
|
||||||
)
|
|
||||||
# _pull_up should be assigned first as __repr__ relies upon it to
|
|
||||||
# support the case where __repr__ is called during debugging of an
|
|
||||||
# instance that has failed to initialize (due to an exception in the
|
|
||||||
# super-class __init__)
|
|
||||||
self._pull_up = pull_up
|
|
||||||
super(InputDevice, self).__init__(pin)
|
super(InputDevice, self).__init__(pin)
|
||||||
self._active_edge = GPIO.FALLING if pull_up else GPIO.RISING
|
|
||||||
self._inactive_edge = GPIO.RISING if pull_up else GPIO.FALLING
|
|
||||||
self._active_state = GPIO.LOW if pull_up else GPIO.HIGH
|
|
||||||
self._inactive_state = GPIO.HIGH if pull_up else GPIO.LOW
|
|
||||||
pull = GPIO.PUD_UP if pull_up else GPIO.PUD_DOWN
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# NOTE: catch_warnings isn't thread-safe but hopefully no-one's
|
if self.pin.function != 'input':
|
||||||
# messing around with GPIO init within background threads...
|
self.pin.function = 'input'
|
||||||
with warnings.catch_warnings(record=True) as w:
|
pull = 'up' if pull_up else 'down'
|
||||||
GPIO.setup(pin, GPIO.IN, pull)
|
if self.pin.pull != pull:
|
||||||
# The only warning we want to squash is a RuntimeWarning that is
|
self.pin.pull = pull
|
||||||
# thrown when setting pins 2 or 3. Anything else should be replayed
|
|
||||||
for warning in w:
|
|
||||||
if warning.category != RuntimeWarning or pin not in (2, 3):
|
|
||||||
warnings.showwarning(
|
|
||||||
warning.message, warning.category, warning.filename,
|
|
||||||
warning.lineno, warning.file, warning.line
|
|
||||||
)
|
|
||||||
except:
|
except:
|
||||||
self.close()
|
self.close()
|
||||||
raise
|
raise
|
||||||
|
self._active_state = False if pull_up else True
|
||||||
|
self._inactive_state = True if pull_up else False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def pull_up(self):
|
def pull_up(self):
|
||||||
@@ -79,11 +57,11 @@ class InputDevice(GPIODevice):
|
|||||||
If ``True``, the device uses a pull-up resistor to set the GPIO pin
|
If ``True``, the device uses a pull-up resistor to set the GPIO pin
|
||||||
"high" by default. Defaults to ``False``.
|
"high" by default. Defaults to ``False``.
|
||||||
"""
|
"""
|
||||||
return self._pull_up
|
return self.pin.pull == 'up'
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
try:
|
try:
|
||||||
return "<gpiozero.%s object on pin=%d, pull_up=%s, is_active=%s>" % (
|
return "<gpiozero.%s object on pin %r, pull_up=%s, is_active=%s>" % (
|
||||||
self.__class__.__name__, self.pin, self.pull_up, self.is_active)
|
self.__class__.__name__, self.pin, self.pull_up, self.is_active)
|
||||||
except:
|
except:
|
||||||
return super(InputDevice, self).__repr__()
|
return super(InputDevice, self).__repr__()
|
||||||
@@ -245,20 +223,15 @@ class DigitalInputDevice(WaitableInputDevice):
|
|||||||
def __init__(self, pin=None, pull_up=False, bounce_time=None):
|
def __init__(self, pin=None, pull_up=False, bounce_time=None):
|
||||||
super(DigitalInputDevice, self).__init__(pin, pull_up)
|
super(DigitalInputDevice, self).__init__(pin, pull_up)
|
||||||
try:
|
try:
|
||||||
# Yes, that's really the default bouncetime in RPi.GPIO...
|
self.pin.bounce = bounce_time
|
||||||
GPIO.add_event_detect(
|
self.pin.edges = 'both'
|
||||||
self.pin, GPIO.BOTH, callback=self._fire_events,
|
self.pin.when_changed = self._fire_events
|
||||||
bouncetime=-666 if bounce_time is None else int(bounce_time * 1000)
|
|
||||||
)
|
|
||||||
# Call _fire_events once to set initial state of events
|
# Call _fire_events once to set initial state of events
|
||||||
super(DigitalInputDevice, self)._fire_events()
|
self._fire_events()
|
||||||
except:
|
except:
|
||||||
self.close()
|
self.close()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def _fire_events(self, channel):
|
|
||||||
super(DigitalInputDevice, self)._fire_events()
|
|
||||||
|
|
||||||
|
|
||||||
class SmoothedInputDevice(WaitableInputDevice):
|
class SmoothedInputDevice(WaitableInputDevice):
|
||||||
"""
|
"""
|
||||||
@@ -565,9 +538,9 @@ class LightSensor(SmoothedInputDevice):
|
|||||||
try:
|
try:
|
||||||
self._charge_time_limit = charge_time_limit
|
self._charge_time_limit = charge_time_limit
|
||||||
self._charged = Event()
|
self._charged = Event()
|
||||||
GPIO.add_event_detect(
|
self.pin.edges = 'rising'
|
||||||
self.pin, GPIO.RISING, lambda channel: self._charged.set()
|
self.pin.bounce = None
|
||||||
)
|
self.pin.when_changed = self._charged.set
|
||||||
self._queue.start()
|
self._queue.start()
|
||||||
except:
|
except:
|
||||||
self.close()
|
self.close()
|
||||||
@@ -579,13 +552,13 @@ class LightSensor(SmoothedInputDevice):
|
|||||||
|
|
||||||
def _read(self):
|
def _read(self):
|
||||||
# Drain charge from the capacitor
|
# Drain charge from the capacitor
|
||||||
GPIO.setup(self.pin, GPIO.OUT)
|
self.pin.function = 'output'
|
||||||
GPIO.output(self.pin, GPIO.LOW)
|
self.pin.state = False
|
||||||
sleep(0.1)
|
sleep(0.1)
|
||||||
# Time the charging of the capacitor
|
# Time the charging of the capacitor
|
||||||
start = time()
|
start = time()
|
||||||
self._charged.clear()
|
self._charged.clear()
|
||||||
GPIO.setup(self.pin, GPIO.IN)
|
self.pin.function = 'input'
|
||||||
self._charged.wait(self.charge_time_limit)
|
self._charged.wait(self.charge_time_limit)
|
||||||
return (
|
return (
|
||||||
1.0 - min(self.charge_time_limit, time() - start) /
|
1.0 - min(self.charge_time_limit, time() - start) /
|
||||||
|
|||||||
@@ -10,8 +10,6 @@ from time import sleep
|
|||||||
from threading import Lock
|
from threading import Lock
|
||||||
from itertools import repeat, cycle, chain
|
from itertools import repeat, cycle, chain
|
||||||
|
|
||||||
from RPi import GPIO
|
|
||||||
|
|
||||||
from .exc import OutputDeviceError, GPIODeviceError, GPIODeviceClosed
|
from .exc import OutputDeviceError, GPIODeviceError, GPIODeviceClosed
|
||||||
from .devices import GPIODevice, GPIOThread, CompositeDevice, SourceMixin
|
from .devices import GPIODevice, GPIOThread, CompositeDevice, SourceMixin
|
||||||
|
|
||||||
@@ -42,36 +40,20 @@ class OutputDevice(SourceMixin, GPIODevice):
|
|||||||
def __init__(self, pin=None, active_high=True, initial_value=False):
|
def __init__(self, pin=None, active_high=True, initial_value=False):
|
||||||
self._active_high = active_high
|
self._active_high = active_high
|
||||||
super(OutputDevice, self).__init__(pin)
|
super(OutputDevice, self).__init__(pin)
|
||||||
self._active_state = GPIO.HIGH if active_high else GPIO.LOW
|
self._active_state = True if active_high else False
|
||||||
self._inactive_state = GPIO.LOW if active_high else GPIO.HIGH
|
self._inactive_state = False if active_high else True
|
||||||
try:
|
if initial_value is None:
|
||||||
# NOTE: catch_warnings isn't thread-safe but hopefully no-one's
|
self.pin.function = 'output'
|
||||||
# messing around with GPIO init within background threads...
|
elif initial_value:
|
||||||
with warnings.catch_warnings(record=True) as w:
|
self.pin.output_with_state(self._active_state)
|
||||||
# This is horrid, but we can't specify initial=None with setup
|
else:
|
||||||
if initial_value is None:
|
self.pin.output_with_state(self._inactive_state)
|
||||||
GPIO.setup(pin, GPIO.OUT)
|
|
||||||
else:
|
|
||||||
GPIO.setup(pin, GPIO.OUT, initial=
|
|
||||||
[self._inactive_state, self._active_state][bool(initial_value)])
|
|
||||||
GPIO.setup(pin, GPIO.OUT)
|
|
||||||
# The only warning we want to squash is a RuntimeWarning that is
|
|
||||||
# thrown when setting pins 2 or 3. Anything else should be replayed
|
|
||||||
for warning in w:
|
|
||||||
if warning.category != RuntimeWarning or pin not in (2, 3):
|
|
||||||
warnings.showwarning(
|
|
||||||
warning.message, warning.category, warning.filename,
|
|
||||||
warning.lineno, warning.file, warning.line
|
|
||||||
)
|
|
||||||
except:
|
|
||||||
self.close()
|
|
||||||
raise
|
|
||||||
|
|
||||||
def _write(self, value):
|
def _write(self, value):
|
||||||
if not self.active_high:
|
if not self.active_high:
|
||||||
value = not value
|
value = not value
|
||||||
try:
|
try:
|
||||||
GPIO.output(self.pin, bool(value))
|
self.pin.state = bool(value)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
self._check_open()
|
self._check_open()
|
||||||
raise
|
raise
|
||||||
@@ -102,7 +84,7 @@ class OutputDevice(SourceMixin, GPIODevice):
|
|||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
try:
|
try:
|
||||||
return '<gpiozero.%s object on pin=%d, active_high=%s, is_active=%s>' % (
|
return '<gpiozero.%s object on pin %r, active_high=%s, is_active=%s>' % (
|
||||||
self.__class__.__name__, self.pin, self.active_high, self.is_active)
|
self.__class__.__name__, self.pin, self.active_high, self.is_active)
|
||||||
except:
|
except:
|
||||||
return super(OutputDevice, self).__repr__()
|
return super(OutputDevice, self).__repr__()
|
||||||
@@ -285,40 +267,33 @@ class PWMOutputDevice(OutputDevice):
|
|||||||
to 100Hz.
|
to 100Hz.
|
||||||
"""
|
"""
|
||||||
def __init__(self, pin=None, active_high=True, initial_value=0, frequency=100):
|
def __init__(self, pin=None, active_high=True, initial_value=0, frequency=100):
|
||||||
self._pwm = None
|
|
||||||
self._blink_thread = None
|
self._blink_thread = None
|
||||||
if not 0 <= initial_value <= 1:
|
if not 0 <= initial_value <= 1:
|
||||||
raise OutputDeviceError("initial_value must be between 0 and 1")
|
raise OutputDeviceError("initial_value must be between 0 and 1")
|
||||||
super(PWMOutputDevice, self).__init__(pin, active_high)
|
super(PWMOutputDevice, self).__init__(pin, active_high)
|
||||||
try:
|
try:
|
||||||
self._pwm = GPIO.PWM(self.pin, frequency)
|
# XXX need a way of setting these together
|
||||||
self._value = initial_value
|
self.pin.frequency = frequency
|
||||||
if not active_high:
|
self.value = initial_value
|
||||||
initial_value = 1 - initial_value
|
|
||||||
self._pwm.start(100 * initial_value)
|
|
||||||
self._frequency = frequency
|
|
||||||
except:
|
except:
|
||||||
self.close()
|
self.close()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self._stop_blink()
|
self._stop_blink()
|
||||||
if self._pwm:
|
try:
|
||||||
# Ensure we wipe out the PWM object so that re-runs don't attempt
|
self.pin.frequency = None
|
||||||
# to re-stop the PWM thread (otherwise, the fact that close is
|
except AttributeError:
|
||||||
# called from __del__ can easily result in us stopping the PWM
|
# If the pin's already None, ignore the exception
|
||||||
# on *another* instance on the same pin)
|
pass
|
||||||
p = self._pwm
|
|
||||||
self._pwm = None
|
|
||||||
p.stop()
|
|
||||||
super(PWMOutputDevice, self).close()
|
super(PWMOutputDevice, self).close()
|
||||||
|
|
||||||
def _read(self):
|
def _read(self):
|
||||||
self._check_open()
|
self._check_open()
|
||||||
if self.active_high:
|
if self.active_high:
|
||||||
return self._value
|
return self.pin.state
|
||||||
else:
|
else:
|
||||||
return 1 - self._value
|
return 1 - self.pin.state
|
||||||
|
|
||||||
def _write(self, value):
|
def _write(self, value):
|
||||||
if not self.active_high:
|
if not self.active_high:
|
||||||
@@ -326,11 +301,10 @@ class PWMOutputDevice(OutputDevice):
|
|||||||
if not 0 <= value <= 1:
|
if not 0 <= value <= 1:
|
||||||
raise OutputDeviceError("PWM value must be between 0 and 1")
|
raise OutputDeviceError("PWM value must be between 0 and 1")
|
||||||
try:
|
try:
|
||||||
self._pwm.ChangeDutyCycle(value * 100)
|
self.pin.state = value
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
self._check_open()
|
self._check_open()
|
||||||
raise
|
raise
|
||||||
self._value = value
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def value(self):
|
def value(self):
|
||||||
@@ -361,7 +335,7 @@ class PWMOutputDevice(OutputDevice):
|
|||||||
toggle it to 0.9, and so on.
|
toggle it to 0.9, and so on.
|
||||||
"""
|
"""
|
||||||
self._stop_blink()
|
self._stop_blink()
|
||||||
self.value = 1.0 - self.value
|
self.value = 1 - self.value
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_active(self):
|
def is_active(self):
|
||||||
@@ -377,12 +351,11 @@ class PWMOutputDevice(OutputDevice):
|
|||||||
The frequency of the pulses used with the PWM device, in Hz. The
|
The frequency of the pulses used with the PWM device, in Hz. The
|
||||||
default is 100Hz.
|
default is 100Hz.
|
||||||
"""
|
"""
|
||||||
return self._frequency
|
return self.pin.frequency
|
||||||
|
|
||||||
@frequency.setter
|
@frequency.setter
|
||||||
def frequency(self, value):
|
def frequency(self, value):
|
||||||
self._pwm.ChangeFrequency(value)
|
self.pin.frequency = value
|
||||||
self._frequency = value
|
|
||||||
|
|
||||||
def blink(
|
def blink(
|
||||||
self, on_time=1, off_time=1, fade_in_time=0, fade_out_time=0,
|
self, on_time=1, off_time=1, fade_in_time=0, fade_out_time=0,
|
||||||
|
|||||||
241
gpiozero/pins/__init__.py
Normal file
241
gpiozero/pins/__init__.py
Normal file
@@ -0,0 +1,241 @@
|
|||||||
|
from __future__ import (
|
||||||
|
unicode_literals,
|
||||||
|
absolute_import,
|
||||||
|
print_function,
|
||||||
|
division,
|
||||||
|
)
|
||||||
|
str = type('')
|
||||||
|
|
||||||
|
from .exc import (
|
||||||
|
PinFixedFunction,
|
||||||
|
PinSetInput,
|
||||||
|
PinFixedPull,
|
||||||
|
PinPWMUnsupported,
|
||||||
|
PinEdgeDetectUnsupported,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
PINS_CLEANUP = []
|
||||||
|
|
||||||
|
|
||||||
|
class Pin(object):
|
||||||
|
"""
|
||||||
|
Abstract base class representing a GPIO pin or a pin from an IO extender.
|
||||||
|
|
||||||
|
Descendents should override property getters and setters to accurately
|
||||||
|
represent the capabilities of pins. The following functions *must* be
|
||||||
|
overridden:
|
||||||
|
|
||||||
|
* :meth:`_get_function`
|
||||||
|
* :meth:`_get_state`
|
||||||
|
|
||||||
|
The following functions *may* be overridden if applicable:
|
||||||
|
|
||||||
|
* :meth:`close`
|
||||||
|
* :meth:`_set_function`
|
||||||
|
* :meth:`_set_state`
|
||||||
|
* :meth:`_get_frequency`
|
||||||
|
* :meth:`_set_frequency`
|
||||||
|
* :meth:`_get_pull`
|
||||||
|
* :meth:`_set_pull`
|
||||||
|
* :meth:`_get_bounce`
|
||||||
|
* :meth:`_set_bounce`
|
||||||
|
* :meth:`_get_edges`
|
||||||
|
* :meth:`_set_edges`
|
||||||
|
* :meth:`_get_when_changed`
|
||||||
|
* :meth:`_set_when_changed`
|
||||||
|
* :meth:`output_with_state`
|
||||||
|
* :meth:`input_with_pull`
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
|
||||||
|
Descendents must ensure that pin instances representing the same
|
||||||
|
physical hardware are identical, right down to object identity. The
|
||||||
|
framework relies on this to correctly clean up resources at interpreter
|
||||||
|
shutdown.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "Abstract pin"
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""
|
||||||
|
Cleans up the resources allocated to the pin. After this method is
|
||||||
|
called, this :class:`Pin` instance may no longer be used to query or
|
||||||
|
control the pin's state.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
def output_with_state(self, state):
|
||||||
|
"""
|
||||||
|
Sets the pin's function to "output" and specifies an initial state
|
||||||
|
for the pin. By default this is equivalent to performing::
|
||||||
|
|
||||||
|
pin.function = 'output'
|
||||||
|
pin.state = state
|
||||||
|
|
||||||
|
However, descendents may override this in order to provide the smallest
|
||||||
|
possible delay between configuring the pin for output and specifying an
|
||||||
|
initial value (which can be important for avoiding "blips" in
|
||||||
|
active-low configurations).
|
||||||
|
"""
|
||||||
|
self.function = 'output'
|
||||||
|
self.state = state
|
||||||
|
|
||||||
|
def input_with_pull(self, pull):
|
||||||
|
"""
|
||||||
|
Sets the pin's function to "input" and specifies an initial pull-up
|
||||||
|
for the pin. By default this is equivalent to performing::
|
||||||
|
|
||||||
|
pin.function = 'input'
|
||||||
|
pin.pull = pull
|
||||||
|
|
||||||
|
However, descendents may override this order to provide the smallest
|
||||||
|
possible delay between configuring the pin for input and pulling the
|
||||||
|
pin up/down (which can be important for avoiding "blips" in some
|
||||||
|
configurations).
|
||||||
|
"""
|
||||||
|
self.function = 'input'
|
||||||
|
self.pull = pull
|
||||||
|
|
||||||
|
def _get_function(self):
|
||||||
|
return "input"
|
||||||
|
|
||||||
|
def _set_function(self, value):
|
||||||
|
raise PinFixedFunction("Cannot set the function of pin %r" % self)
|
||||||
|
|
||||||
|
function = property(
|
||||||
|
lambda self: self._get_function(),
|
||||||
|
lambda self, value: self._set_function(value),
|
||||||
|
doc="""\
|
||||||
|
The function of the pin. This property is a string indicating the
|
||||||
|
current function or purpose of the pin. Typically this is the string
|
||||||
|
"input" or "output". However, in some circumstances it can be other
|
||||||
|
strings indicating non-GPIO related functionality.
|
||||||
|
|
||||||
|
With certain pin types (e.g. GPIO pins), this attribute can be changed
|
||||||
|
to configure the function of a pin. If an invalid function is
|
||||||
|
specified, for this attribute, :exc:`PinInvalidFunction` will be
|
||||||
|
raised. If this pin is fixed function and an attempt is made to set
|
||||||
|
this attribute, :exc:`PinFixedFunction` will be raised.
|
||||||
|
""")
|
||||||
|
|
||||||
|
def _get_state(self):
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def _set_state(self, value):
|
||||||
|
raise PinSetInput("Cannot set the state of input pin %r" % self)
|
||||||
|
|
||||||
|
state = property(
|
||||||
|
lambda self: self._get_state(),
|
||||||
|
lambda self, value: self._set_state(value),
|
||||||
|
doc="""\
|
||||||
|
The state of the pin. This is 0 for low, and 1 for high. As a low level
|
||||||
|
view of the pin, no swapping is performed in the case of pull ups (see
|
||||||
|
:attr:`pull` for more information).
|
||||||
|
|
||||||
|
If PWM is currently active (when :attr:`frequency` is not ``None``),
|
||||||
|
this represents the PWM duty cycle as a value between 0.0 and 1.0.
|
||||||
|
|
||||||
|
If a pin is currently configured for input, and an attempt is made to
|
||||||
|
set this attribute, :exc:`PinSetInput` will be raised. If an invalid
|
||||||
|
value is specified for this attribute, :exc:`PinInvalidState` will be
|
||||||
|
raised.
|
||||||
|
""")
|
||||||
|
|
||||||
|
def _get_pull(self):
|
||||||
|
return 'floating'
|
||||||
|
|
||||||
|
def _set_pull(self, value):
|
||||||
|
raise PinFixedPull("Cannot change pull-up on pin %r" % self)
|
||||||
|
|
||||||
|
pull = property(
|
||||||
|
lambda self: self._get_pull(),
|
||||||
|
lambda self, value: self._set_pull(value),
|
||||||
|
doc="""\
|
||||||
|
The pull-up state of the pin represented as a string. This is typically
|
||||||
|
one of the strings "up", "down", or "floating" but additional values
|
||||||
|
may be supported by the underlying hardware.
|
||||||
|
|
||||||
|
If the pin does not support changing pull-up state (for example because
|
||||||
|
of a fixed pull-up resistor), attempts to set this property will raise
|
||||||
|
:exc:`PinFixedPull`. If the specified value is not supported by the
|
||||||
|
underlying hardware, :exc:`PinInvalidPull` is raised.
|
||||||
|
""")
|
||||||
|
|
||||||
|
def _get_frequency(self):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _set_frequency(self, value):
|
||||||
|
if value is not None:
|
||||||
|
raise PinPWMUnsupported("PWM is not supported on pin %r" % self)
|
||||||
|
|
||||||
|
frequency = property(
|
||||||
|
lambda self: self._get_frequency(),
|
||||||
|
lambda self, value: self._set_frequency(value),
|
||||||
|
doc="""\
|
||||||
|
The frequency (in Hz) for the pin's PWM implementation, or ``None`` if
|
||||||
|
PWM is not currently in use. This value always defaults to ``None`` and
|
||||||
|
may be changed with certain pin types to activate or deactivate PWM.
|
||||||
|
|
||||||
|
If the pin does not support PWM, :exc:`PinPWMUnsupported` will be
|
||||||
|
raised when attempting to set this to a value other than ``None``.
|
||||||
|
""")
|
||||||
|
|
||||||
|
def _get_bounce(self):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _set_bounce(self, value):
|
||||||
|
if value is not None:
|
||||||
|
raise PinEdgeDetectUnsupported("Edge detection is not supported on pin %r" % self)
|
||||||
|
|
||||||
|
bounce = property(
|
||||||
|
lambda self: self._get_bounce(),
|
||||||
|
lambda self, value: self._set_bounce(value),
|
||||||
|
doc="""\
|
||||||
|
The amount of bounce detection (elimination) currently in use by edge
|
||||||
|
detection, measured in seconds. If bounce detection is not currently in
|
||||||
|
use, this is ``None``.
|
||||||
|
|
||||||
|
If the pin does not support edge detection, attempts to set this
|
||||||
|
property will raise :exc:`PinEdgeDetectUnsupported`. If the pin
|
||||||
|
supports edge detection, the class must implement bounce detection,
|
||||||
|
even if only in software.
|
||||||
|
""")
|
||||||
|
|
||||||
|
def _get_edges(self):
|
||||||
|
return 'both'
|
||||||
|
|
||||||
|
def _set_edges(self, value):
|
||||||
|
raise PinEdgeDetectUnsupported("Edge detection is not supported on pin %r" % self)
|
||||||
|
|
||||||
|
edges = property(
|
||||||
|
lambda self: self._get_edges(),
|
||||||
|
lambda self, value: self._set_edges(value),
|
||||||
|
doc="""\
|
||||||
|
The edge that will trigger execution of the function or bound method
|
||||||
|
assigned to :attr:`when_changed`. This can be one of the strings
|
||||||
|
"both" (the default), "rising", or "falling".
|
||||||
|
|
||||||
|
If the pin does not support edge detection, attempts to set this
|
||||||
|
property will raise :exc:`PinEdgeDetectUnsupported`.
|
||||||
|
""")
|
||||||
|
|
||||||
|
def _get_when_changed(self):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _set_when_changed(self, value):
|
||||||
|
raise PinEdgeDetectUnsupported("Edge detection is not supported on pin %r" % self)
|
||||||
|
|
||||||
|
when_changed = property(
|
||||||
|
lambda self: self._get_when_changed(),
|
||||||
|
lambda self, value: self._set_when_changed(value),
|
||||||
|
doc="""\
|
||||||
|
A function or bound method to be called when the pin's state changes
|
||||||
|
(more specifically when the edge specified by :attr:`edges` is detected
|
||||||
|
on the pin). The function or bound method must take no parameters.
|
||||||
|
|
||||||
|
If the pin does not support edge detection, attempts to set this
|
||||||
|
property will raise :exc:`PinEdgeDetectUnsupported`.
|
||||||
|
""")
|
||||||
|
|
||||||
45
gpiozero/pins/exc.py
Normal file
45
gpiozero/pins/exc.py
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
from __future__ import (
|
||||||
|
unicode_literals,
|
||||||
|
absolute_import,
|
||||||
|
print_function,
|
||||||
|
division,
|
||||||
|
)
|
||||||
|
str = type('')
|
||||||
|
|
||||||
|
|
||||||
|
class PinError(Exception):
|
||||||
|
"Base class for errors related to pin implementations"
|
||||||
|
|
||||||
|
class PinFixedFunction(PinError, AttributeError):
|
||||||
|
"Error raised when attempting to change the function of a fixed type pin"
|
||||||
|
|
||||||
|
class PinInvalidFunction(PinError, ValueError):
|
||||||
|
"Error raised when attempting to change the function of a pin to an invalid value"
|
||||||
|
|
||||||
|
class PinInvalidState(PinError, ValueError):
|
||||||
|
"Error raised when attempting to assign an invalid state to a pin"
|
||||||
|
|
||||||
|
class PinInvalidPull(PinError, ValueError):
|
||||||
|
"Error raised when attempting to assign an invalid pull-up to a pin"
|
||||||
|
|
||||||
|
class PinInvalidEdges(PinError, ValueError):
|
||||||
|
"Error raised when attempting to assign an invalid edge detection to a pin"
|
||||||
|
|
||||||
|
class PinSetInput(PinError, AttributeError):
|
||||||
|
"Error raised when attempting to set a read-only pin"
|
||||||
|
|
||||||
|
class PinFixedPull(PinError, AttributeError):
|
||||||
|
"Error raised when attempting to set the pull of a pin with fixed pull-up"
|
||||||
|
|
||||||
|
class PinEdgeDetectUnsupported(PinError, AttributeError):
|
||||||
|
"Error raised when attempting to use edge detection on unsupported pins"
|
||||||
|
|
||||||
|
class PinPWMError(PinError):
|
||||||
|
"Base class for errors related to PWM implementations"
|
||||||
|
|
||||||
|
class PinPWMUnsupported(PinPWMError, AttributeError):
|
||||||
|
"Error raised when attempting to activate PWM on unsupported pins"
|
||||||
|
|
||||||
|
class PinPWMFixedValue(PinPWMError, AttributeError):
|
||||||
|
"Error raised when attempting to initialize PWM on an input pin"
|
||||||
|
|
||||||
336
gpiozero/pins/native.py
Normal file
336
gpiozero/pins/native.py
Normal file
@@ -0,0 +1,336 @@
|
|||||||
|
from __future__ import (
|
||||||
|
unicode_literals,
|
||||||
|
absolute_import,
|
||||||
|
print_function,
|
||||||
|
division,
|
||||||
|
)
|
||||||
|
nstr = str
|
||||||
|
str = type('')
|
||||||
|
|
||||||
|
import io
|
||||||
|
import os
|
||||||
|
import mmap
|
||||||
|
import errno
|
||||||
|
import struct
|
||||||
|
from time import sleep
|
||||||
|
from threading import Thread, Event, Lock
|
||||||
|
from collections import Counter
|
||||||
|
|
||||||
|
from . import Pin, PINS_CLEANUP
|
||||||
|
from .exc import (
|
||||||
|
PinInvalidPull,
|
||||||
|
PinInvalidEdges,
|
||||||
|
PinInvalidFunction,
|
||||||
|
PinFixedPull,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class GPIOMemory(object):
|
||||||
|
|
||||||
|
GPIO_BASE_OFFSET = 0x200000
|
||||||
|
PERI_BASE_OFFSET = {
|
||||||
|
'BCM2708': 0x20000000,
|
||||||
|
'BCM2835': 0x20000000,
|
||||||
|
'BCM2709': 0x3f000000,
|
||||||
|
'BCM2836': 0x3f000000,
|
||||||
|
}
|
||||||
|
|
||||||
|
# From BCM2835 data-sheet, p.91
|
||||||
|
GPFSEL_OFFSET = 0x00 >> 2
|
||||||
|
GPSET_OFFSET = 0x1c >> 2
|
||||||
|
GPCLR_OFFSET = 0x28 >> 2
|
||||||
|
GPLEV_OFFSET = 0x34 >> 2
|
||||||
|
GPEDS_OFFSET = 0x40 >> 2
|
||||||
|
GPREN_OFFSET = 0x4c >> 2
|
||||||
|
GPFEN_OFFSET = 0x58 >> 2
|
||||||
|
GPHEN_OFFSET = 0x64 >> 2
|
||||||
|
GPLEN_OFFSET = 0x70 >> 2
|
||||||
|
GPAREN_OFFSET = 0x7c >> 2
|
||||||
|
GPAFEN_OFFSET = 0x88 >> 2
|
||||||
|
GPPUD_OFFSET = 0x94 >> 2
|
||||||
|
GPPUDCLK_OFFSET = 0x98 >> 2
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
try:
|
||||||
|
self.fd = os.open('/dev/gpiomem', os.O_RDWR | os.O_SYNC)
|
||||||
|
except OSError:
|
||||||
|
try:
|
||||||
|
self.fd = os.open('/dev/mem', os.O_RDWR | os.O_SYNC)
|
||||||
|
except OSError:
|
||||||
|
raise IOError(
|
||||||
|
'unable to open /dev/gpiomem or /dev/mem; '
|
||||||
|
'upgrade your kernel or run as root')
|
||||||
|
else:
|
||||||
|
offset = self.peripheral_base() + self.GPIO_BASE_OFFSET
|
||||||
|
else:
|
||||||
|
offset = 0
|
||||||
|
self.mem = mmap.mmap(self.fd, 4096, offset=offset)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.mem.close()
|
||||||
|
os.close(self.fd)
|
||||||
|
|
||||||
|
def peripheral_base(self):
|
||||||
|
try:
|
||||||
|
with io.open('/proc/device-tree/soc/ranges', 'rb') as f:
|
||||||
|
f.seek(4)
|
||||||
|
return struct.unpack(nstr('>L'), f.read(4))[0]
|
||||||
|
except IOError:
|
||||||
|
with io.open('/proc/cpuinfo', 'r') as f:
|
||||||
|
for line in f:
|
||||||
|
if line.startswith('Hardware'):
|
||||||
|
try:
|
||||||
|
return self.PERI_BASE_OFFSET[line.split(':')[1].strip()]
|
||||||
|
except KeyError:
|
||||||
|
raise IOError('unable to determine RPi revision')
|
||||||
|
raise IOError('unable to determine peripheral base')
|
||||||
|
|
||||||
|
def __getitem__(self, index):
|
||||||
|
return struct.unpack_from(nstr('<L'), self.mem, index * 4)[0]
|
||||||
|
|
||||||
|
def __setitem__(self, index, value):
|
||||||
|
struct.pack_into(nstr('<L'), self.mem, index * 4, value)
|
||||||
|
|
||||||
|
|
||||||
|
class GPIOFS(object):
|
||||||
|
|
||||||
|
GPIO_PATH = '/sys/class/gpio'
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._lock = Lock()
|
||||||
|
self._pin_refs = Counter()
|
||||||
|
|
||||||
|
def path(self, name):
|
||||||
|
return os.path.join(self.GPIO_PATH, name)
|
||||||
|
|
||||||
|
def export(self, pin):
|
||||||
|
with self._lock:
|
||||||
|
if self._pin_refs[pin] == 0:
|
||||||
|
# Set the count to 1 to indicate the GPIO is already exported
|
||||||
|
# (we'll correct this if we find it isn't, but this enables us
|
||||||
|
# to "leave the system in the state we found it")
|
||||||
|
self._pin_refs[pin] = 1
|
||||||
|
result = None
|
||||||
|
# Dirty hack to wait for udev to set permissions on
|
||||||
|
# gpioN/direction; there's no other way around this as there's
|
||||||
|
# no synchronous mechanism for setting permissions on sysfs
|
||||||
|
for i in range(10):
|
||||||
|
try:
|
||||||
|
result = io.open(self.path('gpio%d/value' % pin), 'w+b', buffering=0)
|
||||||
|
except IOError as e:
|
||||||
|
if e.errno == errno.ENOENT:
|
||||||
|
with io.open(self.path('export'), 'wb') as f:
|
||||||
|
f.write(str(pin).encode('ascii'))
|
||||||
|
# Pin wasn't exported, so correct the ref-count
|
||||||
|
self._pin_refs[pin] = 0
|
||||||
|
elif e.errno == errno.EACCES:
|
||||||
|
sleep(i / 100)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
if not result:
|
||||||
|
raise RuntimeError('failed to export pin %d' % pin)
|
||||||
|
else:
|
||||||
|
result = io.open(self.path('gpio%d/value' % pin), 'w+b', buffering=0)
|
||||||
|
self._pin_refs[pin] += 1
|
||||||
|
return result
|
||||||
|
|
||||||
|
def unexport(self, pin):
|
||||||
|
with self._lock:
|
||||||
|
self._pin_refs[pin] -= 1
|
||||||
|
if self._pin_refs[pin] == 0:
|
||||||
|
with io.open(self.path('unexport'), 'wb') as f:
|
||||||
|
f.write(str(pin).encode('ascii'))
|
||||||
|
|
||||||
|
|
||||||
|
class NativePin(Pin):
|
||||||
|
"""
|
||||||
|
Uses a built-in pure Python implementation to interface to the Pi's GPIO
|
||||||
|
pins. This is the default pin implementation if no third-party libraries
|
||||||
|
are discovered.
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
|
||||||
|
This implementation does *not* currently support PWM. Attempting to
|
||||||
|
use any class which requests PWM will raise an exception. This
|
||||||
|
implementation is also experimental; we make no guarantees it will
|
||||||
|
not eat your Pi for breakfast!
|
||||||
|
"""
|
||||||
|
|
||||||
|
_MEM = None
|
||||||
|
_PINS = {}
|
||||||
|
|
||||||
|
PULL_NAMES = {
|
||||||
|
0b00: 'floating',
|
||||||
|
0b01: 'down',
|
||||||
|
0b10: 'up',
|
||||||
|
0b11: 'reserved',
|
||||||
|
}
|
||||||
|
|
||||||
|
FUNCTION_NAMES = {
|
||||||
|
0b000: 'input',
|
||||||
|
0b001: 'output',
|
||||||
|
0b100: 'alt0',
|
||||||
|
0b101: 'alt1',
|
||||||
|
0b110: 'alt2',
|
||||||
|
0b111: 'alt3',
|
||||||
|
0b011: 'alt4',
|
||||||
|
0b010: 'alt5',
|
||||||
|
}
|
||||||
|
|
||||||
|
EDGE_NAMES = {
|
||||||
|
(True, True): 'both',
|
||||||
|
(True, False): 'rising',
|
||||||
|
(False, True): 'falling',
|
||||||
|
(False, False): 'none',
|
||||||
|
}
|
||||||
|
|
||||||
|
PULL_VALUES = {v: k for (k, v) in PULL_NAMES.items()}
|
||||||
|
FUNCTION_VALUES = {v: k for (k, v) in FUNCTION_NAMES.items()}
|
||||||
|
EDGE_VALUES = {v: k for (k, v) in EDGE_NAMES.items()}
|
||||||
|
|
||||||
|
def __new__(cls, number):
|
||||||
|
if not cls._PINS:
|
||||||
|
cls._MEM = GPIOMemory()
|
||||||
|
PINS_CLEANUP.append(cls._MEM.close)
|
||||||
|
if not (0 <= number < 54):
|
||||||
|
raise ValueError('invalid pin %d specified (must be 0..53)' % number)
|
||||||
|
try:
|
||||||
|
return cls._PINS[number]
|
||||||
|
except KeyError:
|
||||||
|
self = super(NativePin, cls).__new__(cls)
|
||||||
|
cls._PINS[number] = self
|
||||||
|
self._number = number
|
||||||
|
self._func_offset = self._MEM.GPFSEL_OFFSET + (number // 10)
|
||||||
|
self._func_shift = (number % 10) * 3
|
||||||
|
self._set_offset = self._MEM.GPSET_OFFSET + (number // 32)
|
||||||
|
self._set_shift = number % 32
|
||||||
|
self._clear_offset = self._MEM.GPCLR_OFFSET + (number // 32)
|
||||||
|
self._clear_shift = number % 32
|
||||||
|
self._level_offset = self._MEM.GPLEV_OFFSET + (number // 32)
|
||||||
|
self._level_shift = number % 32
|
||||||
|
self._pull_offset = self._MEM.GPPUDCLK_OFFSET + (number // 32)
|
||||||
|
self._pull_shift = number % 32
|
||||||
|
self._edge_offset = self._MEM.GPEDS_OFFSET + (number // 32)
|
||||||
|
self._edge_shift = number % 32
|
||||||
|
self._rising_offset = self._MEM.GPREN_OFFSET + (number // 32)
|
||||||
|
self._rising_shift = number % 32
|
||||||
|
self._falling_offset = self._MEM.GPFEN_OFFSET + (number // 32)
|
||||||
|
self._falling_shift = number % 32
|
||||||
|
self._when_changed = None
|
||||||
|
self._change_thread = None
|
||||||
|
self._change_event = Event()
|
||||||
|
self.function = 'input'
|
||||||
|
self.pull = 'up' if number in (2, 3) else 'floating'
|
||||||
|
self.bounce = None
|
||||||
|
self.edges = 'both'
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "GPIO%d" % self._number
|
||||||
|
|
||||||
|
@property
|
||||||
|
def number(self):
|
||||||
|
return self._number
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.when_changed = None
|
||||||
|
self.function = 'input'
|
||||||
|
|
||||||
|
def _get_function(self):
|
||||||
|
return self.FUNCTION_NAMES[(self._MEM[self._func_offset] >> self._func_shift) & 7]
|
||||||
|
|
||||||
|
def _set_function(self, value):
|
||||||
|
try:
|
||||||
|
value = self.FUNCTION_VALUES[value]
|
||||||
|
except KeyError:
|
||||||
|
raise PinInvalidFunction('invalid function "%s" for pin %r' % self)
|
||||||
|
self._MEM[self._func_offset] = (
|
||||||
|
self._MEM[self._func_offset]
|
||||||
|
& ~(7 << self._func_shift)
|
||||||
|
| (value << self._func_shift)
|
||||||
|
)
|
||||||
|
|
||||||
|
def _get_state(self):
|
||||||
|
return bool(self._MEM[self._level_offset] & (1 << self._level_shift))
|
||||||
|
|
||||||
|
def _set_state(self, value):
|
||||||
|
if value:
|
||||||
|
self._MEM[self._set_offset] = 1 << self._set_shift
|
||||||
|
else:
|
||||||
|
self._MEM[self._clear_offset] = 1 << self._clear_shift
|
||||||
|
|
||||||
|
def _get_pull(self):
|
||||||
|
return self.PULL_NAMES[self._pull]
|
||||||
|
|
||||||
|
def _set_pull(self, value):
|
||||||
|
if self.function != 'input':
|
||||||
|
raise PinFixedPull('cannot set pull on non-input pin %r' % self)
|
||||||
|
if value != 'up' and self._number in (2, 3):
|
||||||
|
raise PinFixedPull('%r has a physical pull-up resistor' % self)
|
||||||
|
try:
|
||||||
|
value = self.PULL_VALUES[value]
|
||||||
|
except KeyError:
|
||||||
|
raise PinInvalidPull('invalid pull direction "%s" for pin %r' % self)
|
||||||
|
self._MEM[self._MEM.GPPUD_OFFSET] = value
|
||||||
|
sleep(0.000000214)
|
||||||
|
self._MEM[self._pull_offset] = 1 << self._pull_shift
|
||||||
|
sleep(0.000000214)
|
||||||
|
self._MEM[self._MEM.GPPUD_OFFSET] = 0
|
||||||
|
self._MEM[self._pull_offset] = 0
|
||||||
|
self._pull = value
|
||||||
|
|
||||||
|
def _get_edges(self):
|
||||||
|
rising = bool(self._MEM[self._rising_offset] & (1 << self._rising_shift))
|
||||||
|
falling = bool(self._MEM[self._falling_offset] & (1 << self._falling_shift))
|
||||||
|
return self.EDGE_NAMES[(rising, falling)]
|
||||||
|
|
||||||
|
def _set_edges(self, value):
|
||||||
|
try:
|
||||||
|
rising, falling = self.EDGE_VALUES[value]
|
||||||
|
except KeyError:
|
||||||
|
raise PinInvalidEdges('invalid edge specification "%s" for pin %r' % self)
|
||||||
|
f = self.when_changed
|
||||||
|
self.when_changed = None
|
||||||
|
try:
|
||||||
|
self._MEM[self._rising_offset] = (
|
||||||
|
self._MEM[self._rising_offset]
|
||||||
|
& ~(1 << self._rising_shift)
|
||||||
|
| (rising << self._rising_shift)
|
||||||
|
)
|
||||||
|
self._MEM[self._falling_offset] = (
|
||||||
|
self._MEM[self._falling_offset]
|
||||||
|
& ~(1 << self._falling_shift)
|
||||||
|
| (falling << self._falling_shift)
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
self.when_changed = f
|
||||||
|
|
||||||
|
def _get_when_changed(self):
|
||||||
|
return self._when_changed
|
||||||
|
|
||||||
|
def _set_when_changed(self, value):
|
||||||
|
if self._when_changed is None and value is not None:
|
||||||
|
self._when_changed = value
|
||||||
|
self._change_thread = Thread(target=self._change_watch)
|
||||||
|
self._change_thread.daemon = True
|
||||||
|
self._change_event.clear()
|
||||||
|
self._change_thread.start()
|
||||||
|
elif self._when_changed is not None and value is None:
|
||||||
|
self._change_event.set()
|
||||||
|
self._change_thread.join()
|
||||||
|
self._change_thread = None
|
||||||
|
self._when_changed = None
|
||||||
|
else:
|
||||||
|
self._when_changed = value
|
||||||
|
|
||||||
|
def _change_watch(self):
|
||||||
|
offset = self._edge_offset
|
||||||
|
mask = 1 << self._edge_shift
|
||||||
|
self._MEM[offset] = mask # clear any existing detection bit
|
||||||
|
while not self._change_event.wait(0.001):
|
||||||
|
if self._MEM[offset] & mask:
|
||||||
|
self._MEM[offset] = mask
|
||||||
|
self._when_changed()
|
||||||
|
|
||||||
205
gpiozero/pins/rpigpio.py
Normal file
205
gpiozero/pins/rpigpio.py
Normal file
@@ -0,0 +1,205 @@
|
|||||||
|
from __future__ import (
|
||||||
|
unicode_literals,
|
||||||
|
absolute_import,
|
||||||
|
print_function,
|
||||||
|
division,
|
||||||
|
)
|
||||||
|
str = type('')
|
||||||
|
|
||||||
|
from RPi import GPIO
|
||||||
|
|
||||||
|
from . import Pin
|
||||||
|
from .exc import (
|
||||||
|
PinInvalidFunction,
|
||||||
|
PinSetInput,
|
||||||
|
PinFixedPull,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class RPiGPIOPin(Pin):
|
||||||
|
"""
|
||||||
|
Uses the `RPi.GPIO`_ library to interface to the Pi's GPIO pins. This is
|
||||||
|
the default pin implementation if the RPi.GPIO library is installed.
|
||||||
|
Supports all features including PWM (via software).
|
||||||
|
|
||||||
|
.. _RPi.GPIO: https://pypi.python.org/pypi/RPi.GPIO
|
||||||
|
"""
|
||||||
|
|
||||||
|
_PINS = {}
|
||||||
|
|
||||||
|
GPIO_FUNCTIONS = {
|
||||||
|
'input': GPIO.IN,
|
||||||
|
'output': GPIO.OUT,
|
||||||
|
'i2c': GPIO.I2C,
|
||||||
|
'spi': GPIO.SPI,
|
||||||
|
'pwm': GPIO.PWM,
|
||||||
|
'serial': GPIO.SERIAL,
|
||||||
|
'unknown': GPIO.UNKNOWN,
|
||||||
|
}
|
||||||
|
|
||||||
|
GPIO_PULL_UPS = {
|
||||||
|
'up': GPIO.PUD_UP,
|
||||||
|
'down': GPIO.PUD_DOWN,
|
||||||
|
'floating': GPIO.PUD_OFF,
|
||||||
|
}
|
||||||
|
|
||||||
|
GPIO_EDGES = {
|
||||||
|
'both': GPIO.BOTH,
|
||||||
|
'rising': GPIO.RISING,
|
||||||
|
'falling': GPIO.FALLING,
|
||||||
|
}
|
||||||
|
|
||||||
|
GPIO_FUNCTION_NAMES = {v: k for (k, v) in GPIO_FUNCTIONS.items()}
|
||||||
|
GPIO_PULL_UP_NAMES = {v: k for (k, v) in GPIO_PULL_UPS.items()}
|
||||||
|
GPIO_EDGES_NAMES = {v: k for (k, v) in GPIO_EDGES.items()}
|
||||||
|
|
||||||
|
def __new__(cls, number):
|
||||||
|
if not cls._PINS:
|
||||||
|
GPIO.setmode(GPIO.BCM)
|
||||||
|
GPIO.setwarnings(False)
|
||||||
|
try:
|
||||||
|
return cls._PINS[number]
|
||||||
|
except KeyError:
|
||||||
|
self = super(RPiGPIOPin, cls).__new__(cls)
|
||||||
|
cls._PINS[number] = self
|
||||||
|
self._number = number
|
||||||
|
self._pull = 'up' if number in (2, 3) else 'floating'
|
||||||
|
self._pwm = None
|
||||||
|
self._frequency = None
|
||||||
|
self._duty_cycle = None
|
||||||
|
self._bounce = -666
|
||||||
|
self._when_changed = None
|
||||||
|
self._edges = GPIO.BOTH
|
||||||
|
GPIO.setup(self._number, GPIO.IN, self.GPIO_PULL_UPS[self._pull])
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "GPIO%d" % self._number
|
||||||
|
|
||||||
|
@property
|
||||||
|
def number(self):
|
||||||
|
return self._number
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.frequency = None
|
||||||
|
self.when_changed = None
|
||||||
|
GPIO.cleanup(self._number)
|
||||||
|
|
||||||
|
def output_with_state(self, state):
|
||||||
|
self._pull = 'floating'
|
||||||
|
GPIO.setup(self._number, GPIO.OUT, initial=state)
|
||||||
|
|
||||||
|
def input_with_pull(self, pull):
|
||||||
|
if pull != 'up' and self._number in (2, 3):
|
||||||
|
raise PinFixedPull('%r has a physical pull-up resistor' % self)
|
||||||
|
try:
|
||||||
|
GPIO.setup(self._number, GPIO.IN, self.GPIO_PULL_UPS[pull])
|
||||||
|
self._pull = pull
|
||||||
|
except KeyError:
|
||||||
|
raise PinInvalidPull('invalid pull "%s" for pin %r' % (pull, self))
|
||||||
|
|
||||||
|
def _get_function(self):
|
||||||
|
return self.GPIO_FUNCTION_NAMES[GPIO.gpio_function(self._number)]
|
||||||
|
|
||||||
|
def _set_function(self, value):
|
||||||
|
if value != 'input':
|
||||||
|
self._pull = 'floating'
|
||||||
|
try:
|
||||||
|
GPIO.setup(self._number, self.GPIO_FUNCTIONS[value], self.GPIO_PULL_UPS[self._pull])
|
||||||
|
except KeyError:
|
||||||
|
raise PinInvalidFunction('invalid function "%s" for pin %r' % (value, self))
|
||||||
|
|
||||||
|
def _get_state(self):
|
||||||
|
if self._pwm:
|
||||||
|
return self._duty_cycle
|
||||||
|
else:
|
||||||
|
return GPIO.input(self._number)
|
||||||
|
|
||||||
|
def _set_state(self, value):
|
||||||
|
if self._pwm:
|
||||||
|
try:
|
||||||
|
self._pwm.ChangeDutyCycle(value * 100)
|
||||||
|
except ValueError:
|
||||||
|
raise PinInvalidValue('invalid state "%s" for pin %r' % (value, self))
|
||||||
|
self._duty_cycle = value
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
GPIO.output(self._number, value)
|
||||||
|
except ValueError:
|
||||||
|
raise PinInvalidState('invalid state "%s" for pin %r' % (value, self))
|
||||||
|
except RuntimeError:
|
||||||
|
raise PinSetInput('cannot set state of pin %r' % self)
|
||||||
|
|
||||||
|
def _get_pull(self):
|
||||||
|
return self._pull
|
||||||
|
|
||||||
|
def _set_pull(self, value):
|
||||||
|
if self.function != 'input':
|
||||||
|
raise PinFixedPull('cannot set pull on non-input pin %r' % self)
|
||||||
|
if value != 'up' and self._number in (2, 3):
|
||||||
|
raise PinFixedPull('%r has a physical pull-up resistor' % self)
|
||||||
|
try:
|
||||||
|
GPIO.setup(self._number, GPIO.IN, self.GPIO_PULL_UPS[value])
|
||||||
|
self._pull = value
|
||||||
|
except KeyError:
|
||||||
|
raise PinInvalidPull('invalid pull "%s" for pin %r' % (value, self))
|
||||||
|
|
||||||
|
def _get_frequency(self):
|
||||||
|
return self._frequency
|
||||||
|
|
||||||
|
def _set_frequency(self, value):
|
||||||
|
if self._frequency is None and value is not None:
|
||||||
|
try:
|
||||||
|
self._pwm = GPIO.PWM(self._number, value)
|
||||||
|
except RuntimeError:
|
||||||
|
raise PinPWMFixedValue('cannot start PWM on pin %r' % self)
|
||||||
|
self._pwm.start(0)
|
||||||
|
self._duty_cycle = 0
|
||||||
|
self._frequency = value
|
||||||
|
elif self._frequency is not None and value is not None:
|
||||||
|
self._pwm.ChangeFrequency(value)
|
||||||
|
self._frequency = value
|
||||||
|
elif self._frequency is not None and value is None:
|
||||||
|
self._pwm.stop()
|
||||||
|
self._pwm = None
|
||||||
|
self._duty_cycle = None
|
||||||
|
self._frequency = None
|
||||||
|
|
||||||
|
def _get_bounce(self):
|
||||||
|
return None if self._bounce == -666 else (self._bounce / 1000)
|
||||||
|
|
||||||
|
def _set_bounce(self, value):
|
||||||
|
f = self.when_changed
|
||||||
|
self.when_changed = None
|
||||||
|
try:
|
||||||
|
self._bounce = -666 if value is None else (value * 1000)
|
||||||
|
finally:
|
||||||
|
self.when_changed = f
|
||||||
|
|
||||||
|
def _get_edges(self):
|
||||||
|
return self.GPIO_EDGES_NAMES[self._edges]
|
||||||
|
|
||||||
|
def _set_edges(self, value):
|
||||||
|
f = self.when_changed
|
||||||
|
self.when_changed = None
|
||||||
|
try:
|
||||||
|
self._edges = self.GPIO_EDGES[value]
|
||||||
|
finally:
|
||||||
|
self.when_changed = f
|
||||||
|
|
||||||
|
def _get_when_changed(self):
|
||||||
|
return self._when_changed
|
||||||
|
|
||||||
|
def _set_when_changed(self, value):
|
||||||
|
if self._when_changed is None and value is not None:
|
||||||
|
self._when_changed = value
|
||||||
|
GPIO.add_event_detect(
|
||||||
|
self._number, self._edges,
|
||||||
|
callback=lambda channel: self._when_changed(),
|
||||||
|
bouncetime=self._bounce)
|
||||||
|
elif self._when_changed is not None and value is None:
|
||||||
|
GPIO.remove_event_detect(self._number)
|
||||||
|
self._when_changed = None
|
||||||
|
else:
|
||||||
|
self._when_changed = value
|
||||||
|
|
||||||
204
gpiozero/pins/rpio.py
Normal file
204
gpiozero/pins/rpio.py
Normal file
@@ -0,0 +1,204 @@
|
|||||||
|
from __future__ import (
|
||||||
|
unicode_literals,
|
||||||
|
absolute_import,
|
||||||
|
print_function,
|
||||||
|
division,
|
||||||
|
)
|
||||||
|
str = type('')
|
||||||
|
|
||||||
|
|
||||||
|
from threading import Lock
|
||||||
|
|
||||||
|
import RPIO
|
||||||
|
import RPIO.PWM
|
||||||
|
|
||||||
|
from . import Pin, PINS_CLEANUP
|
||||||
|
from .exc import (
|
||||||
|
PinInvalidFunction,
|
||||||
|
PinSetInput,
|
||||||
|
PinFixedPull,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class RPIOPin(Pin):
|
||||||
|
"""
|
||||||
|
Uses the `RPIO`_ library to interface to the Pi's GPIO pins. This is
|
||||||
|
the default pin implementation if the RPi.GPIO library is not installed,
|
||||||
|
but RPIO is. Supports all features including PWM (hardware via DMA).
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
|
||||||
|
Please note that at the time of writing, RPIO is only compatible with
|
||||||
|
Pi 1's; the Raspberry Pi 2 Model B is *not* supported. Also note that
|
||||||
|
root access is required so scripts must typically be run with ``sudo``.
|
||||||
|
|
||||||
|
.. _RPIO: https://pythonhosted.org/RPIO/
|
||||||
|
"""
|
||||||
|
|
||||||
|
_PINS = {}
|
||||||
|
|
||||||
|
GPIO_FUNCTIONS = {
|
||||||
|
'input': RPIO.IN,
|
||||||
|
'output': RPIO.OUT,
|
||||||
|
'alt0': RPIO.ALT0,
|
||||||
|
}
|
||||||
|
|
||||||
|
GPIO_PULL_UPS = {
|
||||||
|
'up': RPIO.PUD_UP,
|
||||||
|
'down': RPIO.PUD_DOWN,
|
||||||
|
'floating': RPIO.PUD_OFF,
|
||||||
|
}
|
||||||
|
|
||||||
|
GPIO_FUNCTION_NAMES = {v: k for (k, v) in GPIO_FUNCTIONS.items()}
|
||||||
|
GPIO_PULL_UP_NAMES = {v: k for (k, v) in GPIO_PULL_UPS.items()}
|
||||||
|
|
||||||
|
def __new__(cls, number):
|
||||||
|
if not cls._PINS:
|
||||||
|
RPIO.setmode(RPIO.BCM)
|
||||||
|
RPIO.setwarnings(False)
|
||||||
|
RPIO.wait_for_interrupts(threaded=True)
|
||||||
|
RPIO.PWM.setup()
|
||||||
|
RPIO.PWM.init_channel(0, 10000)
|
||||||
|
PINS_CLEANUP.append(RPIO.PWM.cleanup)
|
||||||
|
PINS_CLEANUP.append(RPIO.stop_waiting_for_interrupts)
|
||||||
|
PINS_CLEANUP.append(RPIO.cleanup)
|
||||||
|
try:
|
||||||
|
return cls._PINS[number]
|
||||||
|
except KeyError:
|
||||||
|
self = super(RPIOPin, cls).__new__(cls)
|
||||||
|
cls._PINS[number] = self
|
||||||
|
self._number = number
|
||||||
|
self._pull = 'up' if number in (2, 3) else 'floating'
|
||||||
|
self._pwm = False
|
||||||
|
self._duty_cycle = None
|
||||||
|
self._bounce = None
|
||||||
|
self._when_changed = None
|
||||||
|
self._edges = 'both'
|
||||||
|
RPIO.setup(self._number, RPIO.IN, self.GPIO_PULL_UPS[self._pull])
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "GPIO%d" % self._number
|
||||||
|
|
||||||
|
@property
|
||||||
|
def number(self):
|
||||||
|
return self._number
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.frequency = None
|
||||||
|
self.when_changed = None
|
||||||
|
RPIO.setup(self._number, RPIO.IN, RPIO.PUD_OFF)
|
||||||
|
|
||||||
|
def _get_function(self):
|
||||||
|
return self.GPIO_FUNCTION_NAMES[RPIO.gpio_function(self._number)]
|
||||||
|
|
||||||
|
def _set_function(self, value):
|
||||||
|
if value != 'input':
|
||||||
|
self._pull = 'floating'
|
||||||
|
try:
|
||||||
|
RPIO.setup(self._number, self.GPIO_FUNCTIONS[value], self.GPIO_PULL_UPS[self._pull])
|
||||||
|
except KeyError:
|
||||||
|
raise PinInvalidFunction('invalid function "%s" for pin %r' % (value, self))
|
||||||
|
|
||||||
|
def _get_state(self):
|
||||||
|
if self._pwm:
|
||||||
|
return self._duty_cycle
|
||||||
|
else:
|
||||||
|
return RPIO.input(self._number)
|
||||||
|
|
||||||
|
def _set_state(self, value):
|
||||||
|
if not 0 <= value <= 1:
|
||||||
|
raise PinInvalidValue('invalid state "%s" for pin %r' % (value, self))
|
||||||
|
if self._pwm:
|
||||||
|
RPIO.PWM.clear_channel_gpio(0, self._number)
|
||||||
|
if value == 0:
|
||||||
|
RPIO.output(self._number, False)
|
||||||
|
elif value == 1:
|
||||||
|
RPIO.output(self._number, True)
|
||||||
|
else:
|
||||||
|
RPIO.PWM.add_channel_pulse(0, self._number, start=0, width=int(1000 * value))
|
||||||
|
self._duty_cycle = value
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
RPIO.output(self._number, value)
|
||||||
|
except ValueError:
|
||||||
|
raise PinInvalidState('invalid state "%s" for pin %r' % (value, self))
|
||||||
|
except RuntimeError:
|
||||||
|
raise PinSetInput('cannot set state of pin %r' % self)
|
||||||
|
|
||||||
|
def _get_pull(self):
|
||||||
|
return self._pull
|
||||||
|
|
||||||
|
def _set_pull(self, value):
|
||||||
|
if self.function != 'input':
|
||||||
|
raise PinFixedPull('cannot set pull on non-input pin %r' % self)
|
||||||
|
if value != 'up' and self._number in (2, 3):
|
||||||
|
raise PinFixedPull('%r has a physical pull-up resistor' % self)
|
||||||
|
try:
|
||||||
|
RPIO.setup(self._number, RPIO.IN, self.GPIO_PULL_UPS[value])
|
||||||
|
self._pull = value
|
||||||
|
except KeyError:
|
||||||
|
raise PinInvalidPull('invalid pull "%s" for pin %r' % (value, self))
|
||||||
|
|
||||||
|
def _get_frequency(self):
|
||||||
|
return 100
|
||||||
|
|
||||||
|
def _set_frequency(self, value):
|
||||||
|
if value is not None and value != 100:
|
||||||
|
raise PinPWMError(
|
||||||
|
'RPIOPin implementation is currently limited to '
|
||||||
|
'100Hz sub-cycles')
|
||||||
|
if not self._pwm and value is not None:
|
||||||
|
self._pwm = True
|
||||||
|
# Dirty hack to get RPIO's PWM support to setup, but do nothing,
|
||||||
|
# for a given GPIO pin
|
||||||
|
RPIO.PWM.add_channel_pulse(0, self._number, start=0, width=0)
|
||||||
|
RPIO.PWM.clear_channel_gpio(0, self._number)
|
||||||
|
elif self._pwm and value is None:
|
||||||
|
RPIO.PWM.clear_channel_gpio(0, self._number)
|
||||||
|
self._pwm = False
|
||||||
|
|
||||||
|
def _get_bounce(self):
|
||||||
|
return None if self._bounce is None else (self._bounce / 1000)
|
||||||
|
|
||||||
|
def _set_bounce(self, value):
|
||||||
|
f = self.when_changed
|
||||||
|
self.when_changed = None
|
||||||
|
try:
|
||||||
|
self._bounce = None if value is None else (value * 1000)
|
||||||
|
finally:
|
||||||
|
self.when_changed = f
|
||||||
|
|
||||||
|
def _get_edges(self):
|
||||||
|
return self._edges
|
||||||
|
|
||||||
|
def _set_edges(self, value):
|
||||||
|
f = self.when_changed
|
||||||
|
self.when_changed = None
|
||||||
|
try:
|
||||||
|
self._edges = value
|
||||||
|
finally:
|
||||||
|
self.when_changed = f
|
||||||
|
|
||||||
|
def _get_when_changed(self):
|
||||||
|
return self._when_changed
|
||||||
|
|
||||||
|
def _set_when_changed(self, value):
|
||||||
|
if self._when_changed is None and value is not None:
|
||||||
|
self._when_changed = value
|
||||||
|
RPIO.add_interrupt_callback(
|
||||||
|
self._number,
|
||||||
|
lambda channel, value: self._when_changed(),
|
||||||
|
self._edges, self.GPIO_PULL_UPS[self._pull], self._bounce)
|
||||||
|
elif self._when_changed is not None and value is None:
|
||||||
|
try:
|
||||||
|
RPIO.del_interrupt_callback(self._number)
|
||||||
|
except KeyError:
|
||||||
|
# Ignore this exception which occurs during shutdown; this
|
||||||
|
# simply means RPIO's built-in cleanup has already run and
|
||||||
|
# removed the handler
|
||||||
|
pass
|
||||||
|
self._when_changed = None
|
||||||
|
else:
|
||||||
|
self._when_changed = value
|
||||||
|
|
||||||
Reference in New Issue
Block a user