Merge branch 'master' into docs-updates

This commit is contained in:
Ben Nuttall
2017-06-22 22:55:17 +01:00
committed by GitHub
80 changed files with 4434 additions and 2564 deletions

View File

@@ -6,8 +6,9 @@ from __future__ import (
)
from .pins import (
Factory,
Pin,
LocalPin,
SPI,
)
from .pins.data import (
PiBoardInfo,
@@ -15,47 +16,9 @@ from .pins.data import (
PinInfo,
pi_info,
)
from .exc import (
GPIOZeroError,
DeviceClosed,
BadEventHandler,
BadWaitTime,
BadQueueLen,
CompositeDeviceError,
CompositeDeviceBadName,
CompositeDeviceBadOrder,
CompositeDeviceBadDevice,
SPIError,
SPIBadArgs,
EnergenieSocketMissing,
EnergenieBadSocket,
GPIODeviceError,
GPIODeviceClosed,
GPIOPinInUse,
GPIOPinMissing,
InputDeviceError,
OutputDeviceError,
OutputDeviceBadValue,
PinError,
PinInvalidFunction,
PinInvalidState,
PinInvalidPull,
PinInvalidEdges,
PinSetInput,
PinFixedPull,
PinEdgeDetectUnsupported,
PinPWMError,
PinPWMUnsupported,
PinPWMFixedValue,
PinUnknownPi,
PinMultiplePins,
PinNoPins,
GPIOZeroWarning,
SPIWarning,
SPISoftwareFallback,
PinWarning,
PinNonPhysical,
)
# Yes, import * is naughty, but exc imports nothing else so there's no cross
# contamination here ... and besides, have you *seen* the list lately?!
from .exc import *
from .devices import (
Device,
GPIODevice,

0
gpiozero/cli/__init__.py Normal file
View File

67
gpiozero/cli/pinout.py Executable file
View File

@@ -0,0 +1,67 @@
#!/usr/bin/env python
"""
pinout - gpiozero command-line pinout tool.
Output Raspberry Pi GPIO pinout information.
"""
from __future__ import unicode_literals, absolute_import, print_function, division
import argparse
import sys
from gpiozero import pi_info
class PinoutTool(object):
def __init__(self):
self.parser = argparse.ArgumentParser(
description=__doc__
)
self.parser.add_argument(
'-r', '--revision',
dest='revision',
default='',
help='RPi revision. Default is to autodetect revision of current device'
)
self.parser.add_argument(
'-c', '--color',
action="store_true",
default=None,
help='Force colored output (by default, the output will include ANSI'
'color codes if run in a color-capable terminal). See also --monochrome'
)
self.parser.add_argument(
'-m', '--monochrome',
dest='color',
action='store_false',
help='Force monochrome output. See also --color'
)
def __call__(self, args=None):
if args is None:
args = sys.argv[1:]
try:
return self.main(self.parser.parse_args(args)) or 0
except argparse.ArgumentError as e:
# argparse errors are already nicely formatted, print to stderr and
# exit with code 2
raise e
except Exception as e:
# Output anything else nicely formatted on stderr and exit code 1
self.parser.exit(1, '{prog}: error: {message}\n'.format(
prog=self.parser.prog, message=e))
def main(self, args):
if args.revision == '':
try:
pi_info().pprint(color=args.color)
except IOError:
raise IOError('This device is not a Raspberry Pi')
else:
pi_info(args.revision).pprint(color=args.color)
print('')
print('For further information, please refer to https://pinout.xyz/')
main = PinoutTool()

View File

@@ -9,6 +9,7 @@ from __future__ import (
str = type('')
import cmath
import weakref
import collections
import operator
import functools
@@ -81,3 +82,59 @@ class frozendict(collections.Mapping):
hashes = map(hash, self.items())
self.__hash = functools.reduce(operator.xor, hashes, 0)
return self.__hash
# Backported from py3.4
class WeakMethod(weakref.ref):
"""
A custom `weakref.ref` subclass which simulates a weak reference to
a bound method, working around the lifetime problem of bound methods.
"""
__slots__ = "_func_ref", "_meth_type", "_alive", "__weakref__"
def __new__(cls, meth, callback=None):
try:
obj = meth.__self__
func = meth.__func__
except AttributeError:
raise TypeError("argument should be a bound method, not {0}"
.format(type(meth)))
def _cb(arg):
# The self-weakref trick is needed to avoid creating a reference
# cycle.
self = self_wr()
if self._alive:
self._alive = False
if callback is not None:
callback(self)
self = weakref.ref.__new__(cls, obj, _cb)
self._func_ref = weakref.ref(func, _cb)
self._meth_type = type(meth)
self._alive = True
self_wr = weakref.ref(self)
return self
def __call__(self):
obj = super(WeakMethod, self).__call__()
func = self._func_ref()
if obj is None or func is None:
return None
return self._meth_type(func, obj)
def __eq__(self, other):
if isinstance(other, WeakMethod):
if not self._alive or not other._alive:
return self is other
return weakref.ref.__eq__(self, other) and self._func_ref == other._func_ref
return False
def __ne__(self, other):
if isinstance(other, WeakMethod):
if not self._alive or not other._alive:
return self is not other
return weakref.ref.__ne__(self, other) or self._func_ref != other._func_ref
return True
__hash__ = weakref.ref.__hash__

View File

@@ -10,15 +10,16 @@ str = type('')
import os
import atexit
import weakref
from collections import namedtuple
import warnings
from collections import namedtuple, defaultdict
from itertools import chain
from types import FunctionType
from threading import RLock
from threading import Lock
import pkg_resources
from .pins import Pin
from .threads import _threads_shutdown
from .pins import _pins_shutdown
from .mixins import (
ValuesMixin,
SharedMixin,
@@ -32,52 +33,11 @@ from .exc import (
GPIOPinMissing,
GPIOPinInUse,
GPIODeviceClosed,
PinFactoryFallback,
)
from .compat import frozendict
def _default_pin_factory(name=os.getenv('GPIOZERO_PIN_FACTORY', None)):
group = 'gpiozero_pin_factories'
if name is None:
# If no factory is explicitly specified, try various names in
# "preferred" order. Note that in this case we only select from
# gpiozero distribution so without explicitly specifying a name (via
# the environment) it's impossible to auto-select a factory from
# outside the base distribution
#
# We prefer RPi.GPIO here as it supports PWM, and all Pi revisions. If
# no third-party libraries are available, however, we fall back to a
# pure Python implementation which supports platforms like PyPy
dist = pkg_resources.get_distribution('gpiozero')
for name in ('RPiGPIOPin', 'RPIOPin', 'PiGPIOPin', 'NativePin'):
try:
return pkg_resources.load_entry_point(dist, group, name)
except ImportError:
pass
raise BadPinFactory('Unable to locate any default pin factory!')
else:
for factory in pkg_resources.iter_entry_points(group, name):
return factory.load()
raise BadPinFactory('Unable to locate pin factory "%s"' % name)
pin_factory = _default_pin_factory()
_PINS = set()
_PINS_LOCK = RLock() # Yes, this needs to be re-entrant
def _shutdown():
_threads_shutdown()
with _PINS_LOCK:
while _PINS:
_PINS.pop().close()
# Any cleanup routines registered by pins libraries must be called *after*
# cleanup of pin objects used by devices
_pins_shutdown()
atexit.register(_shutdown)
class GPIOMeta(type):
# NOTE Yes, this is a metaclass. Don't be scared - it's a simple one.
@@ -106,7 +66,7 @@ class GPIOMeta(type):
# already exists. Only construct the instance if the key's new.
key = cls._shared_key(*args, **kwargs)
try:
self = cls._INSTANCES[key]
self = cls._instances[key]
self._refs += 1
except (KeyError, ReferenceError) as e:
self = super(GPIOMeta, cls).__call__(*args, **kwargs)
@@ -122,14 +82,14 @@ class GPIOMeta(type):
old_close()
finally:
try:
del cls._INSTANCES[key]
del cls._instances[key]
except KeyError:
# If the _refs go negative (too many closes)
# just ignore the resulting KeyError here -
# it's already gone
pass
self.close = close
cls._INSTANCES[key] = weakref.proxy(self)
cls._instances[key] = weakref.proxy(self)
else:
# Construct the instance as normal
self = super(GPIOMeta, cls).__call__(*args, **kwargs)
@@ -229,13 +189,89 @@ class GPIOBase(GPIOMeta(nstr('GPIOBase'), (), {})):
class Device(ValuesMixin, GPIOBase):
"""
Represents a single device of any type; GPIO-based, SPI-based, I2C-based,
etc. This is the base class of the device hierarchy. It defines the
basic services applicable to all devices (specifically the :attr:`is_active`
etc. This is the base class of the device hierarchy. It defines the basic
services applicable to all devices (specifically the :attr:`is_active`
property, the :attr:`value` property, and the :meth:`close` method).
"""
pin_factory = None # instance of a Factory sub-class
_reservations = defaultdict(list) # maps pin addresses to lists of devices
_res_lock = Lock()
def __repr__(self):
return "<gpiozero.%s object>" % (self.__class__.__name__)
def _reserve_pins(self, *pins_or_addresses):
"""
Called to indicate that the device reserves the right to use the
specified *pins_or_addresses*. This should be done during device
construction. If pins are reserved, you must ensure that the
reservation is released by eventually called :meth:`_release_pins`.
The *pins_or_addresses* can be actual :class:`Pin` instances or the
addresses of pin instances (each address is a tuple of strings). The
latter form is permitted to ensure that devices do not have to
construct :class:`Pin` objects to reserve pins. This is important as
constructing a pin often configures it (e.g. as an input) which
conflicts with alternate pin functions like SPI.
"""
addresses = (
p.address if isinstance(p, Pin) else p
for p in pins_or_addresses
)
with Device._res_lock:
for address in addresses:
for device_ref in Device._reservations[address]:
device = device_ref()
if device is not None and self._conflicts_with(device):
raise GPIOPinInUse(
'pin %s is already in use by %r' % (
'/'.join(address), device)
)
Device._reservations[address].append(weakref.ref(self))
def _release_pins(self, *pins_or_addresses):
"""
Releases the reservation of this device against *pins_or_addresses*.
This is typically called during :meth:`close` to clean up reservations
taken during construction. Releasing a reservation that is not
currently held will be silently ignored (to permit clean-up after
failed / partial construction).
"""
addresses = (
p.address if isinstance(p, Pin) else p
for p in pins_or_addresses
)
with Device._res_lock:
for address in addresses:
Device._reservations[address] = [
ref for ref in Device._reservations[address]
if ref() not in (self, None) # may as well clean up dead refs
]
def _release_all(self):
"""
Releases all pin reservations taken out by this device. See
:meth:`_release_pins` for further information).
"""
with Device._res_lock:
Device._reservations = defaultdict(list, {
address: [
ref for ref in conflictors
if ref() not in (self, None)
]
for address, conflictors in Device._reservations.items()
})
def _conflicts_with(self, other):
"""
Called by :meth:`_reserve_pin` to test whether the *other*
:class:`Device` using a common pin conflicts with this device's intent
to use it. The default is ``True`` indicating that all devices conflict
with common pins. Sub-classes may override this to permit more nuanced
replies.
"""
return True
@property
def value(self):
"""
@@ -378,14 +414,12 @@ class GPIODevice(Device):
self._pin = None
if pin is None:
raise GPIOPinMissing('No pin given')
if isinstance(pin, int):
pin = pin_factory(pin)
with _PINS_LOCK:
if pin in _PINS:
raise GPIOPinInUse(
'pin %r is already in use by another gpiozero object' % pin
)
_PINS.add(pin)
if isinstance(pin, Pin):
self._reserve_pins(pin)
else:
# Check you can reserve *before* constructing the pin
self._reserve_pins(Device.pin_factory.pin_address(pin))
pin = Device.pin_factory.pin(pin)
self._pin = pin
self._active_state = True
self._inactive_state = False
@@ -402,12 +436,10 @@ class GPIODevice(Device):
def close(self):
super(GPIODevice, self).close()
with _PINS_LOCK:
pin = self._pin
if self._pin is not None:
self._release_pins(self._pin)
self._pin.close()
self._pin = None
if pin in _PINS:
_PINS.remove(pin)
pin.close()
@property
def closed(self):
@@ -441,3 +473,57 @@ class GPIODevice(Device):
except DeviceClosed:
return "<gpiozero.%s object closed>" % self.__class__.__name__
# Defined last to ensure Device is defined before attempting to load any pin
# factory; pin factories want to load spi which in turn relies on devices (for
# the soft-SPI implementation)
def _default_pin_factory(name=os.getenv('GPIOZERO_PIN_FACTORY', None)):
group = 'gpiozero_pin_factories'
if name is None:
# If no factory is explicitly specified, try various names in
# "preferred" order. Note that in this case we only select from
# gpiozero distribution so without explicitly specifying a name (via
# the environment) it's impossible to auto-select a factory from
# outside the base distribution
#
# We prefer RPi.GPIO here as it supports PWM, and all Pi revisions. If
# no third-party libraries are available, however, we fall back to a
# pure Python implementation which supports platforms like PyPy
dist = pkg_resources.get_distribution('gpiozero')
for name in ('rpigpio', 'rpio', 'pigpio', 'native'):
try:
return pkg_resources.load_entry_point(dist, group, name)()
except Exception as e:
warnings.warn(
PinFactoryFallback(
'Failed to load factory %s: %s' % (name, str(e))))
raise BadPinFactory('Unable to load any default pin factory!')
else:
for factory in pkg_resources.iter_entry_points(group, name.lower()):
return factory.load()()
raise BadPinFactory('Unable to find pin factory "%s"' % name)
def _devices_shutdown():
if Device.pin_factory:
with Device._res_lock:
reserved_devices = {
dev
for ref_list in Device._reservations.values()
for ref in ref_list
for dev in (ref(),)
if dev is not None
}
for dev in reserved_devices:
dev.close()
Device.pin_factory.close()
Device.pin_factory = None
def _shutdown():
_threads_shutdown()
_devices_shutdown()
Device.pin_factory = _default_pin_factory()
atexit.register(_shutdown)

View File

@@ -52,6 +52,24 @@ class SPIBadArgs(SPIError, ValueError):
class SPIBadChannel(SPIError, ValueError):
"Error raised when an invalid channel is given to an :class:`AnalogInputDevice`"
class SPIFixedClockMode(SPIError, AttributeError):
"Error raised when the SPI clock mode cannot be changed"
class SPIInvalidClockMode(SPIError, ValueError):
"Error raised when an invalid clock mode is given to an SPI implementation"
class SPIFixedBitOrder(SPIError, AttributeError):
"Error raised when the SPI bit-endianness cannot be changed"
class SPIFixedSelect(SPIError, AttributeError):
"Error raised when the SPI select polarity cannot be changed"
class SPIFixedWordSize(SPIError, AttributeError):
"Error raised when the number of bits per word cannot be changed"
class SPIInvalidWordSize(SPIError, ValueError):
"Error raised when an invalid (out of range) number of bits per word is specified"
class GPIODeviceError(GPIOZeroError):
"Base class for errors specific to the GPIODevice hierarchy"
@@ -62,7 +80,7 @@ class GPIOPinInUse(GPIODeviceError):
"Error raised when attempting to use a pin already in use by another device"
class GPIOPinMissing(GPIODeviceError, ValueError):
"Error raised when a pin number is not specified"
"Error raised when a pin specification is not given"
class InputDeviceError(GPIODeviceError):
"Base class for errors specific to the InputDevice hierarchy"
@@ -100,6 +118,12 @@ class PinFixedPull(PinError, AttributeError):
class PinEdgeDetectUnsupported(PinError, AttributeError):
"Error raised when attempting to use edge detection on unsupported pins"
class PinUnsupported(PinError, NotImplementedError):
"Error raised when attempting to obtain a pin interface on unsupported pins"
class PinSPIUnsupported(PinError, NotImplementedError):
"Error raised when attempting to obtain an SPI interface on unsupported pins"
class PinPWMError(PinError):
"Base class for errors related to PWM implementations"
@@ -118,6 +142,9 @@ class PinMultiplePins(PinError, RuntimeError):
class PinNoPins(PinError, RuntimeError):
"Error raised when no pins support the requested function"
class PinInvalidPin(PinError, ValueError):
"Error raised when an invalid pin specification is provided"
class GPIOZeroWarning(Warning):
"Base class for all warnings in GPIO Zero"
@@ -130,6 +157,9 @@ class SPISoftwareFallback(SPIWarning):
class PinWarning(GPIOZeroWarning):
"Base class for warnings related to pin implementations"
class PinFactoryFallback(PinWarning):
"Warning raised when a default pin factory fails to load and a fallback is tried"
class PinNonPhysical(PinWarning):
"Warning raised when a non-physical pin is specified in a constructor"

View File

@@ -165,7 +165,7 @@ class SmoothedInputDevice(EventsMixin, InputDevice):
if self.partial or self._queue.full.is_set():
return super(SmoothedInputDevice, self).__repr__()
else:
return "<gpiozero.%s object on pin=%r, pull_up=%s>" % (
return "<gpiozero.%s object on pin %r, pull_up=%s>" % (
self.__class__.__name__, self.pin, self.pull_up)
@property
@@ -240,7 +240,7 @@ class Button(HoldMixin, DigitalInputDevice):
print("The button was pressed!")
:param int pin:
The GPIO pin which the button is attached to. See :ref:`pin_numbering`
The GPIO pin which the button is attached to. See :ref:`pin-numbering`
for valid pin numbers.
:param bool pull_up:
@@ -302,7 +302,7 @@ class LineSensor(SmoothedInputDevice):
pause()
:param int pin:
The GPIO pin which the sensor is attached to. See :ref:`pin_numbering`
The GPIO pin which the sensor is attached to. See :ref:`pin-numbering`
for valid pin numbers.
:param int queue_len:
@@ -371,7 +371,7 @@ class MotionSensor(SmoothedInputDevice):
print("Motion detected!")
:param int pin:
The GPIO pin which the sensor is attached to. See :ref:`pin_numbering`
The GPIO pin which the sensor is attached to. See :ref:`pin-numbering`
for valid pin numbers.
:param int queue_len:
@@ -435,7 +435,7 @@ class LightSensor(SmoothedInputDevice):
print("Light detected!")
:param int pin:
The GPIO pin which the sensor is attached to. See :ref:`pin_numbering`
The GPIO pin which the sensor is attached to. See :ref:`pin-numbering`
for valid pin numbers.
:param int queue_len:
@@ -543,11 +543,11 @@ class DistanceSensor(SmoothedInputDevice):
:param int echo:
The GPIO pin which the ECHO pin is attached to. See
:ref:`pin_numbering` for valid pin numbers.
:ref:`pin-numbering` for valid pin numbers.
:param int trigger:
The GPIO pin which the TRIG pin is attached to. See
:ref:`pin_numbering` for valid pin numbers.
:ref:`pin-numbering` for valid pin numbers.
:param int queue_len:
The length of the queue used to store values read from the sensor.

View File

@@ -69,11 +69,14 @@ class SourceMixin(object):
super(SourceMixin, self).__init__(*args, **kwargs)
def close(self):
try:
self.source = None
except AttributeError:
pass
try:
super(SourceMixin, self).close()
except AttributeError:
pass
self.source = None
def _copy_values(self, source):
for v in source:
@@ -127,7 +130,7 @@ class SharedMixin(object):
When :meth:`close` is called, an internal reference counter will be
decremented and the instance will only close when it reaches zero.
"""
_INSTANCES = {}
_instances = {}
def __del__(self):
self._refs = 0
@@ -438,23 +441,28 @@ class HoldThread(GPIOThread):
device is active.
"""
def __init__(self, parent):
super(HoldThread, self).__init__(target=self.held, args=(parent,))
super(HoldThread, self).__init__(
target=self.held, args=(weakref.proxy(parent),))
self.holding = Event()
self.start()
def held(self, parent):
while not self.stopping.is_set():
if self.holding.wait(0.1):
self.holding.clear()
while not (
self.stopping.is_set() or
parent._inactive_event.wait(parent.hold_time)
):
if parent._held_from is None:
parent._held_from = time()
parent._fire_held()
if not parent.hold_repeat:
break
try:
while not self.stopping.is_set():
if self.holding.wait(0.1):
self.holding.clear()
while not (
self.stopping.is_set() or
parent._inactive_event.wait(parent.hold_time)
):
if parent._held_from is None:
parent._held_from = time()
parent._fire_held()
if not parent.hold_repeat:
break
except ReferenceError:
# Parent is dead; time to die!
pass
class GPIOQueue(GPIOThread):

View File

@@ -56,7 +56,7 @@ class PingServer(InternalDevice):
self._fire_events()
def __repr__(self):
return '<gpiozero.PingDevice host="%s">' % self.host
return '<gpiozero.PingServer host="%s">' % self.host
@property
def value(self):

View File

@@ -128,8 +128,8 @@ class DigitalOutputDevice(OutputDevice):
"""
def __init__(self, pin=None, active_high=True, initial_value=False):
self._blink_thread = None
super(DigitalOutputDevice, self).__init__(pin, active_high, initial_value)
self._controller = None
super(DigitalOutputDevice, self).__init__(pin, active_high, initial_value)
@property
def value(self):
@@ -217,7 +217,7 @@ class LED(DigitalOutputDevice):
led.on()
:param int pin:
The GPIO pin which the LED is attached to. See :ref:`pin_numbering` for
The GPIO pin which the LED is attached to. See :ref:`pin-numbering` for
valid pin numbers.
:param bool active_high:
@@ -252,7 +252,7 @@ class Buzzer(DigitalOutputDevice):
bz.on()
:param int pin:
The GPIO pin which the buzzer is attached to. See :ref:`pin_numbering`
The GPIO pin which the buzzer is attached to. See :ref:`pin-numbering`
for valid pin numbers.
:param bool active_high:
@@ -276,7 +276,7 @@ class PWMOutputDevice(OutputDevice):
Generic output device configured for pulse-width modulation (PWM).
:param int pin:
The GPIO pin which the device is attached to. See :ref:`pin_numbering`
The GPIO pin which the device is attached to. See :ref:`pin-numbering`
for valid pin numbers.
:param bool active_high:
@@ -483,7 +483,7 @@ class PWMLED(PWMOutputDevice):
an optional resistor to prevent the LED from burning out.
:param int pin:
The GPIO pin which the LED is attached to. See :ref:`pin_numbering` for
The GPIO pin which the LED is attached to. See :ref:`pin-numbering` for
valid pin numbers.
:param bool active_high:
@@ -926,7 +926,7 @@ class Servo(SourceMixin, CompositeDevice):
sleep(1)
:param int pin:
The GPIO pin which the device is attached to. See :ref:`pin_numbering`
The GPIO pin which the device is attached to. See :ref:`pin-numbering`
for valid pin numbers.
:param float initial_value:
@@ -1116,7 +1116,7 @@ class AngularServo(Servo):
expectations of minimum and maximum.
:param int pin:
The GPIO pin which the device is attached to. See :ref:`pin_numbering`
The GPIO pin which the device is attached to. See :ref:`pin-numbering`
for valid pin numbers.
:param float initial_angle:

View File

@@ -1,3 +1,5 @@
# vim: set fileencoding=utf-8:
from __future__ import (
unicode_literals,
absolute_import,
@@ -6,39 +8,137 @@ from __future__ import (
)
str = type('')
import io
from .data import pi_info
from ..exc import (
PinInvalidFunction,
PinSetInput,
PinFixedPull,
PinUnsupported,
PinSPIUnsupported,
PinPWMUnsupported,
PinEdgeDetectUnsupported,
SPIFixedClockMode,
SPIFixedBitOrder,
SPIFixedSelect,
SPIFixedWordSize,
)
PINS_CLEANUP = []
def _pins_shutdown():
for routine in PINS_CLEANUP:
routine()
class Factory(object):
"""
Generates pins, SPI, and I2C interfaces for devices. This is an abstract
base class for pin factories. Descendents *must* override the following
methods:
* :meth:`_get_address`
* :meth:`pin_address`
Descendents *may* additionally override the following methods, if
applicable:
* :meth:`close`
* :meth:`pin`
* :meth:`spi`
* :meth:`_get_pi_info`
"""
def close(self):
"""
Closes the pin factory. This is expected to clean up all resources
manipulated by the factory. It it typically called at script
termination.
"""
pass
def pin(self, spec):
"""
Creates an instance of a :class:`Pin` descendent representing the
specified pin.
.. warning::
Descendents must ensure that pin instances representing the same
hardware are identical; i.e. two separate invocations of
:meth:`pin` for the same pin specification must return the same
object.
"""
raise PinUnsupported("Individual pins are not supported by this pin factory")
def pin_address(self, spec):
"""
Returns the address that a pin *would* have if constructed from the
given *spec*.
This unusual method is used by the pin reservation system to check
for conflicts *prior* to pin construction; with most implementations,
pin construction implicitly alters the state of the pin (e.g. setting
it to an input). This allows pin reservation to take place without
affecting the state of other components.
"""
raise NotImplementedError
def spi(self, **spi_args):
"""
Returns an instance of an :class:`SPI` interface, for the specified SPI
*port* and *device*, or for the specified pins (*clock_pin*,
*mosi_pin*, *miso_pin*, and *select_pin*). Only one of the schemes can
be used; attempting to mix *port* and *device* with pin numbers will
raise :exc:`SPIBadArgs`.
"""
raise PinSPIUnsupported('SPI not supported by this pin factory')
def _get_address(self):
raise NotImplementedError
address = property(
lambda self: self._get_address(),
doc="""\
Returns a tuple of strings representing the address of the factory.
For the Pi itself this is a tuple of one string representing the Pi's
address (e.g. "localhost"). Expander chips can return a tuple appending
whatever string they require to uniquely identify the expander chip
amongst all factories in the system.
.. note::
This property *must* return an immutable object capable of being
used as a dictionary key.
""")
def _get_pi_info(self):
return None
pi_info = property(
lambda self: self._get_pi_info(),
doc="""\
Returns a :class:`PiBoardInfo` instance representing the Pi that
instances generated by this factory will be attached to.
If the pins represented by this class are not *directly* attached to a
Pi (e.g. the pin is attached to a board attached to the Pi, or the pins
are not on a Pi at all), this may return ``None``.
""")
class Pin(object):
"""
Abstract base class representing a GPIO pin or a pin from an IO extender.
Abstract base class representing a pin attached to some form of controller,
be it GPIO, SPI, ADC, etc.
Descendents should override property getters and setters to accurately
represent the capabilities of pins. The following functions *must* be
overridden:
represent the capabilities of pins. Descendents *must* override the
following methods:
* :meth:`_get_address`
* :meth:`_get_function`
* :meth:`_set_function`
* :meth:`_get_state`
The following functions *may* be overridden if applicable:
Descendents *may* additionally override the following methods, if
applicable:
* :meth:`close`
* :meth:`output_with_state`
* :meth:`input_with_pull`
* :meth:`_set_state`
* :meth:`_get_frequency`
* :meth:`_set_frequency`
@@ -50,20 +150,10 @@ class Pin(object):
* :meth:`_set_edges`
* :meth:`_get_when_changed`
* :meth:`_set_when_changed`
* :meth:`pi_info`
* :meth:`output_with_state`
* :meth:`input_with_pull`
.. warning::
Descendents must ensure that pin instances representing the same
physical hardware are identical, right down to object identity. The
framework relies on this to correctly clean up resources at interpreter
shutdown.
"""
def __repr__(self):
return "Abstract pin"
return self.address[-1]
def close(self):
"""
@@ -105,6 +195,18 @@ class Pin(object):
self.function = 'input'
self.pull = pull
def _get_address(self):
raise NotImplementedError
address = property(
lambda self: self._get_address(),
doc="""\
The address of the pin. This property is a tuple of strings constructed
from the owning factory's address with the unique address of the pin
appended to it. The tuple as a whole uniquely identifies the pin
amongst all pins attached to the system.
""")
def _get_function(self):
return "input"
@@ -140,10 +242,19 @@ class Pin(object):
doc="""\
The state of the pin. This is 0 for low, and 1 for high. As a low level
view of the pin, no swapping is performed in the case of pull ups (see
:attr:`pull` for more information).
:attr:`pull` for more information):
If PWM is currently active (when :attr:`frequency` is not ``None``),
this represents the PWM duty cycle as a value between 0.0 and 1.0.
.. code-block:: text
HIGH - - - - > ,----------------------
|
|
LOW ----------------'
Descendents which implement analog, or analog-like capabilities can
return values between 0 and 1. For example, pins implementing PWM
(where :attr:`frequency` is not ``None``) return a value between 0.0
and 1.0 representing the current PWM duty cycle.
If a pin is currently configured for input, and an attempt is made to
set this attribute, :exc:`PinSetInput` will be raised. If an invalid
@@ -205,6 +316,26 @@ class Pin(object):
detection, measured in seconds. If bounce detection is not currently in
use, this is ``None``.
For example, if :attr:`edge` is currently "rising", :attr:`bounce` is
currently 5/1000 (5ms), then the waveform below will only fire
:attr:`when_changed` on two occasions despite there being three rising
edges:
.. code-block:: text
TIME 0...1...2...3...4...5...6...7...8...9...10..11..12 ms
bounce elimination |===================| |==============
HIGH - - - - > ,--. ,--------------. ,--.
| | | | | |
| | | | | |
LOW ----------------' `-' `-' `-----------
: :
: :
when_changed when_changed
fires fires
If the pin does not support edge detection, attempts to set this
property will raise :exc:`PinEdgeDetectUnsupported`. If the pin
supports edge detection, the class must implement bounce detection,
@@ -223,7 +354,18 @@ class Pin(object):
doc="""\
The edge that will trigger execution of the function or bound method
assigned to :attr:`when_changed`. This can be one of the strings
"both" (the default), "rising", "falling", or "none".
"both" (the default), "rising", "falling", or "none":
.. code-block:: text
HIGH - - - - > ,--------------.
| |
| |
LOW --------------------' `--------------
: :
: :
Fires when_changed "both" "both"
when edges is ... "rising" "falling"
If the pin does not support edge detection, attempts to set this
property will raise :exc:`PinEdgeDetectUnsupported`.
@@ -247,48 +389,303 @@ class Pin(object):
property will raise :exc:`PinEdgeDetectUnsupported`.
""")
@classmethod
def pi_info(cls):
"""
Returns a :class:`PiBoardInfo` instance representing the Pi that
instances of this pin class will be attached to.
If the pins represented by this class are not *directly* attached to a
Pi (e.g. the pin is attached to a board attached to the Pi, or the pins
are not on a Pi at all), this may return ``None``.
"""
return None
class LocalPin(Pin):
class SPI(object):
"""
Abstract base class representing pins attached locally to a Pi. This forms
the base class for local-only pin interfaces (:class:`RPiGPIOPin`,
:class:`RPIOPin`, and :class:`NativePin`).
Abstract interface for `Serial Peripheral Interface`_ (SPI)
implementations. Descendents *must* override the following methods:
* :meth:`transfer`
* :meth:`_get_clock_mode`
Descendents *may* override the following methods, if applicable:
* :meth:`read`
* :meth:`write`
* :meth:`_set_clock_mode`
* :meth:`_get_lsb_first`
* :meth:`_set_lsb_first`
* :meth:`_get_select_high`
* :meth:`_set_select_high`
* :meth:`_get_bits_per_word`
* :meth:`_set_bits_per_word`
.. _Serial Peripheral Interface: https://en.wikipedia.org/wiki/Serial_Peripheral_Interface_Bus
"""
_PI_REVISION = None
@classmethod
def pi_info(cls):
def read(self, n):
"""
Returns a :class:`PiBoardInfo` instance representing the local Pi.
The Pi's revision is determined by reading :file:`/proc/cpuinfo`. If
no valid revision is found, returns ``None``.
"""
# Cache the result as we can reasonably assume it won't change during
# runtime (this is LocalPin after all; descendents that deal with
# remote Pis should inherit from Pin instead)
if cls._PI_REVISION is None:
with io.open('/proc/cpuinfo', 'r') as f:
for line in f:
if line.startswith('Revision'):
revision = line.split(':')[1].strip().lower()
overvolted = revision.startswith('100')
if overvolted:
revision = revision[-4:]
cls._PI_REVISION = revision
break
if cls._PI_REVISION is None:
return None # something weird going on
return pi_info(cls._PI_REVISION)
Read *n* words of data from the SPI interface, returning them as a
sequence of unsigned ints, each no larger than the configured
:attr:`bits_per_word` of the interface.
This method is typically used with read-only devices that feature
half-duplex communication. See :meth:`transfer` for full duplex
communication.
"""
return self.transfer((0,) * n)
def write(self, data):
"""
Write *data* to the SPI interface. *data* must be a sequence of
unsigned integer words each of which will fit within the configured
:attr:`bits_per_word` of the interface. The method returns the number
of words written to the interface (which may be less than or equal to
the length of *data*).
This method is typically used with write-only devices that feature
half-duplex communication. See :meth:`transfer` for full duplex
communication.
"""
return len(self.transfer(data))
def transfer(self, data):
"""
Write *data* to the SPI interface. *data* must be a sequence of
unsigned integer words each of which will fit within the configured
:attr:`bits_per_word` of the interface. The method returns the sequence
of words read from the interface while writing occurred (full duplex
communication).
The length of the sequence returned dictates the number of words of
*data* written to the interface. Each word in the returned sequence
will be an unsigned integer no larger than the configured
:attr:`bits_per_word` of the interface.
"""
raise NotImplementedError
@property
def clock_polarity(self):
"""
The polarity of the SPI clock pin. If this is ``False`` (the default),
the clock pin will idle low, and pulse high. Setting this to ``True``
will cause the clock pin to idle high, and pulse low. On many data
sheets this is documented as the CPOL value.
The following diagram illustrates the waveform when
:attr:`clock_polarity` is ``False`` (the default), equivalent to CPOL
0:
.. code-block:: text
on on on on on on on
,---. ,---. ,---. ,---. ,---. ,---. ,---.
CLK | | | | | | | | | | | | | |
| | | | | | | | | | | | | |
------' `---' `---' `---' `---' `---' `---' `------
idle off off off off off off idle
The following diagram illustrates the waveform when
:attr:`clock_polarity` is ``True``, equivalent to CPOL 1:
.. code-block:: text
idle off off off off off off idle
------. ,---. ,---. ,---. ,---. ,---. ,---. ,------
| | | | | | | | | | | | | |
CLK | | | | | | | | | | | | | |
`---' `---' `---' `---' `---' `---' `---'
on on on on on on on
"""
return bool(self.clock_mode & 2)
@clock_polarity.setter
def clock_polarity(self, value):
self.clock_mode = self.clock_mode & (~2) | (bool(value) << 1)
@property
def clock_phase(self):
"""
The phase of the SPI clock pin. If this is ``False`` (the default),
data will be read from the MISO pin when the clock pin activates.
Setting this to ``True`` will cause data to be read from the MISO pin
when the clock pin deactivates. On many data sheets this is documented
as the CPHA value. Whether the clock edge is rising or falling when the
clock is considered activated is controlled by the
:attr:`clock_polarity` attribute (corresponding to CPOL).
The following diagram indicates when data is read when
:attr:`clock_polarity` is ``False``, and :attr:`clock_phase` is
``False`` (the default), equivalent to CPHA 0:
.. code-block:: text
,---. ,---. ,---. ,---. ,---. ,---. ,---.
CLK | | | | | | | | | | | | | |
| | | | | | | | | | | | | |
----' `---' `---' `---' `---' `---' `---' `-------
: : : : : : :
MISO---. ,---. ,---. ,---. ,---. ,---. ,---.
/ \ / \ / \ / \ / \ / \ / \\
-{ Bit X Bit X Bit X Bit X Bit X Bit X Bit }------
\ / \ / \ / \ / \ / \ / \ /
`---' `---' `---' `---' `---' `---' `---'
The following diagram indicates when data is read when
:attr:`clock_polarity` is ``False``, but :attr:`clock_phase` is
``True``, equivalent to CPHA 1:
.. code-block:: text
,---. ,---. ,---. ,---. ,---. ,---. ,---.
CLK | | | | | | | | | | | | | |
| | | | | | | | | | | | | |
----' `---' `---' `---' `---' `---' `---' `-------
: : : : : : :
MISO ,---. ,---. ,---. ,---. ,---. ,---. ,---.
/ \ / \ / \ / \ / \ / \ / \\
-----{ Bit X Bit X Bit X Bit X Bit X Bit X Bit }--
\ / \ / \ / \ / \ / \ / \ /
`---' `---' `---' `---' `---' `---' `---'
"""
return bool(self.clock_mode & 1)
@clock_phase.setter
def clock_phase(self, value):
self.clock_mode = self.clock_mode & (~1) | bool(value)
def _get_clock_mode(self):
raise NotImplementedError
def _set_clock_mode(self, value):
raise SPIFixedClockMode("clock_mode cannot be changed on %r" % self)
clock_mode = property(
lambda self: self._get_clock_mode(),
lambda self, value: self._set_clock_mode(value),
doc="""\
Presents a value representing the :attr:`clock_polarity` and
:attr:`clock_phase` attributes combined according to the following
table:
+------+-----------------+--------------+
| mode | polarity (CPOL) | phase (CPHA) |
+======+=================+==============+
| 0 | False | False |
+------+-----------------+--------------+
| 1 | False | True |
+------+-----------------+--------------+
| 2 | True | False |
+------+-----------------+--------------+
| 3 | True | True |
+------+-----------------+--------------+
Adjusting this value adjusts both the :attr:`clock_polarity` and
:attr:`clock_phase` attributes simultaneously.
""")
def _get_lsb_first(self):
return False
def _set_lsb_first(self, value):
raise SPIFixedBitOrder("lsb_first cannot be changed on %r" % self)
lsb_first = property(
lambda self: self._get_lsb_first(),
lambda self, value: self._set_lsb_first(value),
doc="""\
Controls whether words are read and written LSB in (Least Significant
Bit first) order. The default is ``False`` indicating that words are
read and written in MSB (Most Significant Bit first) order.
Effectively, this controls the `Bit endianness`_ of the connection.
The following diagram shows the a word containing the number 5 (binary
0101) transmitted on MISO with :attr:`bits_per_word` set to 4, and
:attr:`clock_mode` set to 0, when :attr:`lsb_first` is ``False`` (the
default):
.. code-block:: text
,---. ,---. ,---. ,---.
CLK | | | | | | | |
| | | | | | | |
----' `---' `---' `---' `-----
: ,-------. : ,-------.
MISO: | : | : | : |
: | : | : | : |
----------' : `-------' : `----
: : : :
MSB LSB
And now with :attr:`lsb_first` set to ``True`` (and all other
parameters the same):
.. code-block:: text
,---. ,---. ,---. ,---.
CLK | | | | | | | |
| | | | | | | |
----' `---' `---' `---' `-----
,-------. : ,-------. :
MISO: | : | : | :
| : | : | : | :
--' : `-------' : `-----------
: : : :
LSB MSB
.. _Bit endianness: https://en.wikipedia.org/wiki/Endianness#Bit_endianness
""")
def _get_select_high(self):
return False
def _set_select_high(self, value):
raise SPIFixedSelect("select_high cannot be changed on %r" % self)
select_high = property(
lambda self: self._get_select_high(),
lambda self, value: self._set_select_high(value),
doc="""\
If ``False`` (the default), the chip select line is considered active
when it is pulled low. When set to ``True``, the chip select line is
considered active when it is driven high.
The following diagram shows the waveform of the chip select line, and
the clock when :attr:`clock_polarity` is ``False``, and
:attr:`select_high` is ``False`` (the default):
.. code-block:: text
---. ,------
__ | |
CS | chip is selected, and will react to clock | idle
`-----------------------------------------------------'
,---. ,---. ,---. ,---. ,---. ,---. ,---.
CLK | | | | | | | | | | | | | |
| | | | | | | | | | | | | |
----' `---' `---' `---' `---' `---' `---' `-------
And when :attr:`select_high` is ``True``:
.. code-block:: text
,-----------------------------------------------------.
CS | chip is selected, and will react to clock | idle
| |
---' `------
,---. ,---. ,---. ,---. ,---. ,---. ,---.
CLK | | | | | | | | | | | | | |
| | | | | | | | | | | | | |
----' `---' `---' `---' `---' `---' `---' `-------
""")
def _get_bits_per_word(self):
return 8
def _set_bits_per_word(self, value):
raise SPIFixedWordSize("bits_per_word cannot be changed on %r" % self)
bits_per_word = property(
lambda self: self._get_bits_per_word(),
lambda self, value: self._set_bits_per_word(value),
doc="""\
Controls the number of bits that make up a word, and thus where the
word boundaries appear in the data stream, and the maximum value of a
word. Defaults to 8 meaning that words are effectively bytes.
Several implementations do not support non-byte-sized words.
""")

View File

@@ -6,7 +6,6 @@ from __future__ import (
)
str = type('')
import io
import os
import sys
from textwrap import dedent
@@ -14,7 +13,7 @@ from itertools import cycle
from operator import attrgetter
from collections import namedtuple
from ..exc import PinUnknownPi, PinMultiplePins, PinNoPins
from ..exc import PinUnknownPi, PinMultiplePins, PinNoPins, PinInvalidPin
# Some useful constants for describing pins
@@ -120,8 +119,8 @@ A_BOARD = """\
BPLUS_BOARD = """\
{style:white on green},--------------------------------.{style:reset}
{style:white on green}| {P1:{style} col2}{style:white on green} P1 {style:black on white}+===={style:reset}
{style:white on green}| {P1:{style} col1}{style:white on green} {style:black on white}| USB{style:reset}
{style:white on green}| {J8:{style} col2}{style:white on green} J8 {style:black on white}+===={style:reset}
{style:white on green}| {J8:{style} col1}{style:white on green} {style:black on white}| USB{style:reset}
{style:white on green}| {style:black on white}+===={style:reset}
{style:white on green}| {style:bold}Pi Model {model:2s} V{pcb_revision:3s}{style:normal} |{style:reset}
{style:white on green}| {style:on black}+----+{style:on green} {style:black on white}+===={style:reset}
@@ -135,8 +134,8 @@ BPLUS_BOARD = """\
APLUS_BOARD = """\
{style:white on green},--------------------------.{style:reset}
{style:white on green}| {P1:{style} col2}{style:white on green} P1 |{style:reset}
{style:white on green}| {P1:{style} col1}{style:white on green} |{style:reset}
{style:white on green}| {J8:{style} col2}{style:white on green} J8 |{style:reset}
{style:white on green}| {J8:{style} col1}{style:white on green} |{style:reset}
{style:white on green}| |{style:reset}
{style:white on green}| {style:bold}Pi Model {model:2s} V{pcb_revision:3s}{style:normal} |{style:reset}
{style:white on green}| {style:on black}+----+{style:on green} {style:black on white}+===={style:reset}
@@ -150,20 +149,20 @@ APLUS_BOARD = """\
ZERO12_BOARD = """\
{style:white on green},-------------------------.{style:reset}
{style:white on green}| {P1:{style} col2}{style:white on green} P1 |{style:reset}
{style:white on green}| {P1:{style} col1}{style:white on green} |{style:reset}
{style:black on white}---+{style:white on green} {style:on black}+----+{style:on green} {style:bold}PiZero{style:normal} |{style:reset}
{style:black on white} sd|{style:white on green} {style:on black}|SoC |{style:on green} {style:bold}V{pcb_revision:3s}{style:normal} |{style:reset}
{style:black on white}---+|hdmi|{style:white on green} {style:on black}+----+{style:on green} {style:black on white}usb{style:on green} {style:black on white}pwr{style:white on green} |{style:reset}
{style:white on green}| {J8:{style} col2}{style:white on green} J8 |{style:reset}
{style:white on green}| {J8:{style} col1}{style:white on green} |{style:reset}
{style:black on white}---+{style:white on green} {style:on black}+---+{style:on green} {style:bold}PiZero{style:normal} |{style:reset}
{style:black on white} sd|{style:white on green} {style:on black}|SoC|{style:on green} {style:bold}V{pcb_revision:3s}{style:normal} |{style:reset}
{style:black on white}---+|hdmi|{style:white on green} {style:on black}+---+{style:on green} {style:black on white}usb{style:on green} {style:black on white}pwr{style:white on green} |{style:reset}
{style:white on green}`---{style:black on white}| |{style:white on green}--------{style:black on white}| |{style:white on green}-{style:black on white}| |{style:white on green}-'{style:reset}"""
ZERO13_BOARD = """\
{style:white on green}.-------------------------.{style:reset}
{style:white on green}| {P1:{style} col2}{style:white on green} P1 |{style:reset}
{style:white on green}| {P1:{style} col1}{style:white on green} {style:black on white}|c{style:reset}
{style:black on white}---+{style:white on green} {style:on black}+----+{style:on green} {style:bold}PiZero{style:normal} {style:black on white}|s{style:reset}
{style:black on white} sd|{style:white on green} {style:on black}|SoC |{style:on green} {style:bold}V{pcb_revision:3s}{style:normal} {style:black on white}|i{style:reset}
{style:black on white}---+|hdmi|{style:white on green} {style:on black}+----+{style:on green} {style:black on white}usb{style:on green} {style:on white}pwr{style:white on green} |{style:reset}
{style:white on green}| {J8:{style} col2}{style:white on green} J8 |{style:reset}
{style:white on green}| {J8:{style} col1}{style:white on green} {style:black on white}|c{style:reset}
{style:black on white}---+{style:white on green} {style:on black}+---+{style:on green} {style:bold}Pi{model:6s}{style:normal}{style:black on white}|s{style:reset}
{style:black on white} sd|{style:white on green} {style:on black}|SoC|{style:on green} {style:bold}V{pcb_revision:3s}{style:normal} {style:black on white}|i{style:reset}
{style:black on white}---+|hdmi|{style:white on green} {style:on black}+---+{style:on green} {style:black on white}usb{style:on green} {style:on white}pwr{style:white on green} |{style:reset}
{style:white on green}`---{style:black on white}| |{style:white on green}--------{style:black on white}| |{style:white on green}-{style:black on white}| |{style:white on green}-'{style:reset}"""
CM_BOARD = """\
@@ -217,7 +216,7 @@ REV2_P5 = {
7: (GND, False), 8: (GND, False),
}
PLUS_P1 = {
PLUS_J8 = {
1: (V3_3, False), 2: (V5, False),
3: (GPIO2, True), 4: (V5, False),
5: (GPIO3, True), 6: (GND, False),
@@ -344,6 +343,23 @@ CM_SODIMM = {
199: ('VBAT', False), 200: ('VBAT', False),
}
CM3_SODIMM = CM_SODIMM.copy()
CM3_SODIMM.update({
4: ('NC / SDX VREF', False),
6: ('NC / SDX VREF', False),
8: (GND, False),
10: ('NC / SDX CLK', False),
12: ('NC / SDX CMD', False),
14: (GND, False),
16: ('NC / SDX D0', False),
18: ('NC / SDX D1', False),
20: (GND, False),
22: ('NC / SDX D2', False),
24: ('NC / SDX D3', False),
88: ('HDMI HPD N 1V8', False),
90: ('EMMC EN N 1V8', False),
})
# The following data is sourced from a combination of the following locations:
#
# http://elinux.org/RPi_HardwareHistory
@@ -363,12 +379,12 @@ PI_REVISIONS = {
0xd: ('B', '2.0', '2012Q4', 'BCM2835', 'Egoman', 512, 'SD', 2, 1, False, False, 1, 1, {'P1': REV2_P1, 'P5': REV2_P5}, REV2_BOARD, ),
0xe: ('B', '2.0', '2012Q4', 'BCM2835', 'Sony', 512, 'SD', 2, 1, False, False, 1, 1, {'P1': REV2_P1, 'P5': REV2_P5}, REV2_BOARD, ),
0xf: ('B', '2.0', '2012Q4', 'BCM2835', 'Qisda', 512, 'SD', 2, 1, False, False, 1, 1, {'P1': REV2_P1, 'P5': REV2_P5}, REV2_BOARD, ),
0x10: ('B+', '1.2', '2014Q3', 'BCM2835', 'Sony', 512, 'MicroSD', 4, 1, False, False, 1, 1, {'P1': PLUS_P1}, BPLUS_BOARD, ),
0x10: ('B+', '1.2', '2014Q3', 'BCM2835', 'Sony', 512, 'MicroSD', 4, 1, False, False, 1, 1, {'J8': PLUS_J8}, BPLUS_BOARD, ),
0x11: ('CM', '1.1', '2014Q2', 'BCM2835', 'Sony', 512, 'eMMC', 1, 0, False, False, 2, 2, {'SODIMM': CM_SODIMM}, CM_BOARD, ),
0x12: ('A+', '1.1', '2014Q4', 'BCM2835', 'Sony', 256, 'MicroSD', 1, 0, False, False, 1, 1, {'P1': PLUS_P1}, APLUS_BOARD, ),
0x13: ('B+', '1.2', '2015Q1', 'BCM2835', 'Egoman', 512, 'MicroSD', 4, 1, False, False, 1, 1, {'P1': PLUS_P1}, BPLUS_BOARD, ),
0x12: ('A+', '1.1', '2014Q4', 'BCM2835', 'Sony', 256, 'MicroSD', 1, 0, False, False, 1, 1, {'J8': PLUS_J8}, APLUS_BOARD, ),
0x13: ('B+', '1.2', '2015Q1', 'BCM2835', 'Egoman', 512, 'MicroSD', 4, 1, False, False, 1, 1, {'J8': PLUS_J8}, BPLUS_BOARD, ),
0x14: ('CM', '1.1', '2014Q2', 'BCM2835', 'Embest', 512, 'eMMC', 1, 0, False, False, 2, 2, {'SODIMM': CM_SODIMM}, CM_BOARD, ),
0x15: ('A+', '1.1', '2014Q4', 'BCM2835', 'Embest', 256, 'MicroSD', 1, 0, False, False, 1, 1, {'P1': PLUS_P1}, APLUS_BOARD, ),
0x15: ('A+', '1.1', '2014Q4', 'BCM2835', 'Embest', 256, 'MicroSD', 1, 0, False, False, 1, 1, {'J8': PLUS_J8}, APLUS_BOARD, ),
}
@@ -513,7 +529,8 @@ class HeaderInfo(namedtuple('HeaderInfo', (
from gpiozero import *
print('{0:full}'.format(pi_info().headers['P1']))
print('{0}'.format(pi_info().headers['J8']))
print('{0:full}'.format(pi_info().headers['J8']))
print('{0:col2}'.format(pi_info().headers['P1']))
print('{0:row1}'.format(pi_info().headers['P1']))
@@ -521,10 +538,9 @@ class HeaderInfo(namedtuple('HeaderInfo', (
the use of `ANSI color codes`_. If neither is specified, ANSI codes will
only be used if stdout is detected to be a tty::
print('{0:color row2}'.format(pi_info().headers['P1'])) # force use of ANSI codes
print('{0:color row2}'.format(pi_info().headers['J8'])) # force use of ANSI codes
print('{0:mono row2}'.format(pi_info().headers['P1'])) # force plain ASCII
.. _ANSI color codes: https://en.wikipedia.org/wiki/ANSI_escape_code
The following attributes are defined:
.. automethod:: pprint
@@ -532,7 +548,7 @@ class HeaderInfo(namedtuple('HeaderInfo', (
.. attribute:: name
The name of the header, typically as it appears silk-screened on the
board (e.g. "P1").
board (e.g. "P1" or "J8").
.. attribute:: rows
@@ -545,6 +561,8 @@ class HeaderInfo(namedtuple('HeaderInfo', (
.. attribute:: pins
A dictionary mapping physical pin numbers to :class:`PinInfo` tuples.
.. _ANSI color codes: https://en.wikipedia.org/wiki/ANSI_escape_code
"""
__slots__ = () # workaround python issue #24931
@@ -563,7 +581,6 @@ class HeaderInfo(namedtuple('HeaderInfo', (
def _format_full(self, style):
Cell = namedtuple('Cell', ('content', 'align', 'style'))
pin_digits = len(str(self.rows * self.columns))
lines = []
for row in range(self.rows):
line = []
@@ -643,7 +660,6 @@ class HeaderInfo(namedtuple('HeaderInfo', (
print('{0:{style} full}'.format(self, style=Style(color)))
class PiBoardInfo(namedtuple('PiBoardInfo', (
'revision',
'model',
@@ -671,6 +687,7 @@ class PiBoardInfo(namedtuple('PiBoardInfo', (
from gpiozero import *
print('{0}'.format(pi_info()))
print('{0:full}'.format(pi_info()))
print('{0:board}'.format(pi_info()))
print('{0:specs}'.format(pi_info()))
@@ -787,8 +804,8 @@ class PiBoardInfo(namedtuple('PiBoardInfo', (
A dictionary which maps header labels to :class:`HeaderInfo` tuples.
For example, to obtain information about header P1 you would query
``headers['P1']``. To obtain information about pin 12 on header P1 you
would query ``headers['P1'].pins[12]``.
``headers['P1']``. To obtain information about pin 12 on header J8 you
would query ``headers['J8'].pins[12]``.
A rendered version of this data can be obtained by using the
:class:`PiBoardInfo` object in a format string::
@@ -821,96 +838,120 @@ class PiBoardInfo(namedtuple('PiBoardInfo', (
# uuuuuuuu - Unused
# F - New flag (1=valid new-style revision, 0=old-style)
# MMM - Memory size (0=256, 1=512, 2=1024)
# CCCC - Manufacturer (0=Sony, 1=Egoman, 2=Embest)
# CCCC - Manufacturer (0=Sony, 1=Egoman, 2=Embest, 3=Sony Japan)
# PPPP - Processor (0=2835, 1=2836, 2=2837)
# TTTTTTTT - Type (0=A, 1=B, 2=A+, 3=B+, 4=2B, 5=Alpha (??), 6=CM, 8=3B, 9=Zero)
# TTTTTTTT - Type (0=A, 1=B, 2=A+, 3=B+, 4=2B, 5=Alpha (??), 6=CM,
# 8=3B, 9=Zero, 10=CM3, 12=Zero W)
# RRRR - Revision (0, 1, 2, etc.)
revcode_memory = (revision & 0x700000) >> 20
revcode_manufacturer = (revision & 0xf0000) >> 16
revcode_processor = (revision & 0xf000) >> 12
revcode_type = (revision & 0xff0) >> 4
revcode_revision = (revision & 0x0f)
try:
model = {
0: 'A',
1: 'B',
2: 'A+',
3: 'B+',
4: '2B',
6: 'CM',
8: '3B',
9: 'Zero',
}[(revision & 0xff0) >> 4]
0: 'A',
1: 'B',
2: 'A+',
3: 'B+',
4: '2B',
6: 'CM',
8: '3B',
9: 'Zero',
10: 'CM3',
12: 'Zero W',
}[revcode_type]
if model in ('A', 'B'):
pcb_revision = {
0: '1.0', # is this right?
1: '1.0',
2: '2.0',
}[revision & 0x0f]
}.get(revcode_revision, 'Unknown')
else:
pcb_revision = '1.%d' % (revision & 0x0f)
released = {
'A': '2013Q1',
'B': '2012Q1' if pcb_revision == '1.0' else '2012Q4',
'A+': '2014Q4',
'B+': '2014Q3',
'2B': '2015Q1' if pcb_revision == '1.0' or pcb_revision == '1.1' else '2016Q3',
'CM': '2014Q2',
'3B': '2016Q1',
'Zero': '2015Q4' if pcb_revision == '1.2' else '2016Q2',
}[model]
pcb_revision = '1.%d' % revcode_revision
soc = {
0: 'BCM2835',
1: 'BCM2836',
2: 'BCM2837',
}[(revision & 0xf000) >> 12]
}.get(revcode_processor, 'Unknown')
manufacturer = {
0: 'Sony',
1: 'Egoman',
2: 'Embest',
}[(revision & 0xf0000) >> 16]
3: 'Sony Japan',
}.get(revcode_manufacturer, 'Unknown')
memory = {
0: 256,
1: 512,
2: 1024,
}[(revision & 0x700000) >> 20]
}.get(revcode_memory, 0)
released = {
'A': '2013Q1',
'B': '2012Q1' if pcb_revision == '1.0' else '2012Q4',
'A+': '2014Q4' if memory == 512 else '2016Q3',
'B+': '2014Q3',
'2B': '2015Q1' if pcb_revision in ('1.0', '1.1') else '2016Q3',
'CM': '2014Q2',
'3B': '2016Q1' if manufacturer in ('Sony', 'Embest') else '2016Q4',
'Zero': '2015Q4' if pcb_revision == '1.2' else '2016Q2',
'CM3': '2017Q1',
'Zero W': '2017Q1',
}.get(model, 'Unknown')
storage = {
'A': 'SD',
'B': 'SD',
'CM': 'eMMC',
'A': 'SD',
'B': 'SD',
'CM': 'eMMC',
'CM3': 'eMMC / off-board',
}.get(model, 'MicroSD')
usb = {
'A': 1,
'A+': 1,
'Zero': 1,
'B': 2,
'CM': 1,
'A': 1,
'A+': 1,
'Zero': 1,
'Zero W': 1,
'B': 2,
'CM': 0,
'CM3': 1,
}.get(model, 4)
ethernet = {
'A': 0,
'A+': 0,
'Zero': 0,
'CM': 0,
'A': 0,
'A+': 0,
'Zero': 0,
'Zero W': 0,
'CM': 0,
'CM3': 0,
}.get(model, 1)
wifi = {
'3B': True,
'3B': True,
'Zero W': True,
}.get(model, False)
bluetooth = {
'3B': True,
'3B': True,
'Zero W': True,
}.get(model, False)
csi = {
'Zero': 0 if pcb_revision == '1.2' else 1,
'CM': 2,
'Zero': 0 if pcb_revision == '1.0' else 1,
'Zero W': 1,
'CM': 2,
'CM3': 2,
}.get(model, 1)
dsi = {
'Zero': 0,
'Zero': 0,
'Zero W': 0,
}.get(model, csi)
headers = {
'A': {'P1': REV2_P1, 'P5': REV2_P5},
'B': {'P1': REV1_P1} if pcb_revision == '1.0' else {'P1': REV2_P1, 'P5': REV2_P5},
'CM': {'SODIMM': CM_SODIMM},
}.get(model, {'P1': PLUS_P1})
'A': {'P1': REV2_P1, 'P5': REV2_P5},
'B': {'P1': REV1_P1} if pcb_revision == '1.0' else {'P1': REV2_P1, 'P5': REV2_P5},
'CM': {'SODIMM': CM_SODIMM},
'CM3': {'SODIMM': CM3_SODIMM},
}.get(model, {'J8': PLUS_J8})
board = {
'A': A_BOARD,
'B': REV1_BOARD if pcb_revision == '1.0' else REV2_BOARD,
'A+': APLUS_BOARD,
'CM': CM_BOARD,
'Zero': ZERO12_BOARD if pcb_revision == '1.2' else ZERO13_BOARD,
'A': A_BOARD,
'B': REV1_BOARD if pcb_revision == '1.0' else REV2_BOARD,
'A+': APLUS_BOARD,
'CM': CM_BOARD,
'CM3': CM_BOARD,
'Zero': ZERO12_BOARD if pcb_revision == '1.2' else ZERO13_BOARD,
'Zero W': ZERO13_BOARD,
}.get(model, BPLUS_BOARD)
except KeyError:
raise PinUnknownPi('unable to parse new-style revision "%x"' % revision)
@@ -1077,8 +1118,8 @@ class PiBoardInfo(namedtuple('PiBoardInfo', (
"""
Pretty-print a representation of the board along with header diagrams.
If *color* is ``None`` (the default, the diagram will include ANSI
color codes if stdout is a color-capable terminal). Otherwise *color*
If *color* is ``None`` (the default), the diagram will include ANSI
color codes if stdout is a color-capable terminal. Otherwise *color*
can be set to ``True`` or ``False`` to force color or monochrome
output.
"""
@@ -1096,13 +1137,10 @@ def pi_info(revision=None):
the model of Pi it is running on and return information about that.
"""
if revision is None:
# NOTE: This import is declared locally for two reasons. Firstly it
# avoids a circular dependency (devices->pins->pins.data->devices).
# Secondly, pin_factory is one global which might potentially be
# re-written by a user's script at runtime hence we should re-import
# here in case it's changed since initialization
from ..devices import pin_factory
result = pin_factory.pi_info()
# The reason this import is located here is to avoid a circular
# dependency; devices->pins.local->pins.data->devices
from ..devices import Device
result = Device.pin_factory.pi_info
if result is None:
raise PinUnknownPi('The default pin_factory is not attached to a Pi')
else:

245
gpiozero/pins/local.py Normal file
View File

@@ -0,0 +1,245 @@
from __future__ import (
unicode_literals,
absolute_import,
print_function,
division,
)
str = type('')
import io
import warnings
try:
from spidev import SpiDev
except ImportError:
SpiDev = None
from . import SPI
from .pi import PiFactory, PiPin, SPI_HARDWARE_PINS
from .spi import SPISoftwareBus
from ..devices import Device, SharedMixin
from ..output_devices import OutputDevice
from ..exc import DeviceClosed, PinUnknownPi, SPIInvalidClockMode
class LocalPiFactory(PiFactory):
"""
Abstract base class representing pins attached locally to a Pi. This forms
the base class for local-only pin interfaces
(:class:`~gpiozero.pins.rpigpio.RPiGPIOPin`,
:class:`~gpiozero.pins.rpio.RPIOPin`, and
:class:`~gpiozero.pins.native.NativePin`).
"""
pins = {}
def __init__(self):
super(LocalPiFactory, self).__init__()
self.spi_classes = {
('hardware', 'exclusive'): LocalPiHardwareSPI,
('hardware', 'shared'): LocalPiHardwareSPIShared,
('software', 'exclusive'): LocalPiSoftwareSPI,
('software', 'shared'): LocalPiSoftwareSPIShared,
}
# Override the pins dict to be this class' pins dict. This is a bit of
# a dirty hack, but ensures that anyone evil enough to mix pin
# implementations doesn't try and control the same pin with different
# backends
self.pins = LocalPiFactory.pins
def _get_address(self):
return ('localhost',)
def _get_revision(self):
# Cache the result as we can reasonably assume it won't change during
# runtime (this is LocalPin after all; descendents that deal with
# remote Pis should inherit from Pin instead)
with io.open('/proc/cpuinfo', 'r') as f:
for line in f:
if line.startswith('Revision'):
revision = line.split(':')[1].strip().lower()
overvolted = revision.startswith('100')
if overvolted:
revision = revision[-4:]
return revision
raise PinUnknownPi('unable to locate Pi revision in /proc/cpuinfo')
class LocalPiPin(PiPin):
"""
Abstract base class representing a multi-function GPIO pin attached to the
local Raspberry Pi.
"""
pass
class LocalPiHardwareSPI(SPI, Device):
def __init__(self, factory, port, device):
if SpiDev is None:
raise ImportError('failed to import spidev')
self._port = port
self._device = device
self._interface = None
self._address = factory.address + ('SPI(port=%d, device=%d)' % (port, device),)
super(LocalPiHardwareSPI, self).__init__()
pins = SPI_HARDWARE_PINS[port]
self._reserve_pins(
factory.pin_address(pins['clock']),
factory.pin_address(pins['mosi']),
factory.pin_address(pins['miso']),
factory.pin_address(pins['select'][device])
)
self._interface = SpiDev()
self._interface.open(port, device)
self._interface.max_speed_hz = 500000
def close(self):
if self._interface:
try:
self._interface.close()
finally:
self._interface = None
self._release_all()
super(LocalPiHardwareSPI, self).close()
@property
def closed(self):
return self._interface is None
def __repr__(self):
try:
self._check_open()
return 'SPI(port=%d, device=%d)' % (self._port, self._device)
except DeviceClosed:
return 'SPI(closed)'
def transfer(self, data):
"""
Writes data (a list of integer words where each word is assumed to have
:attr:`bits_per_word` bits or less) to the SPI interface, and reads an
equivalent number of words, returning them as a list of integers.
"""
return self._interface.xfer2(data)
def _get_clock_mode(self):
return self._interface.mode
def _set_clock_mode(self, value):
self._interface.mode = value
def _get_lsb_first(self):
return self._interface.lsbfirst
def _set_lsb_first(self, value):
self._interface.lsbfirst = bool(value)
def _get_select_high(self):
return self._interface.cshigh
def _set_select_high(self, value):
self._interface.cshigh = bool(value)
def _get_bits_per_word(self):
return self._interface.bits_per_word
def _set_bits_per_word(self, value):
self._interface.bits_per_word = value
class LocalPiSoftwareSPI(SPI, OutputDevice):
def __init__(self, factory, clock_pin, mosi_pin, miso_pin, select_pin):
self._bus = None
self._address = factory.address + (
'SPI(clock_pin=%d, mosi_pin=%d, miso_pin=%d, select_pin=%d)' % (
clock_pin, mosi_pin, miso_pin, select_pin),
)
super(LocalPiSoftwareSPI, self).__init__(select_pin, active_high=False)
try:
self._clock_phase = False
self._lsb_first = False
self._bits_per_word = 8
self._bus = SPISoftwareBus(clock_pin, mosi_pin, miso_pin)
except:
self.close()
raise
def _conflicts_with(self, other):
return not (
isinstance(other, LocalPiSoftwareSPI) and
(self.pin.number != other.pin.number)
)
def close(self):
if self._bus:
self._bus.close()
self._bus = None
super(LocalPiSoftwareSPI, self).close()
@property
def closed(self):
return self._bus is None
def __repr__(self):
try:
self._check_open()
return 'SPI(clock_pin=%d, mosi_pin=%d, miso_pin=%d, select_pin=%d)' % (
self._bus.clock.pin.number,
self._bus.mosi.pin.number,
self._bus.miso.pin.number,
self.pin.number)
except DeviceClosed:
return 'SPI(closed)'
def transfer(self, data):
with self._bus.lock:
self.on()
try:
return self._bus.transfer(
data, self._clock_phase, self._lsb_first, self._bits_per_word)
finally:
self.off()
def _get_clock_mode(self):
with self._bus.lock:
return (not self._bus.clock.active_high) << 1 | self._clock_phase
def _set_clock_mode(self, value):
if not (0 <= value < 4):
raise SPIInvalidClockMode("%d is not a valid clock mode" % value)
with self._bus.lock:
self._bus.clock.active_high = not (value & 2)
self._clock_phase = bool(value & 1)
def _get_lsb_first(self):
return self._lsb_first
def _set_lsb_first(self, value):
self._lsb_first = bool(value)
def _get_bits_per_word(self):
return self._bits_per_word
def _set_bits_per_word(self, value):
if value < 1:
raise ValueError('bits_per_word must be positive')
self._bits_per_word = int(value)
def _get_select_high(self):
return self.active_high
def _set_select_high(self, value):
with self._bus.lock:
self.active_high = value
self.off()
class LocalPiHardwareSPIShared(SharedMixin, LocalPiHardwareSPI):
@classmethod
def _shared_key(cls, factory, port, device):
return (port, device)
class LocalPiSoftwareSPIShared(SharedMixin, LocalPiSoftwareSPI):
@classmethod
def _shared_key(cls, factory, clock_pin, mosi_pin, miso_pin, select_pin):
return (select_pin,)

View File

@@ -7,6 +7,7 @@ from __future__ import (
str = type('')
import os
from collections import namedtuple
from time import time, sleep
from threading import Thread, Event
@@ -15,56 +16,36 @@ try:
except ImportError:
from ..compat import isclose
from . import Pin
from .data import pi_info
from ..exc import PinSetInput, PinPWMUnsupported, PinFixedPull
import pkg_resources
from ..exc import (
PinPWMUnsupported,
PinSetInput,
PinFixedPull,
PinInvalidFunction,
PinInvalidPull,
)
from ..devices import Device
from .pi import PiPin
from .local import LocalPiFactory
PinState = namedtuple('PinState', ('timestamp', 'state'))
class MockPin(Pin):
class MockPin(PiPin):
"""
A mock pin used primarily for testing. This class does *not* support PWM.
"""
_PINS = {}
@classmethod
def clear_pins(cls):
cls._PINS.clear()
@classmethod
def pi_info(cls):
return pi_info('a21041') # Pretend we're a Pi 2B
def __new__(cls, number):
if not (0 <= number < 54):
raise ValueError('invalid pin %d specified (must be 0..53)' % number)
try:
old_pin = cls._PINS[number]
except KeyError:
self = super(MockPin, cls).__new__(cls)
cls._PINS[number] = self
self._number = number
self._function = 'input'
self._state = False
self._pull = 'floating'
self._bounce = None
self._edges = 'both'
self._when_changed = None
self.clear_states()
return self
# Ensure the pin class expected supports PWM (or not)
if issubclass(cls, MockPWMPin) != isinstance(old_pin, MockPWMPin):
raise ValueError('pin %d is already in use as a %s' % (number, old_pin.__class__.__name__))
return old_pin
def __repr__(self):
return 'MOCK%d' % self._number
@property
def number(self):
return self._number
def __init__(self, factory, number):
super(MockPin, self).__init__(factory, number)
self._function = 'input'
self._pull = 'up' if factory.pi_info.pulled_up(self.address[-1]) else 'floating'
self._state = self._pull == 'up'
self._bounce = None
self._edges = 'both'
self._when_changed = None
self.clear_states()
def close(self):
self.when_changed = None
@@ -74,7 +55,8 @@ class MockPin(Pin):
return self._function
def _set_function(self, value):
assert value in ('input', 'output')
if value not in ('input', 'output'):
raise PinInvalidFunction('function must be input or output')
self._function = value
if value == 'input':
# Drive the input to the pull
@@ -110,8 +92,12 @@ class MockPin(Pin):
return self._pull
def _set_pull(self, value):
assert self._function == 'input'
assert value in ('floating', 'up', 'down')
if self.function != 'input':
raise PinFixedPull('cannot set pull on non-input pin %r' % self)
if value != 'up' and self.factory.pi_info.pulled_up(self.address[-1]):
raise PinFixedPull('%r has a physical pull-up resistor' % self)
if value not in ('floating', 'up', 'down'):
raise PinInvalidPull('pull must be floating, up, or down')
self._pull = value
if value == 'up':
self.drive_high()
@@ -169,14 +155,23 @@ class MockPin(Pin):
assert isclose(actual.state, expected[1])
class MockPulledUpPin(MockPin):
class MockConnectedPin(MockPin):
"""
This derivative of :class:`MockPin` emulates a pin with a physical pull-up
resistor.
This derivative of :class:`MockPin` emulates a pin connected to another
mock pin. This is used in the "real pins" portion of the test suite to
check that one pin can influence another.
"""
def _set_pull(self, value):
if value != 'up':
raise PinFixedPull('pin has a physical pull-up resistor')
def __init__(self, factory, number, input_pin=None):
super(MockConnectedPin, self).__init__(factory, number)
self.input_pin = input_pin
def _change_state(self, value):
if self.input_pin:
if value:
self.input_pin.drive_high()
else:
self.input_pin.drive_low()
return super(MockConnectedPin, self)._change_state(value)
class MockChargingPin(MockPin):
@@ -186,9 +181,9 @@ class MockChargingPin(MockPin):
(as if attached to, e.g. a typical circuit using an LDR and a capacitor
to time the charging rate).
"""
def __init__(self, number):
super(MockChargingPin, self).__init__()
self.charge_time = 0.01 # dark charging time
def __init__(self, factory, number, charge_time=0.01):
super(MockChargingPin, self).__init__(factory, number)
self.charge_time = charge_time # dark charging time
self._charge_stop = Event()
self._charge_thread = None
@@ -225,10 +220,10 @@ class MockTriggerPin(MockPin):
corresponding pin instance. When this pin is driven high it will trigger
the echo pin to drive high for the echo time.
"""
def __init__(self, number):
super(MockTriggerPin, self).__init__()
self.echo_pin = None
self.echo_time = 0.04 # longest echo time
def __init__(self, factory, number, echo_pin=None, echo_time=0.04):
super(MockTriggerPin, self).__init__(factory, number)
self.echo_pin = echo_pin
self.echo_time = echo_time # longest echo time
self._echo_thread = None
def _set_state(self, value):
@@ -250,8 +245,8 @@ class MockPWMPin(MockPin):
"""
This derivative of :class:`MockPin` adds PWM support.
"""
def __init__(self, number):
super(MockPWMPin, self).__init__()
def __init__(self, factory, number):
super(MockPWMPin, self).__init__(factory, number)
self._frequency = None
def close(self):
@@ -283,8 +278,8 @@ class MockSPIClockPin(MockPin):
rather, construct a :class:`MockSPIDevice` with various pin numbers, and
this class will be used for the clock pin.
"""
def __init__(self, number):
super(MockSPIClockPin, self).__init__()
def __init__(self, factory, number):
super(MockSPIClockPin, self).__init__(factory, number)
if not hasattr(self, 'spi_devices'):
self.spi_devices = []
@@ -301,8 +296,8 @@ class MockSPISelectPin(MockPin):
tests; rather, construct a :class:`MockSPIDevice` with various pin numbers,
and this class will be used for the select pin.
"""
def __init__(self, number):
super(MockSPISelectPin, self).__init__()
def __init__(self, factory, number):
super(MockSPISelectPin, self).__init__(factory, number)
if not hasattr(self, 'spi_device'):
self.spi_device = None
@@ -314,13 +309,13 @@ class MockSPISelectPin(MockPin):
class MockSPIDevice(object):
def __init__(
self, clock_pin, mosi_pin, miso_pin, select_pin=None,
self, clock_pin, mosi_pin=None, miso_pin=None, select_pin=None,
clock_polarity=False, clock_phase=False, lsb_first=False,
bits_per_word=8, select_high=False):
self.clock_pin = MockSPIClockPin(clock_pin)
self.mosi_pin = None if mosi_pin is None else MockPin(mosi_pin)
self.miso_pin = None if miso_pin is None else MockPin(miso_pin)
self.select_pin = None if select_pin is None else MockSPISelectPin(select_pin)
self.clock_pin = Device.pin_factory.pin(clock_pin, pin_class=MockSPIClockPin)
self.mosi_pin = None if mosi_pin is None else Device.pin_factory.pin(mosi_pin)
self.miso_pin = None if miso_pin is None else Device.pin_factory.pin(miso_pin)
self.select_pin = None if select_pin is None else Device.pin_factory.pin(select_pin, pin_class=MockSPISelectPin)
self.clock_polarity = clock_polarity
self.clock_phase = clock_phase
self.lsb_first = lsb_first
@@ -413,3 +408,42 @@ class MockSPIDevice(object):
bits = reversed(bits)
self.tx_buf.extend(bits)
class MockFactory(LocalPiFactory):
def __init__(
self, revision=os.getenv('GPIOZERO_MOCK_REVISION', 'a21041'),
pin_class=os.getenv('GPIOZERO_MOCK_PIN_CLASS', MockPin)):
super(MockFactory, self).__init__()
self._revision = revision
if not issubclass(pin_class, MockPin):
if isinstance(pin_class, bytes):
pin_class = pin_class.decode('ascii')
dist = pkg_resources.get_distribution('gpiozero')
group = 'gpiozero_mock_pin_classes'
pin_class = pkg_resources.load_entry_point(dist, group, pin_class.lower())
self.pin_class = pin_class
def _get_address(self):
return ('mock',)
def _get_revision(self):
return self._revision
def reset(self):
self.pins.clear()
def pin(self, spec, pin_class=None, **kwargs):
if pin_class is None:
pin_class = self.pin_class
n = self._to_gpio(spec)
try:
pin = self.pins[n]
except KeyError:
pin = pin_class(self, n, **kwargs)
self.pins[n] = pin
else:
# Ensure the pin class expected supports PWM (or not)
if issubclass(pin_class, MockPWMPin) != isinstance(pin, MockPWMPin):
raise ValueError('pin %d is already in use as a %s' % (n, pin.__class__.__name__))
return pin

View File

@@ -17,16 +17,13 @@ from time import sleep
from threading import Thread, Event, Lock
from collections import Counter
from . import LocalPin, PINS_CLEANUP
from .data import pi_info
from .local import LocalPiPin, LocalPiFactory
from ..exc import (
PinInvalidPull,
PinInvalidEdges,
PinInvalidFunction,
PinFixedPull,
PinSetInput,
PinNonPhysical,
PinNoPins,
)
@@ -149,7 +146,7 @@ class GPIOFS(object):
f.write(str(pin).encode('ascii'))
class NativePin(LocalPin):
class NativeFactory(LocalPiFactory):
"""
Uses a built-in pure Python implementation to interface to the Pi's GPIO
pins. This is the default pin implementation if no third-party libraries
@@ -169,10 +166,17 @@ class NativePin(LocalPin):
led = LED(NativePin(12))
"""
def __init__(self):
super(NativeFactory, self).__init__()
self.mem = GPIOMemory()
self.pin_class = NativePin
_MEM = None
_PINS = {}
def close(self):
super(NativeFactory, self).close()
self.mem.close()
class NativePin(LocalPiPin):
GPIO_FUNCTIONS = {
'input': 0b000,
'output': 0b001,
@@ -202,89 +206,62 @@ class NativePin(LocalPin):
GPIO_PULL_UP_NAMES = {v: k for (k, v) in GPIO_PULL_UPS.items()}
GPIO_EDGES_NAMES = {v: k for (k, v) in GPIO_EDGES.items()}
PI_INFO = None
def __new__(cls, number):
if not cls._PINS:
cls._MEM = GPIOMemory()
PINS_CLEANUP.append(cls._MEM.close)
if cls.PI_INFO is None:
cls.PI_INFO = pi_info()
if not (0 <= number < 54):
raise ValueError('invalid pin %d specified (must be 0..53)' % number)
try:
return cls._PINS[number]
except KeyError:
self = super(NativePin, cls).__new__(cls)
try:
cls.PI_INFO.physical_pin('GPIO%d' % number)
except PinNoPins:
warnings.warn(
PinNonPhysical(
'no physical pins exist for GPIO%d' % number))
self._number = number
self._func_offset = self._MEM.GPFSEL_OFFSET + (number // 10)
self._func_shift = (number % 10) * 3
self._set_offset = self._MEM.GPSET_OFFSET + (number // 32)
self._set_shift = number % 32
self._clear_offset = self._MEM.GPCLR_OFFSET + (number // 32)
self._clear_shift = number % 32
self._level_offset = self._MEM.GPLEV_OFFSET + (number // 32)
self._level_shift = number % 32
self._pull_offset = self._MEM.GPPUDCLK_OFFSET + (number // 32)
self._pull_shift = number % 32
self._edge_offset = self._MEM.GPEDS_OFFSET + (number // 32)
self._edge_shift = number % 32
self._rising_offset = self._MEM.GPREN_OFFSET + (number // 32)
self._rising_shift = number % 32
self._falling_offset = self._MEM.GPFEN_OFFSET + (number // 32)
self._falling_shift = number % 32
self._when_changed = None
self._change_thread = None
self._change_event = Event()
self.function = 'input'
self.pull = 'up' if cls.PI_INFO.pulled_up('GPIO%d' % number) else 'floating'
self.bounce = None
self.edges = 'both'
cls._PINS[number] = self
return self
def __repr__(self):
return "GPIO%d" % self._number
@property
def number(self):
return self._number
def __init__(self, factory, number):
super(NativePin, self).__init__(factory, number)
self._func_offset = self.factory.mem.GPFSEL_OFFSET + (number // 10)
self._func_shift = (number % 10) * 3
self._set_offset = self.factory.mem.GPSET_OFFSET + (number // 32)
self._set_shift = number % 32
self._clear_offset = self.factory.mem.GPCLR_OFFSET + (number // 32)
self._clear_shift = number % 32
self._level_offset = self.factory.mem.GPLEV_OFFSET + (number // 32)
self._level_shift = number % 32
self._pull_offset = self.factory.mem.GPPUDCLK_OFFSET + (number // 32)
self._pull_shift = number % 32
self._edge_offset = self.factory.mem.GPEDS_OFFSET + (number // 32)
self._edge_shift = number % 32
self._rising_offset = self.factory.mem.GPREN_OFFSET + (number // 32)
self._rising_shift = number % 32
self._falling_offset = self.factory.mem.GPFEN_OFFSET + (number // 32)
self._falling_shift = number % 32
self._when_changed = None
self._change_thread = None
self._change_event = Event()
self.function = 'input'
self.pull = 'up' if factory.pi_info.pulled_up(self.address[-1]) else 'floating'
self.bounce = None
self.edges = 'both'
def close(self):
self.frequency = None
self.when_changed = None
self.function = 'input'
self.pull = 'up' if self.PI_INFO.pulled_up('GPIO%d' % self.number) else 'floating'
self.pull = 'up' if self.factory.pi_info.pulled_up(self.address[-1]) else 'floating'
def _get_function(self):
return self.GPIO_FUNCTION_NAMES[(self._MEM[self._func_offset] >> self._func_shift) & 7]
return self.GPIO_FUNCTION_NAMES[(self.factory.mem[self._func_offset] >> self._func_shift) & 7]
def _set_function(self, value):
try:
value = self.GPIO_FUNCTIONS[value]
except KeyError:
raise PinInvalidFunction('invalid function "%s" for pin %r' % (value, self))
self._MEM[self._func_offset] = (
self._MEM[self._func_offset]
self.factory.mem[self._func_offset] = (
self.factory.mem[self._func_offset]
& ~(7 << self._func_shift)
| (value << self._func_shift)
)
def _get_state(self):
return bool(self._MEM[self._level_offset] & (1 << self._level_shift))
return bool(self.factory.mem[self._level_offset] & (1 << self._level_shift))
def _set_state(self, value):
if self.function == 'input':
raise PinSetInput('cannot set state of pin %r' % self)
if value:
self._MEM[self._set_offset] = 1 << self._set_shift
self.factory.mem[self._set_offset] = 1 << self._set_shift
else:
self._MEM[self._clear_offset] = 1 << self._clear_shift
self.factory.mem[self._clear_offset] = 1 << self._clear_shift
def _get_pull(self):
return self.GPIO_PULL_UP_NAMES[self._pull]
@@ -292,23 +269,23 @@ class NativePin(LocalPin):
def _set_pull(self, value):
if self.function != 'input':
raise PinFixedPull('cannot set pull on non-input pin %r' % self)
if value != 'up' and self.PI_INFO.pulled_up('GPIO%d' % self.number):
if value != 'up' and self.factory.pi_info.pulled_up(self.address[-1]):
raise PinFixedPull('%r has a physical pull-up resistor' % self)
try:
value = self.GPIO_PULL_UPS[value]
except KeyError:
raise PinInvalidPull('invalid pull direction "%s" for pin %r' % (value, self))
self._MEM[self._MEM.GPPUD_OFFSET] = value
self.factory.mem[self.factory.mem.GPPUD_OFFSET] = value
sleep(0.000000214)
self._MEM[self._pull_offset] = 1 << self._pull_shift
self.factory.mem[self._pull_offset] = 1 << self._pull_shift
sleep(0.000000214)
self._MEM[self._MEM.GPPUD_OFFSET] = 0
self._MEM[self._pull_offset] = 0
self.factory.mem[self.factory.mem.GPPUD_OFFSET] = 0
self.factory.mem[self._pull_offset] = 0
self._pull = value
def _get_edges(self):
rising = bool(self._MEM[self._rising_offset] & (1 << self._rising_shift))
falling = bool(self._MEM[self._falling_offset] & (1 << self._falling_shift))
rising = bool(self.factory.mem[self._rising_offset] & (1 << self._rising_shift))
falling = bool(self.factory.mem[self._falling_offset] & (1 << self._falling_shift))
return self.GPIO_EDGES_NAMES[(rising, falling)]
def _set_edges(self, value):
@@ -319,43 +296,36 @@ class NativePin(LocalPin):
f = self.when_changed
self.when_changed = None
try:
self._MEM[self._rising_offset] = (
self._MEM[self._rising_offset]
self.factory.mem[self._rising_offset] = (
self.factory.mem[self._rising_offset]
& ~(1 << self._rising_shift)
| (rising << self._rising_shift)
)
self._MEM[self._falling_offset] = (
self._MEM[self._falling_offset]
self.factory.mem[self._falling_offset] = (
self.factory.mem[self._falling_offset]
& ~(1 << self._falling_shift)
| (falling << self._falling_shift)
)
finally:
self.when_changed = f
def _get_when_changed(self):
return self._when_changed
def _enable_event_detect(self):
self._change_thread = Thread(target=self._change_watch)
self._change_thread.daemon = True
self._change_event.clear()
self._change_thread.start()
def _set_when_changed(self, value):
if self._when_changed is None and value is not None:
self._when_changed = value
self._change_thread = Thread(target=self._change_watch)
self._change_thread.daemon = True
self._change_event.clear()
self._change_thread.start()
elif self._when_changed is not None and value is None:
self._change_event.set()
self._change_thread.join()
self._change_thread = None
self._when_changed = None
else:
self._when_changed = value
def _disable_event_detect(self):
self._change_event.set()
self._change_thread.join()
self._change_thread = None
def _change_watch(self):
offset = self._edge_offset
mask = 1 << self._edge_shift
self._MEM[offset] = mask # clear any existing detection bit
self.factory.mem[offset] = mask # clear any existing detection bit
while not self._change_event.wait(0.001):
if self._MEM[offset] & mask:
self._MEM[offset] = mask
self._when_changed()
if self.factory.mem[offset] & mask:
self.factory.mem[offset] = mask
self._call_when_changed()

307
gpiozero/pins/pi.py Normal file
View File

@@ -0,0 +1,307 @@
from __future__ import (
unicode_literals,
absolute_import,
print_function,
division,
)
str = type('')
import io
from threading import RLock
from types import MethodType
try:
from weakref import ref, WeakMethod
except ImportError:
from ..compat import WeakMethod
import warnings
try:
from spidev import SpiDev
except ImportError:
SpiDev = None
from . import Factory, Pin
from .data import pi_info
from ..exc import (
PinNoPins,
PinNonPhysical,
PinInvalidPin,
SPIBadArgs,
SPISoftwareFallback,
)
SPI_HARDWARE_PINS = {
0: {
'clock': 11,
'mosi': 10,
'miso': 9,
'select': (8, 7),
},
}
class PiFactory(Factory):
"""
Abstract base class representing hardware attached to a Raspberry Pi. This
forms the base of :class:`~gpiozero.pins.local.LocalPiFactory`.
"""
def __init__(self):
self._info = None
self.pins = {}
self.pin_class = None
self.spi_classes = {
('hardware', 'exclusive'): None,
('hardware', 'shared'): None,
('software', 'exclusive'): None,
('software', 'shared'): None,
}
def close(self):
for pin in self.pins.values():
pin.close()
self.pins.clear()
def pin(self, spec):
n = self._to_gpio(spec)
try:
pin = self.pins[n]
except KeyError:
pin = self.pin_class(self, n)
self.pins[n] = pin
return pin
def pin_address(self, spec):
n = self._to_gpio(spec)
return self.address + ('GPIO%d' % n,)
def _to_gpio(self, spec):
"""
Converts the pin *spec* to a GPIO port number.
"""
if not 0 <= spec < 54:
raise PinInvalidPin('invalid GPIO port %d specified (range 0..53) ' % spec)
return spec
def _get_revision(self):
raise NotImplementedError
def _get_pi_info(self):
if self._info is None:
self._info = pi_info(self._get_revision())
return self._info
def spi(self, **spi_args):
"""
Returns an SPI interface, for the specified SPI *port* and *device*, or
for the specified pins (*clock_pin*, *mosi_pin*, *miso_pin*, and
*select_pin*). Only one of the schemes can be used; attempting to mix
*port* and *device* with pin numbers will raise :exc:`SPIBadArgs`.
If the pins specified match the hardware SPI pins (clock on GPIO11,
MOSI on GPIO10, MISO on GPIO9, and chip select on GPIO8 or GPIO7), and
the spidev module can be imported, a :class:`SPIHardwareInterface`
instance will be returned. Otherwise, a :class:`SPISoftwareInterface`
will be returned which will use simple bit-banging to communicate.
Both interfaces have the same API, support clock polarity and phase
attributes, and can handle half and full duplex communications, but the
hardware interface is significantly faster (though for many things this
doesn't matter).
"""
spi_args, kwargs = self._extract_spi_args(**spi_args)
shared = 'shared' if kwargs.pop('shared', False) else 'exclusive'
if kwargs:
raise SPIBadArgs(
'unrecognized keyword argument %s' % kwargs.popitem()[0])
for port, pins in SPI_HARDWARE_PINS.items():
if all((
spi_args['clock_pin'] == pins['clock'],
spi_args['mosi_pin'] == pins['mosi'],
spi_args['miso_pin'] == pins['miso'],
spi_args['select_pin'] in pins['select'],
)):
try:
return self.spi_classes[('hardware', shared)](
self, port=port,
device=pins['select'].index(spi_args['select_pin'])
)
except Exception as e:
warnings.warn(
SPISoftwareFallback(
'failed to initialize hardware SPI, falling back to '
'software (error was: %s)' % str(e)))
break
# Convert all pin arguments to integer GPIO numbers. This is necessary
# to ensure the shared-key for shared implementations get matched
# correctly, and is a bit of a hack for the pigpio bit-bang
# implementation which just wants the pin numbers too.
spi_args = {
key: pin.number if isinstance(pin, Pin) else pin
for key, pin in spi_args.items()
}
return self.spi_classes[('software', shared)](self, **spi_args)
def _extract_spi_args(self, **kwargs):
"""
Given a set of keyword arguments, splits it into those relevant to SPI
implementations and all the rest. SPI arguments are augmented with
defaults and converted into the pin format (from the port/device
format) if necessary.
Returns a tuple of ``(spi_args, other_args)``.
"""
dev_defaults = {
'port': 0,
'device': 0,
}
default_hw = SPI_HARDWARE_PINS[dev_defaults['port']]
pin_defaults = {
'clock_pin': default_hw['clock'],
'mosi_pin': default_hw['mosi'],
'miso_pin': default_hw['miso'],
'select_pin': default_hw['select'][dev_defaults['device']],
}
spi_args = {
key: value for (key, value) in kwargs.items()
if key in pin_defaults or key in dev_defaults
}
kwargs = {
key: value for (key, value) in kwargs.items()
if key not in spi_args
}
if not spi_args:
spi_args = pin_defaults
elif set(spi_args) <= set(pin_defaults):
spi_args = {
key: self._to_gpio(spi_args.get(key, default))
for key, default in pin_defaults.items()
}
elif set(spi_args) <= set(dev_defaults):
spi_args = {
key: spi_args.get(key, default)
for key, default in dev_defaults.items()
}
if spi_args['port'] != 0:
raise SPIBadArgs('port 0 is the only valid SPI port')
selected_hw = SPI_HARDWARE_PINS[spi_args['port']]
try:
selected_hw['select'][spi_args['device']]
except IndexError:
raise SPIBadArgs(
'device must be in the range 0..%d' %
len(selected_hw['select']))
spi_args = {
key: value if key != 'select_pin' else selected_hw['select'][spi_args['device']]
for key, value in pin_defaults.items()
}
else:
raise SPIBadArgs(
'you must either specify port and device, or clock_pin, '
'mosi_pin, miso_pin, and select_pin; combinations of the two '
'schemes (e.g. port and clock_pin) are not permitted')
return spi_args, kwargs
class PiPin(Pin):
"""
Abstract base class representing a multi-function GPIO pin attached to a
Raspberry Pi. This overrides several methods in the abstract base
:class:`~gpiozero.Pin`. Descendents must override the following methods:
* :meth:`_get_function`
* :meth:`_set_function`
* :meth:`_get_state`
* :meth:`_call_when_changed`
* :meth:`_enable_event_detect`
* :meth:`_disable_event_detect`
Descendents *may* additionally override the following methods, if
applicable:
* :meth:`close`
* :meth:`output_with_state`
* :meth:`input_with_pull`
* :meth:`_set_state`
* :meth:`_get_frequency`
* :meth:`_set_frequency`
* :meth:`_get_pull`
* :meth:`_set_pull`
* :meth:`_get_bounce`
* :meth:`_set_bounce`
* :meth:`_get_edges`
* :meth:`_set_edges`
"""
def __init__(self, factory, number):
super(PiPin, self).__init__()
self._factory = factory
self._when_changed_lock = RLock()
self._when_changed = None
self._number = number
try:
factory.pi_info.physical_pin(self.address[-1])
except PinNoPins:
warnings.warn(
PinNonPhysical(
'no physical pins exist for %s' % self.address[-1]))
@property
def number(self):
return self._number
@property
def factory(self):
return self._factory
def _get_address(self):
return self.factory.address + ('GPIO%d' % self.number,)
def _call_when_changed(self):
"""
Called to fire the :attr:`when_changed` event handler; override this
in descendents if additional (currently redundant) parameters need
to be passed.
"""
method = self.when_changed()
if method is None:
self.when_changed = None
else:
method()
def _get_when_changed(self):
return self._when_changed
def _set_when_changed(self, value):
with self._when_changed_lock:
if value is None:
if self._when_changed is not None:
self._disable_event_detect()
self._when_changed = None
else:
enabled = self._when_changed is not None
# Have to take care, if value is either a closure or a bound
# method, not to keep a strong reference to the containing
# object
if isinstance(value, MethodType):
self._when_changed = WeakMethod(value)
else:
self._when_changed = ref(value)
if not enabled:
self._enable_event_detect()
def _enable_event_detect(self):
"""
Enables event detection. This is called to activate event detection on
pin :attr:`number`, watching for the specified :attr:`edges`. In
response, :meth:`_call_when_changed` should be executed.
"""
raise NotImplementedError
def _disable_event_detect(self):
"""
Disables event detection. This is called to deactivate event detection
on pin :attr:`number`.
"""
raise NotImplementedError

521
gpiozero/pins/pigpio.py Normal file
View File

@@ -0,0 +1,521 @@
from __future__ import (
unicode_literals,
absolute_import,
print_function,
division,
)
str = type('')
import os
from weakref import proxy
import pigpio
from . import SPI
from .pi import PiPin, PiFactory, SPI_HARDWARE_PINS
from .data import pi_info
from ..devices import Device
from ..mixins import SharedMixin
from ..exc import (
PinInvalidFunction,
PinSetInput,
PinFixedPull,
PinInvalidPull,
PinInvalidBounce,
PinInvalidState,
SPIBadArgs,
SPIInvalidClockMode,
)
class PiGPIOFactory(PiFactory):
"""
Uses the `pigpio`_ library to interface to the Pi's GPIO pins. The pigpio
library relies on a daemon (``pigpiod``) to be running as root to provide
access to the GPIO pins, and communicates with this daemon over a network
socket.
While this does mean only the daemon itself should control the pins, the
architecture does have several advantages:
* Pins can be remote controlled from another machine (the other
machine doesn't even have to be a Raspberry Pi; it simply needs the
`pigpio`_ client library installed on it)
* The daemon supports hardware PWM via the DMA controller
* Your script itself doesn't require root privileges; it just needs to
be able to communicate with the daemon
You can construct pigpio pins manually like so::
from gpiozero.pins.pigpio import PiGPIOPin
from gpiozero import LED
led = LED(PiGPIOPin(12))
This is particularly useful for controlling pins on a remote machine. To
accomplish this simply specify the host (and optionally port) when
constructing the pin::
from gpiozero.pins.pigpio import PiGPIOPin
from gpiozero import LED
from signal import pause
led = LED(PiGPIOPin(12, host='192.168.0.2'))
.. note::
In some circumstances, especially when playing with PWM, it does appear
to be possible to get the daemon into "unusual" states. We would be
most interested to hear any bug reports relating to this (it may be a
bug in our pin implementation). A workaround for now is simply to
restart the ``pigpiod`` daemon.
.. _pigpio: http://abyz.co.uk/rpi/pigpio/
"""
def __init__(
self, host=os.getenv('PIGPIO_ADDR', 'localhost'),
port=int(os.getenv('PIGPIO_PORT', 8888))):
super(PiGPIOFactory, self).__init__()
self.pin_class = PiGPIOPin
self.spi_classes = {
('hardware', 'exclusive'): PiGPIOHardwareSPI,
('hardware', 'shared'): PiGPIOHardwareSPIShared,
('software', 'exclusive'): PiGPIOSoftwareSPI,
('software', 'shared'): PiGPIOSoftwareSPIShared,
}
self._connection = pigpio.pi(host, port)
# Annoyingly, pigpio doesn't raise an exception when it fails to make a
# connection; it returns a valid (but disconnected) pi object
if self.connection is None:
raise IOError('failed to connect to %s:%s' % (host, port))
self._host = host
self._port = port
self._spis = []
def close(self):
super(PiGPIOFactory, self).close()
# We *have* to keep track of SPI interfaces constructed with pigpio;
# if we fail to close them they prevent future interfaces from using
# the same pins
if self.connection:
while self._spis:
self._spis[0].close()
self.connection.stop()
self._connection = None
@property
def connection(self):
# If we're shutting down, the connection may have disconnected itself
# already. Unfortunately, the connection's "connected" property is
# rather buggy - disconnecting doesn't set it to False! So we're
# naughty and check an internal variable instead...
try:
if self._connection.sl.s is not None:
return self._connection
except AttributeError:
pass
@property
def host(self):
return self._host
@property
def port(self):
return self._port
def _get_revision(self):
return self.connection.get_hardware_revision()
def _get_address(self):
return ("%s:%d" % (self.host, self.port),)
def spi(self, **spi_args):
intf = super(PiGPIOFactory, self).spi(**spi_args)
self._spis.append(intf)
return intf
class PiGPIOPin(PiPin):
_CONNECTIONS = {} # maps (host, port) to (connection, pi_info)
GPIO_FUNCTIONS = {
'input': pigpio.INPUT,
'output': pigpio.OUTPUT,
'alt0': pigpio.ALT0,
'alt1': pigpio.ALT1,
'alt2': pigpio.ALT2,
'alt3': pigpio.ALT3,
'alt4': pigpio.ALT4,
'alt5': pigpio.ALT5,
}
GPIO_PULL_UPS = {
'up': pigpio.PUD_UP,
'down': pigpio.PUD_DOWN,
'floating': pigpio.PUD_OFF,
}
GPIO_EDGES = {
'both': pigpio.EITHER_EDGE,
'rising': pigpio.RISING_EDGE,
'falling': pigpio.FALLING_EDGE,
}
GPIO_FUNCTION_NAMES = {v: k for (k, v) in GPIO_FUNCTIONS.items()}
GPIO_PULL_UP_NAMES = {v: k for (k, v) in GPIO_PULL_UPS.items()}
GPIO_EDGES_NAMES = {v: k for (k, v) in GPIO_EDGES.items()}
def __init__(self, factory, number):
super(PiGPIOPin, self).__init__(factory, number)
self._pull = 'up' if factory.pi_info.pulled_up(self.address[-1]) else 'floating'
self._pwm = False
self._bounce = None
self._callback = None
self._edges = pigpio.EITHER_EDGE
try:
self.factory.connection.set_mode(self.number, pigpio.INPUT)
except pigpio.error as e:
raise ValueError(e)
self.factory.connection.set_pull_up_down(self.number, self.GPIO_PULL_UPS[self._pull])
self.factory.connection.set_glitch_filter(self.number, 0)
def close(self):
if self.factory.connection:
self.frequency = None
self.when_changed = None
self.function = 'input'
self.pull = 'up' if self.factory.pi_info.pulled_up(self.address[-1]) else 'floating'
def _get_function(self):
return self.GPIO_FUNCTION_NAMES[self.factory.connection.get_mode(self.number)]
def _set_function(self, value):
if value != 'input':
self._pull = 'floating'
try:
self.factory.connection.set_mode(self.number, self.GPIO_FUNCTIONS[value])
except KeyError:
raise PinInvalidFunction('invalid function "%s" for pin %r' % (value, self))
def _get_state(self):
if self._pwm:
return (
self.factory.connection.get_PWM_dutycycle(self.number) /
self.factory.connection.get_PWM_range(self.number)
)
else:
return bool(self.factory.connection.read(self.number))
def _set_state(self, value):
if self._pwm:
try:
value = int(value * self.factory.connection.get_PWM_range(self.number))
if value != self.factory.connection.get_PWM_dutycycle(self.number):
self.factory.connection.set_PWM_dutycycle(self.number, value)
except pigpio.error:
raise PinInvalidState('invalid state "%s" for pin %r' % (value, self))
elif self.function == 'input':
raise PinSetInput('cannot set state of pin %r' % self)
else:
# write forces pin to OUTPUT, hence the check above
self.factory.connection.write(self.number, bool(value))
def _get_pull(self):
return self._pull
def _set_pull(self, value):
if self.function != 'input':
raise PinFixedPull('cannot set pull on non-input pin %r' % self)
if value != 'up' and self.factory.pi_info.pulled_up(self.address[-1]):
raise PinFixedPull('%r has a physical pull-up resistor' % self)
try:
self.factory.connection.set_pull_up_down(self.number, self.GPIO_PULL_UPS[value])
self._pull = value
except KeyError:
raise PinInvalidPull('invalid pull "%s" for pin %r' % (value, self))
def _get_frequency(self):
if self._pwm:
return self.factory.connection.get_PWM_frequency(self.number)
return None
def _set_frequency(self, value):
if not self._pwm and value is not None:
if self.function != 'output':
raise PinPWMFixedValue('cannot start PWM on pin %r' % self)
# NOTE: the pin's state *must* be set to zero; if it's currently
# high, starting PWM and setting a 0 duty-cycle *doesn't* bring
# the pin low; it stays high!
self.factory.connection.write(self.number, 0)
self.factory.connection.set_PWM_frequency(self.number, value)
self.factory.connection.set_PWM_range(self.number, 10000)
self.factory.connection.set_PWM_dutycycle(self.number, 0)
self._pwm = True
elif self._pwm and value is not None:
if value != self.factory.connection.get_PWM_frequency(self.number):
self.factory.connection.set_PWM_frequency(self.number, value)
self.factory.connection.set_PWM_range(self.number, 10000)
elif self._pwm and value is None:
self.factory.connection.write(self.number, 0)
self._pwm = False
def _get_bounce(self):
return None if not self._bounce else self._bounce / 1000000
def _set_bounce(self, value):
if value is None:
value = 0
elif value < 0:
raise PinInvalidBounce('bounce must be 0 or greater')
self.factory.connection.set_glitch_filter(self.number, int(value * 1000000))
def _get_edges(self):
return self.GPIO_EDGES_NAMES[self._edges]
def _set_edges(self, value):
f = self.when_changed
self.when_changed = None
try:
self._edges = self.GPIO_EDGES[value]
finally:
self.when_changed = f
def _call_when_changed(self, gpio, level, tick):
super(PiGPIOPin, self)._call_when_changed()
def _enable_event_detect(self):
self._callback = self.factory.connection.callback(
self.number, self._edges, self._call_when_changed)
def _disable_event_detect(self):
if self._callback is not None:
self._callback.cancel()
self._callback = None
class PiGPIOHardwareSPI(SPI, Device):
def __init__(self, factory, port, device):
self._port = port
self._device = device
self._factory = proxy(factory)
self._handle = None
super(PiGPIOHardwareSPI, self).__init__()
pins = SPI_HARDWARE_PINS[port]
self._reserve_pins(*(
factory.address + ('GPIO%d' % pin,)
for pin in (
pins['clock'],
pins['mosi'],
pins['miso'],
pins['select'][device]
)
))
self._spi_flags = 8 << 16
self._baud = 500000
self._handle = self._factory.connection.spi_open(
device, self._baud, self._spi_flags)
def _conflicts_with(self, other):
return not (
isinstance(other, PiGPIOHardwareSPI) and
(self._port, self._device) != (other._port, other._device)
)
def close(self):
try:
self._factory._spis.remove(self)
except (ReferenceError, ValueError):
# If the factory has died already or we're not present in its
# internal list, ignore the error
pass
if not self.closed:
self._factory.connection.spi_close(self._handle)
self._handle = None
self._release_all()
super(PiGPIOHardwareSPI, self).close()
@property
def closed(self):
return self._handle is None or self._factory.connection is None
@property
def factory(self):
return self._factory
def __repr__(self):
try:
self._check_open()
return 'SPI(port=%d, device=%d)' % (self._port, self._device)
except DeviceClosed:
return 'SPI(closed)'
def _get_clock_mode(self):
return self._spi_flags & 0x3
def _set_clock_mode(self, value):
self._check_open()
if not 0 <= value < 4:
raise SPIInvalidClockMode("%d is not a valid SPI clock mode" % value)
self._factory.connection.spi_close(self._handle)
self._spi_flags = (self._spi_flags & ~0x3) | value
self._handle = self._factory.connection.spi_open(
self._device, self._baud, self._spi_flags)
def _get_select_high(self):
return bool((self._spi_flags >> (2 + self._device)) & 0x1)
def _set_select_high(self, value):
self._check_open()
self._factory.connection.spi_close(self._handle)
self._spi_flags = (self._spi_flags & ~0x1c) | (bool(value) << (2 + self._device))
self._handle = self._factory.connection.spi_open(
self._device, self._baud, self._spi_flags)
def _get_bits_per_word(self):
return (self._spi_flags >> 16) & 0x3f
def _set_bits_per_word(self, value):
self._check_open()
self._factory.connection.spi_close(self._handle)
self._spi_flags = (self._spi_flags & ~0x3f0000) | ((value & 0x3f) << 16)
self._handle = self._factory.connection.spi_open(
self._device, self._baud, self._spi_flags)
def transfer(self, data):
self._check_open()
count, data = self._factory.connection.spi_xfer(self._handle, data)
if count < 0:
raise IOError('SPI transfer error %d' % count)
# Convert returned bytearray to list of ints. XXX Not sure how non-byte
# sized words (aux intf only) are returned ... padded to 16/32-bits?
return [int(b) for b in data]
class PiGPIOSoftwareSPI(SPI, Device):
def __init__(self, factory, clock_pin, mosi_pin, miso_pin, select_pin):
self._closed = True
self._select_pin = select_pin
self._clock_pin = clock_pin
self._mosi_pin = mosi_pin
self._miso_pin = miso_pin
self._factory = proxy(factory)
super(PiGPIOSoftwareSPI, self).__init__()
self._reserve_pins(
factory.pin_address(clock_pin),
factory.pin_address(mosi_pin),
factory.pin_address(miso_pin),
factory.pin_address(select_pin),
)
self._spi_flags = 0
self._baud = 100000
try:
self._factory.connection.bb_spi_open(
select_pin, miso_pin, mosi_pin, clock_pin,
self._baud, self._spi_flags)
# Only set after opening bb_spi; if that fails then close() will
# also fail if bb_spi_close is attempted on an un-open interface
self._closed = False
except:
self.close()
raise
def _conflicts_with(self, other):
return not (
isinstance(other, PiGPIOSoftwareSPI) and
(self._select_pin) != (other._select_pin)
)
def close(self):
try:
self._factory._spis.remove(self)
except (ReferenceError, ValueError):
# If the factory has died already or we're not present in its
# internal list, ignore the error
pass
if not self.closed:
self._closed = True
self._factory.connection.bb_spi_close(self._select_pin)
self._release_all()
super(PiGPIOSoftwareSPI, self).close()
@property
def closed(self):
return self._closed
def __repr__(self):
try:
self._check_open()
return (
'SPI(clock_pin=%d, mosi_pin=%d, miso_pin=%d, select_pin=%d)' % (
self._clock_pin, self._mosi_pin, self._miso_pin, self._select_pin
))
except DeviceClosed:
return 'SPI(closed)'
def _spi_flags(self):
return (
self._mode << 0 |
self._select_high << 2 |
self._lsb_first << 14 |
self._lsb_first << 15
)
def _get_clock_mode(self):
return self._spi_flags & 0x3
def _set_clock_mode(self, value):
self._check_open()
if not 0 <= value < 4:
raise SPIInvalidClockmode("%d is not a valid SPI clock mode" % value)
self._factory.connection.bb_spi_close(self._select_pin)
self._spi_flags = (self._spi_flags & ~0x3) | value
self._factory.connection.bb_spi_open(
self._select_pin, self._miso_pin, self._mosi_pin, self._clock_pin,
self._baud, self._spi_flags)
def _get_select_high(self):
return bool(self._spi_flags & 0x4)
def _set_select_high(self, value):
self._check_open()
self._factory.connection.bb_spi_close(self._select_pin)
self._spi_flags = (self._spi_flags & ~0x4) | (bool(value) << 2)
self._factory.connection.bb_spi_open(
self._select_pin, self._miso_pin, self._mosi_pin, self._clock_pin,
self._baud, self._spi_flags)
def _get_lsb_first(self):
return bool(self._spi_flags & 0xc000)
def _set_lsb_first(self, value):
self._check_open()
self._factory.connection.bb_spi_close(self._select_pin)
self._spi_flags = (
(self._spi_flags & ~0xc000)
| (bool(value) << 14)
| (bool(value) << 15)
)
self._factory.connection.bb_spi_open(
self._select_pin, self._miso_pin, self._mosi_pin, self._clock_pin,
self._baud, self._spi_flags)
def transfer(self, data):
self._check_open()
count, data = self._factory.connection.bb_spi_xfer(self._select_pin, data)
if count < 0:
raise IOError('SPI transfer error %d' % count)
# Convert returned bytearray to list of ints. bb_spi only supports
# byte-sized words so no issues here
return [int(b) for b in data]
class PiGPIOHardwareSPIShared(SharedMixin, PiGPIOHardwareSPI):
@classmethod
def _shared_key(cls, factory, port, device):
return (factory, port, device)
class PiGPIOSoftwareSPIShared(SharedMixin, PiGPIOSoftwareSPI):
@classmethod
def _shared_key(cls, factory, clock_pin, mosi_pin, miso_pin, select_pin):
return (factory, select_pin)

View File

@@ -1,278 +0,0 @@
from __future__ import (
unicode_literals,
absolute_import,
print_function,
division,
)
str = type('')
import warnings
import pigpio
import os
from . import Pin
from .data import pi_info
from ..exc import (
PinInvalidFunction,
PinSetInput,
PinFixedPull,
PinInvalidPull,
PinInvalidBounce,
PinInvalidState,
PinNonPhysical,
PinNoPins,
)
class PiGPIOPin(Pin):
"""
Uses the `pigpio`_ library to interface to the Pi's GPIO pins. The pigpio
library relies on a daemon (``pigpiod``) to be running as root to provide
access to the GPIO pins, and communicates with this daemon over a network
socket.
While this does mean only the daemon itself should control the pins, the
architecture does have several advantages:
* Pins can be remote controlled from another machine (the other
machine doesn't even have to be a Raspberry Pi; it simply needs the
`pigpio`_ client library installed on it)
* The daemon supports hardware PWM via the DMA controller
* Your script itself doesn't require root privileges; it just needs to
be able to communicate with the daemon
You can construct pigpiod pins manually like so::
from gpiozero.pins.pigpiod import PiGPIOPin
from gpiozero import LED
led = LED(PiGPIOPin(12))
This is particularly useful for controlling pins on a remote machine. To
accomplish this simply specify the host (and optionally port) when
constructing the pin::
from gpiozero.pins.pigpiod import PiGPIOPin
from gpiozero import LED
from signal import pause
led = LED(PiGPIOPin(12, host='192.168.0.2'))
.. note::
In some circumstances, especially when playing with PWM, it does appear
to be possible to get the daemon into "unusual" states. We would be
most interested to hear any bug reports relating to this (it may be a
bug in our pin implementation). A workaround for now is simply to
restart the ``pigpiod`` daemon.
.. _pigpio: http://abyz.co.uk/rpi/pigpio/
"""
_CONNECTIONS = {} # maps (host, port) to (connection, pi_info)
_PINS = {}
GPIO_FUNCTIONS = {
'input': pigpio.INPUT,
'output': pigpio.OUTPUT,
'alt0': pigpio.ALT0,
'alt1': pigpio.ALT1,
'alt2': pigpio.ALT2,
'alt3': pigpio.ALT3,
'alt4': pigpio.ALT4,
'alt5': pigpio.ALT5,
}
GPIO_PULL_UPS = {
'up': pigpio.PUD_UP,
'down': pigpio.PUD_DOWN,
'floating': pigpio.PUD_OFF,
}
GPIO_EDGES = {
'both': pigpio.EITHER_EDGE,
'rising': pigpio.RISING_EDGE,
'falling': pigpio.FALLING_EDGE,
}
GPIO_FUNCTION_NAMES = {v: k for (k, v) in GPIO_FUNCTIONS.items()}
GPIO_PULL_UP_NAMES = {v: k for (k, v) in GPIO_PULL_UPS.items()}
GPIO_EDGES_NAMES = {v: k for (k, v) in GPIO_EDGES.items()}
def __new__(
cls, number, host=os.getenv('PIGPIO_ADDR', 'localhost'),
port=int(os.getenv('PIGPIO_PORT', 8888))):
try:
return cls._PINS[(host, port, number)]
except KeyError:
self = super(PiGPIOPin, cls).__new__(cls)
cls.pi_info(host, port) # implicitly creates connection
self._connection, self._pi_info = cls._CONNECTIONS[(host, port)]
try:
self._pi_info.physical_pin('GPIO%d' % number)
except PinNoPins:
warnings.warn(
PinNonPhysical(
'no physical pins exist for GPIO%d' % number))
self._host = host
self._port = port
self._number = number
self._pull = 'up' if self._pi_info.pulled_up('GPIO%d' % number) else 'floating'
self._pwm = False
self._bounce = None
self._when_changed = None
self._callback = None
self._edges = pigpio.EITHER_EDGE
try:
self._connection.set_mode(self._number, pigpio.INPUT)
except pigpio.error as e:
raise ValueError(e)
self._connection.set_pull_up_down(self._number, self.GPIO_PULL_UPS[self._pull])
self._connection.set_glitch_filter(self._number, 0)
cls._PINS[(host, port, number)] = self
return self
def __repr__(self):
if self._host == 'localhost':
return "GPIO%d" % self._number
else:
return "GPIO%d on %s:%d" % (self._number, self._host, self._port)
@property
def host(self):
return self._host
@property
def port(self):
return self._port
@property
def number(self):
return self._number
def close(self):
# If we're shutting down, the connection may have disconnected itself
# already. Unfortunately, the connection's "connected" property is
# rather buggy - disconnecting doesn't set it to False! So we're
# naughty and check an internal variable instead...
if self._connection.sl.s is not None:
self.frequency = None
self.when_changed = None
self.function = 'input'
self.pull = 'up' if self._pi_info.pulled_up('GPIO%d' % self.number) else 'floating'
def _get_function(self):
return self.GPIO_FUNCTION_NAMES[self._connection.get_mode(self._number)]
def _set_function(self, value):
if value != 'input':
self._pull = 'floating'
try:
self._connection.set_mode(self._number, self.GPIO_FUNCTIONS[value])
except KeyError:
raise PinInvalidFunction('invalid function "%s" for pin %r' % (value, self))
def _get_state(self):
if self._pwm:
return (
self._connection.get_PWM_dutycycle(self._number) /
self._connection.get_PWM_range(self._number)
)
else:
return bool(self._connection.read(self._number))
def _set_state(self, value):
if self._pwm:
try:
value = int(value * self._connection.get_PWM_range(self._number))
if value != self._connection.get_PWM_dutycycle(self._number):
self._connection.set_PWM_dutycycle(self._number, value)
except pigpio.error:
raise PinInvalidState('invalid state "%s" for pin %r' % (value, self))
elif self.function == 'input':
raise PinSetInput('cannot set state of pin %r' % self)
else:
# write forces pin to OUTPUT, hence the check above
self._connection.write(self._number, bool(value))
def _get_pull(self):
return self._pull
def _set_pull(self, value):
if self.function != 'input':
raise PinFixedPull('cannot set pull on non-input pin %r' % self)
if value != 'up' and self._pi_info.pulled_up('GPIO%d' % self._number):
raise PinFixedPull('%r has a physical pull-up resistor' % self)
try:
self._connection.set_pull_up_down(self._number, self.GPIO_PULL_UPS[value])
self._pull = value
except KeyError:
raise PinInvalidPull('invalid pull "%s" for pin %r' % (value, self))
def _get_frequency(self):
if self._pwm:
return self._connection.get_PWM_frequency(self._number)
return None
def _set_frequency(self, value):
if not self._pwm and value is not None:
self._connection.set_PWM_frequency(self._number, value)
self._connection.set_PWM_range(self._number, 10000)
self._connection.set_PWM_dutycycle(self._number, 0)
self._pwm = True
elif self._pwm and value is not None:
if value != self._connection.get_PWM_frequency(self._number):
self._connection.set_PWM_frequency(self._number, value)
self._connection.set_PWM_range(self._number, 10000)
elif self._pwm and value is None:
self._connection.write(self._number, 0)
self._pwm = False
def _get_bounce(self):
return None if not self._bounce else self._bounce / 1000000
def _set_bounce(self, value):
if value is None:
value = 0
elif value < 0:
raise PinInvalidBounce('bounce must be 0 or greater')
self._connection.set_glitch_filter(self._number, int(value * 1000000))
def _get_edges(self):
return self.GPIO_EDGES_NAMES[self._edges]
def _set_edges(self, value):
f = self.when_changed
self.when_changed = None
try:
self._edges = self.GPIO_EDGES[value]
finally:
self.when_changed = f
def _get_when_changed(self):
if self._callback is None:
return None
return self._callback.callb.func
def _set_when_changed(self, value):
if self._callback is not None:
self._callback.cancel()
self._callback = None
if value is not None:
self._callback = self._connection.callback(
self._number, self._edges,
lambda gpio, level, tick: value())
@classmethod
def pi_info(
cls, host=os.getenv('PIGPIO_ADDR', 'localhost'),
port=int(os.getenv('PIGPIO_PORT', 8888))):
try:
connection, info = cls._CONNECTIONS[(host, port)]
except KeyError:
connection = pigpio.pi(host, port)
revision = '%04x' % connection.get_hardware_revision()
info = pi_info(revision)
cls._CONNECTIONS[(host, port)] = (connection, info)
return info

View File

@@ -7,10 +7,10 @@ from __future__ import (
str = type('')
import warnings
from RPi import GPIO
from . import LocalPin
from .data import pi_info
from .local import LocalPiFactory, LocalPiPin
from ..exc import (
PinInvalidFunction,
PinSetInput,
@@ -19,12 +19,10 @@ from ..exc import (
PinInvalidState,
PinInvalidBounce,
PinPWMFixedValue,
PinNonPhysical,
PinNoPins,
)
class RPiGPIOPin(LocalPin):
class RPiGPIOFactory(LocalPiFactory):
"""
Uses the `RPi.GPIO`_ library to interface to the Pi's GPIO pins. This is
the default pin implementation if the RPi.GPIO library is installed.
@@ -39,7 +37,7 @@ class RPiGPIOPin(LocalPin):
However, you can also construct RPi.GPIO pins manually if you wish::
from gpiozero.pins.rpigpio import RPiGPIOPin
from gpiozero.pins.rpigpio import RPiGPIOFactory
from gpiozero import LED
led = LED(RPiGPIOPin(12))
@@ -47,8 +45,18 @@ class RPiGPIOPin(LocalPin):
.. _RPi.GPIO: https://pypi.python.org/pypi/RPi.GPIO
"""
_PINS = {}
def __init__(self):
super(RPiGPIOFactory, self).__init__()
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
self.pin_class = RPiGPIOPin
def close(self):
super(RPiGPIOFactory, self).close()
GPIO.cleanup()
class RPiGPIOPin(LocalPiPin):
GPIO_FUNCTIONS = {
'input': GPIO.IN,
'output': GPIO.OUT,
@@ -75,69 +83,42 @@ class RPiGPIOPin(LocalPin):
GPIO_PULL_UP_NAMES = {v: k for (k, v) in GPIO_PULL_UPS.items()}
GPIO_EDGES_NAMES = {v: k for (k, v) in GPIO_EDGES.items()}
PI_INFO = None
def __new__(cls, number):
if not cls._PINS:
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
if cls.PI_INFO is None:
cls.PI_INFO = pi_info()
try:
return cls._PINS[number]
except KeyError:
self = super(RPiGPIOPin, cls).__new__(cls)
try:
cls.PI_INFO.physical_pin('GPIO%d' % number)
except PinNoPins:
warnings.warn(
PinNonPhysical(
'no physical pins exist for GPIO%d' % number))
self._number = number
self._pull = 'up' if cls.PI_INFO.pulled_up('GPIO%d' % number) else 'floating'
self._pwm = None
self._frequency = None
self._duty_cycle = None
self._bounce = -666
self._when_changed = None
self._edges = GPIO.BOTH
GPIO.setup(self._number, GPIO.IN, self.GPIO_PULL_UPS[self._pull])
cls._PINS[number] = self
return self
def __repr__(self):
return "GPIO%d" % self._number
@property
def number(self):
return self._number
def __init__(self, factory, number):
super(RPiGPIOPin, self).__init__(factory, number)
self._pull = 'up' if factory.pi_info.pulled_up(self.address[-1]) else 'floating'
self._pwm = None
self._frequency = None
self._duty_cycle = None
self._bounce = -666
self._edges = GPIO.BOTH
GPIO.setup(self.number, GPIO.IN, self.GPIO_PULL_UPS[self._pull])
def close(self):
self.frequency = None
self.when_changed = None
GPIO.cleanup(self._number)
GPIO.cleanup(self.number)
def output_with_state(self, state):
self._pull = 'floating'
GPIO.setup(self._number, GPIO.OUT, initial=state)
GPIO.setup(self.number, GPIO.OUT, initial=state)
def input_with_pull(self, pull):
if pull != 'up' and self.PI_INFO.pulled_up('GPIO%d' % self._number):
if pull != 'up' and self.factory.pi_info.pulled_up(self.address[-1]):
raise PinFixedPull('%r has a physical pull-up resistor' % self)
try:
GPIO.setup(self._number, GPIO.IN, self.GPIO_PULL_UPS[pull])
GPIO.setup(self.number, GPIO.IN, self.GPIO_PULL_UPS[pull])
self._pull = pull
except KeyError:
raise PinInvalidPull('invalid pull "%s" for pin %r' % (pull, self))
def _get_function(self):
return self.GPIO_FUNCTION_NAMES[GPIO.gpio_function(self._number)]
return self.GPIO_FUNCTION_NAMES[GPIO.gpio_function(self.number)]
def _set_function(self, value):
if value != 'input':
self._pull = 'floating'
if value in ('input', 'output') and value in self.GPIO_FUNCTIONS:
GPIO.setup(self._number, self.GPIO_FUNCTIONS[value], self.GPIO_PULL_UPS[self._pull])
GPIO.setup(self.number, self.GPIO_FUNCTIONS[value], self.GPIO_PULL_UPS[self._pull])
else:
raise PinInvalidFunction('invalid function "%s" for pin %r' % (value, self))
@@ -145,7 +126,7 @@ class RPiGPIOPin(LocalPin):
if self._pwm:
return self._duty_cycle
else:
return GPIO.input(self._number)
return GPIO.input(self.number)
def _set_state(self, value):
if self._pwm:
@@ -156,7 +137,7 @@ class RPiGPIOPin(LocalPin):
self._duty_cycle = value
else:
try:
GPIO.output(self._number, value)
GPIO.output(self.number, value)
except ValueError:
raise PinInvalidState('invalid state "%s" for pin %r' % (value, self))
except RuntimeError:
@@ -168,10 +149,10 @@ class RPiGPIOPin(LocalPin):
def _set_pull(self, value):
if self.function != 'input':
raise PinFixedPull('cannot set pull on non-input pin %r' % self)
if value != 'up' and self.PI_INFO.pulled_up('GPIO%d' % self._number):
if value != 'up' and self.factory.pi_info.pulled_up(self.address[-1]):
raise PinFixedPull('%r has a physical pull-up resistor' % self)
try:
GPIO.setup(self._number, GPIO.IN, self.GPIO_PULL_UPS[value])
GPIO.setup(self.number, GPIO.IN, self.GPIO_PULL_UPS[value])
self._pull = value
except KeyError:
raise PinInvalidPull('invalid pull "%s" for pin %r' % (value, self))
@@ -182,7 +163,7 @@ class RPiGPIOPin(LocalPin):
def _set_frequency(self, value):
if self._frequency is None and value is not None:
try:
self._pwm = GPIO.PWM(self._number, value)
self._pwm = GPIO.PWM(self.number, value)
except RuntimeError:
raise PinPWMFixedValue('cannot start PWM on pin %r' % self)
self._pwm.start(0)
@@ -221,19 +202,15 @@ class RPiGPIOPin(LocalPin):
finally:
self.when_changed = f
def _get_when_changed(self):
return self._when_changed
def _call_when_changed(self, channel):
super(RPiGPIOPin, self)._call_when_changed()
def _set_when_changed(self, value):
if self._when_changed is None and value is not None:
self._when_changed = value
GPIO.add_event_detect(
self._number, self._edges,
callback=lambda channel: self._when_changed(),
bouncetime=self._bounce)
elif self._when_changed is not None and value is None:
GPIO.remove_event_detect(self._number)
self._when_changed = None
else:
self._when_changed = value
def _enable_event_detect(self):
GPIO.add_event_detect(
self.number, self._edges,
callback=self._call_when_changed,
bouncetime=self._bounce)
def _disable_event_detect(self):
GPIO.remove_event_detect(self.number)

View File

@@ -8,11 +8,12 @@ str = type('')
import warnings
import RPIO
import RPIO.PWM
from RPIO.Exceptions import InvalidChannelException
from . import LocalPin, PINS_CLEANUP
from .local import LocalPiPin, LocalPiFactory
from .data import pi_info
from ..exc import (
PinInvalidFunction,
@@ -22,12 +23,10 @@ from ..exc import (
PinInvalidBounce,
PinInvalidState,
PinPWMError,
PinNonPhysical,
PinNoPins,
)
class RPIOPin(LocalPin):
class RPIOFactory(LocalPiFactory):
"""
Uses the `RPIO`_ library to interface to the Pi's GPIO pins. This is
the default pin implementation if the RPi.GPIO library is not installed,
@@ -48,9 +47,22 @@ class RPIOPin(LocalPin):
.. _RPIO: https://pythonhosted.org/RPIO/
"""
def __init__(self):
super(RPIOFactory, self).__init__()
RPIO.setmode(RPIO.BCM)
RPIO.setwarnings(False)
RPIO.wait_for_interrupts(threaded=True)
RPIO.PWM.setup()
RPIO.PWM.init_channel(0, 10000)
self.pin_class = RPIOPin
_PINS = {}
def close(self):
RPIO.PWM.cleanup()
RPIO.stop_waiting_for_interrupts()
RPIO.cleanup()
class RPIOPin(LocalPiPin):
GPIO_FUNCTIONS = {
'input': RPIO.IN,
'output': RPIO.OUT,
@@ -66,64 +78,31 @@ class RPIOPin(LocalPin):
GPIO_FUNCTION_NAMES = {v: k for (k, v) in GPIO_FUNCTIONS.items()}
GPIO_PULL_UP_NAMES = {v: k for (k, v) in GPIO_PULL_UPS.items()}
PI_INFO = None
def __new__(cls, number):
if not cls._PINS:
RPIO.setmode(RPIO.BCM)
RPIO.setwarnings(False)
RPIO.wait_for_interrupts(threaded=True)
RPIO.PWM.setup()
RPIO.PWM.init_channel(0, 10000)
PINS_CLEANUP.append(RPIO.PWM.cleanup)
PINS_CLEANUP.append(RPIO.stop_waiting_for_interrupts)
PINS_CLEANUP.append(RPIO.cleanup)
if cls.PI_INFO is None:
cls.PI_INFO = pi_info()
def __init__(self, factory, number):
super(RPIOPin, self).__init__(factory, number)
self._pull = 'up' if factory.pi_info.pulled_up(self.address[-1]) else 'floating'
self._pwm = False
self._duty_cycle = None
self._bounce = None
self._edges = 'both'
try:
return cls._PINS[number]
except KeyError:
self = super(RPIOPin, cls).__new__(cls)
try:
cls.PI_INFO.physical_pin('GPIO%d' % number)
except PinNoPins:
warnings.warn(
PinNonPhysical(
'no physical pins exist for GPIO%d' % number))
self._number = number
self._pull = 'up' if cls.PI_INFO.pulled_up('GPIO%d' % number) else 'floating'
self._pwm = False
self._duty_cycle = None
self._bounce = None
self._when_changed = None
self._edges = 'both'
try:
RPIO.setup(self._number, RPIO.IN, self.GPIO_PULL_UPS[self._pull])
except InvalidChannelException as e:
raise ValueError(e)
cls._PINS[number] = self
return self
def __repr__(self):
return "GPIO%d" % self._number
@property
def number(self):
return self._number
RPIO.setup(self.number, RPIO.IN, self.GPIO_PULL_UPS[self._pull])
except InvalidChannelException as e:
raise ValueError(e)
def close(self):
self.frequency = None
self.when_changed = None
RPIO.setup(self._number, RPIO.IN, RPIO.PUD_OFF)
RPIO.setup(self.number, RPIO.IN, RPIO.PUD_OFF)
def _get_function(self):
return self.GPIO_FUNCTION_NAMES[RPIO.gpio_function(self._number)]
return self.GPIO_FUNCTION_NAMES[RPIO.gpio_function(self.number)]
def _set_function(self, value):
if value != 'input':
self._pull = 'floating'
try:
RPIO.setup(self._number, self.GPIO_FUNCTIONS[value], self.GPIO_PULL_UPS[self._pull])
RPIO.setup(self.number, self.GPIO_FUNCTIONS[value], self.GPIO_PULL_UPS[self._pull])
except KeyError:
raise PinInvalidFunction('invalid function "%s" for pin %r' % (value, self))
@@ -131,23 +110,23 @@ class RPIOPin(LocalPin):
if self._pwm:
return self._duty_cycle
else:
return RPIO.input(self._number)
return RPIO.input(self.number)
def _set_state(self, value):
if not 0 <= value <= 1:
raise PinInvalidState('invalid state "%s" for pin %r' % (value, self))
if self._pwm:
RPIO.PWM.clear_channel_gpio(0, self._number)
RPIO.PWM.clear_channel_gpio(0, self.number)
if value == 0:
RPIO.output(self._number, False)
RPIO.output(self.number, False)
elif value == 1:
RPIO.output(self._number, True)
RPIO.output(self.number, True)
else:
RPIO.PWM.add_channel_pulse(0, self._number, start=0, width=int(1000 * value))
RPIO.PWM.add_channel_pulse(0, self.number, start=0, width=int(1000 * value))
self._duty_cycle = value
else:
try:
RPIO.output(self._number, value)
RPIO.output(self.number, value)
except ValueError:
raise PinInvalidState('invalid state "%s" for pin %r' % (value, self))
except RuntimeError:
@@ -159,10 +138,10 @@ class RPIOPin(LocalPin):
def _set_pull(self, value):
if self.function != 'input':
raise PinFixedPull('cannot set pull on non-input pin %r' % self)
if value != 'up' and self.PI_INFO.pulled_up('GPIO%d' % self._number):
if value != 'up' and self.factory.pi_info.pulled_up(self.address[-1]):
raise PinFixedPull('%r has a physical pull-up resistor' % self)
try:
RPIO.setup(self._number, RPIO.IN, self.GPIO_PULL_UPS[value])
RPIO.setup(self.number, RPIO.IN, self.GPIO_PULL_UPS[value])
self._pull = value
except KeyError:
raise PinInvalidPull('invalid pull "%s" for pin %r' % (value, self))
@@ -182,10 +161,10 @@ class RPIOPin(LocalPin):
self._pwm = True
# Dirty hack to get RPIO's PWM support to setup, but do nothing,
# for a given GPIO pin
RPIO.PWM.add_channel_pulse(0, self._number, start=0, width=0)
RPIO.PWM.clear_channel_gpio(0, self._number)
RPIO.PWM.add_channel_pulse(0, self.number, start=0, width=0)
RPIO.PWM.clear_channel_gpio(0, self.number)
elif self._pwm and value is None:
RPIO.PWM.clear_channel_gpio(0, self._number)
RPIO.PWM.clear_channel_gpio(0, self.number)
self._pwm = False
def _get_bounce(self):
@@ -212,25 +191,20 @@ class RPIOPin(LocalPin):
finally:
self.when_changed = f
def _get_when_changed(self):
return self._when_changed
def _call_when_changed(self, channel, value):
super(RPIOPin, self)._call_when_changed()
def _set_when_changed(self, value):
if self._when_changed is None and value is not None:
self._when_changed = value
RPIO.add_interrupt_callback(
self._number,
lambda channel, value: self._when_changed(),
self._edges, self.GPIO_PULL_UPS[self._pull], self._bounce)
elif self._when_changed is not None and value is None:
try:
RPIO.del_interrupt_callback(self._number)
except KeyError:
# Ignore this exception which occurs during shutdown; this
# simply means RPIO's built-in cleanup has already run and
# removed the handler
pass
self._when_changed = None
else:
self._when_changed = value
def _enable_event_detect(self):
RPIO.add_interrupt_callback(
self.number, self._call_when_changed, self._edges,
self.GPIO_PULL_UPS[self._pull], self._bounce)
def _disable_event_detect(self):
try:
RPIO.del_interrupt_callback(self.number)
except KeyError:
# Ignore this exception which occurs during shutdown; this
# simply means RPIO's built-in cleanup has already run and
# removed the handler
pass

98
gpiozero/pins/spi.py Normal file
View File

@@ -0,0 +1,98 @@
from __future__ import (
unicode_literals,
print_function,
absolute_import,
division,
)
str = type('')
import operator
from threading import RLock
from ..devices import Device, SharedMixin
from ..input_devices import InputDevice
from ..output_devices import OutputDevice
class SPISoftwareBus(SharedMixin, Device):
def __init__(self, clock_pin, mosi_pin, miso_pin):
self.lock = None
self.clock = None
self.mosi = None
self.miso = None
super(SPISoftwareBus, self).__init__()
self.lock = RLock()
try:
self.clock = OutputDevice(clock_pin, active_high=True)
if mosi_pin is not None:
self.mosi = OutputDevice(mosi_pin)
if miso_pin is not None:
self.miso = InputDevice(miso_pin)
except:
self.close()
raise
def close(self):
super(SPISoftwareBus, self).close()
if self.lock:
with self.lock:
if self.miso is not None:
self.miso.close()
self.miso = None
if self.mosi is not None:
self.mosi.close()
self.mosi = None
if self.clock is not None:
self.clock.close()
self.clock = None
self.lock = None
@property
def closed(self):
return self.lock is None
@classmethod
def _shared_key(cls, clock_pin, mosi_pin, miso_pin):
return (clock_pin, mosi_pin, miso_pin)
def transfer(self, data, clock_phase=False, lsb_first=False, bits_per_word=8):
"""
Writes data (a list of integer words where each word is assumed to have
:attr:`bits_per_word` bits or less) to the SPI interface, and reads an
equivalent number of words, returning them as a list of integers.
"""
result = []
with self.lock:
# See https://en.wikipedia.org/wiki/Serial_Peripheral_Interface_Bus
# (specifically the section "Example of bit-banging the master
# protocol") for a simpler C implementation of this which ignores
# clock polarity, phase, variable word-size, and multiple input
# words
if lsb_first:
shift = operator.lshift
init_mask = 1
else:
shift = operator.rshift
init_mask = 1 << (bits_per_word - 1)
for write_word in data:
mask = init_mask
read_word = 0
for _ in range(bits_per_word):
if self.mosi is not None:
self.mosi.value = bool(write_word & mask)
# read bit on clock activation
self.clock.on()
if not clock_phase:
if self.miso is not None and self.miso.value:
read_word |= mask
# read bit on clock deactivation
self.clock.off()
if clock_phase:
if self.miso is not None and self.miso.value:
read_word |= mask
mask = shift(mask, 1)
result.append(read_word)
return result

View File

@@ -1,419 +0,0 @@
from __future__ import (
unicode_literals,
print_function,
absolute_import,
division,
)
str = type('')
import warnings
import operator
from threading import RLock
try:
from spidev import SpiDev
except ImportError:
SpiDev = None
from .devices import Device, SharedMixin, _PINS, _PINS_LOCK
from .input_devices import InputDevice
from .output_devices import OutputDevice
from .exc import SPIBadArgs, SPISoftwareFallback, GPIOPinInUse, DeviceClosed
class SPIHardwareInterface(Device):
def __init__(self, port, device):
self._device = None
super(SPIHardwareInterface, self).__init__()
# XXX How can we detect conflicts with existing GPIO instances? This
# isn't ideal ... in fact, it's downright crap and doesn't guard
# against conflicts created *after* this instance, but it's all I can
# come up with right now ...
conflicts = (11, 10, 9, (8, 7)[device])
with _PINS_LOCK:
for pin in _PINS:
if pin.number in conflicts:
raise GPIOPinInUse(
'pin %r is already in use by another gpiozero object' % pin
)
self._device_num = device
self._device = SpiDev()
self._device.open(port, device)
self._device.max_speed_hz = 500000
def close(self):
if self._device:
try:
self._device.close()
finally:
self._device = None
super(SPIHardwareInterface, self).close()
@property
def closed(self):
return self._device is None
def __repr__(self):
try:
self._check_open()
return (
"hardware SPI on clock_pin=11, mosi_pin=10, miso_pin=9, "
"select_pin=%d" % (
8 if self._device_num == 0 else 7))
except DeviceClosed:
return "hardware SPI closed"
def read(self, n):
return self.transfer((0,) * n)
def write(self, data):
return len(self.transfer(data))
def transfer(self, data):
"""
Writes data (a list of integer words where each word is assumed to have
:attr:`bits_per_word` bits or less) to the SPI interface, and reads an
equivalent number of words, returning them as a list of integers.
"""
return self._device.xfer2(data)
def _get_clock_mode(self):
return self._device.mode
def _set_clock_mode(self, value):
self._device.mode = value
def _get_clock_polarity(self):
return bool(self.clock_mode & 2)
def _set_clock_polarity(self, value):
self.clock_mode = self.clock_mode & (~2) | (bool(value) << 1)
def _get_clock_phase(self):
return bool(self.clock_mode & 1)
def _set_clock_phase(self, value):
self.clock_mode = self.clock_mode & (~1) | bool(value)
def _get_lsb_first(self):
return self._device.lsbfirst
def _set_lsb_first(self, value):
self._device.lsbfirst = bool(value)
def _get_select_high(self):
return self._device.cshigh
def _set_select_high(self, value):
self._device.cshigh = bool(value)
def _get_bits_per_word(self):
return self._device.bits_per_word
def _set_bits_per_word(self, value):
self._device.bits_per_word = value
clock_polarity = property(_get_clock_polarity, _set_clock_polarity)
clock_phase = property(_get_clock_phase, _set_clock_phase)
clock_mode = property(_get_clock_mode, _set_clock_mode)
lsb_first = property(_get_lsb_first, _set_lsb_first)
select_high = property(_get_select_high, _set_select_high)
bits_per_word = property(_get_bits_per_word, _set_bits_per_word)
class SPISoftwareBus(SharedMixin, Device):
def __init__(self, clock_pin, mosi_pin, miso_pin):
self.lock = None
self.clock = None
self.mosi = None
self.miso = None
super(SPISoftwareBus, self).__init__()
self.lock = RLock()
try:
self.clock = OutputDevice(clock_pin, active_high=True)
if mosi_pin is not None:
self.mosi = OutputDevice(mosi_pin)
if miso_pin is not None:
self.miso = InputDevice(miso_pin)
except:
self.close()
raise
def close(self):
super(SPISoftwareBus, self).close()
if self.lock:
with self.lock:
if self.miso is not None:
self.miso.close()
self.miso = None
if self.mosi is not None:
self.mosi.close()
self.mosi = None
if self.clock is not None:
self.clock.close()
self.clock = None
self.lock = None
@property
def closed(self):
return self.lock is None
@classmethod
def _shared_key(cls, clock_pin, mosi_pin, miso_pin):
return (clock_pin, mosi_pin, miso_pin)
def transfer(self, data, clock_phase=False, lsb_first=False, bits_per_word=8):
"""
Writes data (a list of integer words where each word is assumed to have
:attr:`bits_per_word` bits or less) to the SPI interface, and reads an
equivalent number of words, returning them as a list of integers.
"""
result = []
with self.lock:
shift = operator.lshift if lsb_first else operator.rshift
for write_word in data:
mask = 1 if lsb_first else 1 << (bits_per_word - 1)
read_word = 0
for _ in range(bits_per_word):
if self.mosi is not None:
self.mosi.value = bool(write_word & mask)
self.clock.on()
if self.miso is not None and not clock_phase:
if self.miso.value:
read_word |= mask
self.clock.off()
if self.miso is not None and clock_phase:
if self.miso.value:
read_word |= mask
mask = shift(mask, 1)
result.append(read_word)
return result
class SPISoftwareInterface(OutputDevice):
def __init__(self, clock_pin, mosi_pin, miso_pin, select_pin):
self._bus = None
super(SPISoftwareInterface, self).__init__(select_pin, active_high=False)
try:
self._clock_phase = False
self._lsb_first = False
self._bits_per_word = 8
self._bus = SPISoftwareBus(clock_pin, mosi_pin, miso_pin)
except:
self.close()
raise
def close(self):
if self._bus:
self._bus.close()
self._bus = None
super(SPISoftwareInterface, self).close()
def __repr__(self):
try:
self._check_open()
return (
"software SPI on clock_pin=%d, mosi_pin=%d, miso_pin=%d, "
"select_pin=%d" % (
self._bus.clock.pin.number,
self._bus.mosi.pin.number,
self._bus.miso.pin.number,
self.pin.number))
except DeviceClosed:
return "software SPI closed"
def read(self, n):
return self.transfer((0,) * n)
def write(self, data):
return len(self.transfer(data))
def transfer(self, data):
with self._bus.lock:
self.on()
try:
return self._bus.transfer(
data, self._clock_phase, self._lsb_first, self._bits_per_word)
finally:
self.off()
def _get_clock_mode(self):
return (self.clock_polarity << 1) | self.clock_phase
def _set_clock_mode(self, value):
value = int(value)
if not 0 <= value <= 3:
raise ValueError('clock_mode must be a value between 0 and 3 inclusive')
self.clock_polarity = bool(value & 2)
self.clock_phase = bool(value & 1)
def _get_clock_polarity(self):
with self._bus.lock:
return not self._bus.clock.active_high
def _set_clock_polarity(self, value):
with self._bus.lock:
self._bus.clock.active_high = not value
self._bus.clock.off()
def _get_clock_phase(self):
return self._clock_phase
def _set_clock_phase(self, value):
self._clock_phase = bool(value)
def _get_lsb_first(self):
return self._lsb_first
def _set_lsb_first(self, value):
self._lsb_first = bool(value)
def _get_bits_per_word(self):
return self._bits_per_word
def _set_bits_per_word(self, value):
if value < 1:
raise ValueError('bits_per_word must be positive')
self._bits_per_word = int(value)
def _get_select_high(self):
return self.active_high
def _set_select_high(self, value):
with self._bus.lock:
self.active_high = value
self.off()
clock_polarity = property(_get_clock_polarity, _set_clock_polarity)
clock_phase = property(_get_clock_phase, _set_clock_phase)
clock_mode = property(_get_clock_mode, _set_clock_mode)
lsb_first = property(_get_lsb_first, _set_lsb_first)
bits_per_word = property(_get_bits_per_word, _set_bits_per_word)
select_high = property(_get_select_high, _set_select_high)
class SharedSPIHardwareInterface(SharedMixin, SPIHardwareInterface):
@classmethod
def _shared_key(cls, port, device):
return (port, device)
class SharedSPISoftwareInterface(SharedMixin, SPISoftwareInterface):
@classmethod
def _shared_key(cls, clock_pin, mosi_pin, miso_pin, select_pin):
return (clock_pin, mosi_pin, miso_pin, select_pin)
def extract_spi_args(**kwargs):
"""
Given a set of keyword arguments, splits it into those relevant to SPI
implementations and all the rest. SPI arguments are augmented with defaults
and converted into the pin format (from the port/device format) if
necessary.
Returns a tuple of ``(spi_args, other_args)``.
"""
pin_defaults = {
'clock_pin': 11,
'mosi_pin': 10,
'miso_pin': 9,
'select_pin': 8,
}
dev_defaults = {
'port': 0,
'device': 0,
}
spi_args = {
key: value for (key, value) in kwargs.items()
if key in pin_defaults or key in dev_defaults
}
kwargs = {
key: value for (key, value) in kwargs.items()
if key not in spi_args
}
if not spi_args:
spi_args = pin_defaults
elif set(spi_args) <= set(pin_defaults):
spi_args = {
key: spi_args.get(key, default)
for key, default in pin_defaults.items()
}
elif set(spi_args) <= set(dev_defaults):
spi_args = {
key: spi_args.get(key, default)
for key, default in dev_defaults.items()
}
if spi_args['port'] != 0:
raise SPIBadArgs('port 0 is the only valid SPI port')
if spi_args['device'] not in (0, 1):
raise SPIBadArgs('device must be 0 or 1')
spi_args = {
key: value if key != 'select_pin' else (8, 7)[spi_args['device']]
for key, value in pin_defaults.items()
}
else:
raise SPIBadArgs(
'you must either specify port and device, or clock_pin, mosi_pin, '
'miso_pin, and select_pin; combinations of the two schemes (e.g. '
'port and clock_pin) are not permitted')
return spi_args, kwargs
def SPI(**spi_args):
"""
Returns an SPI interface, for the specified SPI *port* and *device*, or for
the specified pins (*clock_pin*, *mosi_pin*, *miso_pin*, and *select_pin*).
Only one of the schemes can be used; attempting to mix *port* and *device*
with pin numbers will raise :exc:`SPIBadArgs`.
If the pins specified match the hardware SPI pins (clock on GPIO11, MOSI on
GPIO10, MISO on GPIO9, and chip select on GPIO8 or GPIO7), and the spidev
module can be imported, a :class:`SPIHardwareInterface` instance will be
returned. Otherwise, a :class:`SPISoftwareInterface` will be returned which
will use simple bit-banging to communicate.
Both interfaces have the same API, support clock polarity and phase
attributes, and can handle half and full duplex communications, but the
hardware interface is significantly faster (though for many things this
doesn't matter).
Finally, the *shared* keyword argument specifies whether the resulting
SPI interface can be repeatedly created and used by multiple devices
(useful with multi-channel devices like numerous ADCs).
"""
spi_args, kwargs = extract_spi_args(**spi_args)
shared = kwargs.pop('shared', False)
if kwargs:
raise SPIBadArgs(
'unrecognized keyword argument %s' % kwargs.popitem()[0])
if all((
spi_args['clock_pin'] == 11,
spi_args['mosi_pin'] == 10,
spi_args['miso_pin'] == 9,
spi_args['select_pin'] in (7, 8),
)):
if SpiDev is None:
warnings.warn(
SPISoftwareFallback(
'failed to import spidev, falling back to software SPI'))
else:
try:
hardware_spi_args = {
'port': 0,
'device': {8: 0, 7: 1}[spi_args['select_pin']],
}
if shared:
return SharedSPIHardwareInterface(**hardware_spi_args)
else:
return SPIHardwareInterface(**hardware_spi_args)
except Exception as e:
warnings.warn(
SPISoftwareFallback(
'failed to initialize hardware SPI, falling back to '
'software (error was: %s)' % str(e)))
if shared:
return SharedSPISoftwareInterface(**spi_args)
else:
return SPISoftwareInterface(**spi_args)

View File

@@ -16,7 +16,6 @@ except ImportError:
from .exc import DeviceClosed, SPIBadChannel
from .devices import Device
from .spi import SPI
class SPIDevice(Device):
@@ -28,13 +27,14 @@ class SPIDevice(Device):
specified with the constructor.
"""
def __init__(self, **spi_args):
self._spi = SPI(**spi_args)
self._spi = None
super(SPIDevice, self).__init__()
self._spi = self.pin_factory.spi(**spi_args)
def close(self):
if self._spi:
s = self._spi
self._spi.close()
self._spi = None
s.close()
super(SPIDevice, self).close()
@property