mirror of
https://github.com/KevinMidboe/python-gpiozero.git
synced 2025-10-29 17:50:37 +00:00
Merge pull request #410 from waveform80/remote-pi-info
Fix #354, fix #389
This commit is contained in:
@@ -37,28 +37,21 @@ You can change the default pin implementation by over-writing the
|
||||
# This will now use NativePin instead of RPiGPIOPin
|
||||
led = LED(16)
|
||||
|
||||
``pin_factory`` is simply a callable that accepts a single argument: the number
|
||||
of the pin to be constructed (this prototype *may* be expanded in future). This
|
||||
means you can define it as a function that provides additional parameters to an
|
||||
underlying class. For example, to default to creating pins with
|
||||
:class:`gpiozero.pins.pigpiod.PiGPIOPin` on a remote pi called ``remote-pi``::
|
||||
``pin_factory`` is a concrete descendent of the abstract :class:`Pin` class.
|
||||
The descendent may take additional parameters in its constructor provided they
|
||||
are optional; GPIO Zero will expect to be able to construct instances with
|
||||
nothing more than an integer pin number.
|
||||
|
||||
from gpiozero.pins.pigpiod import PiGPIOPin
|
||||
import gpiozero.devices
|
||||
However, the descendent may take default information from additional sources.
|
||||
For example, to default to creating pins with
|
||||
:class:`gpiozero.pins.pigpiod.PiGPIOPin` on a remote pi called ``remote-pi``
|
||||
you can set the :envvar:`PIGPIO_ADDR` environment variable when running your
|
||||
script::
|
||||
|
||||
def my_pin_factory(number):
|
||||
return PiGPIOPin(number, host='remote-pi')
|
||||
$ PIGPIO_ADDR=remote-pi python my_script.py
|
||||
|
||||
gpiozero.devices.pin_factory = my_pin_factory
|
||||
|
||||
from gpiozero import TrafficLights
|
||||
|
||||
# This will now use pins on remote-pi (assuming it has the
|
||||
# pigpiod daemon installed and running)
|
||||
tl = TrafficLights(13, 19, 26)
|
||||
|
||||
Alternatively, instead of passing an integer to the device constructor, you
|
||||
can pass an object derived from :class:`Pin` itself::
|
||||
It is worth noting that instead of passing an integer to device constructors,
|
||||
you can pass an object derived from :class:`Pin` itself::
|
||||
|
||||
from gpiozero.pins.native import NativePin
|
||||
from gpiozero import LED
|
||||
@@ -121,6 +114,13 @@ Abstract Pin
|
||||
:members:
|
||||
|
||||
|
||||
Local Pin
|
||||
=========
|
||||
|
||||
.. autoclass:: LocalPin
|
||||
:members:
|
||||
|
||||
|
||||
Utilities
|
||||
=========
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ from __future__ import (
|
||||
|
||||
from .pins import (
|
||||
Pin,
|
||||
LocalPin,
|
||||
)
|
||||
from .pins.data import (
|
||||
PiBoardInfo,
|
||||
|
||||
@@ -6,6 +6,9 @@ from __future__ import (
|
||||
)
|
||||
str = type('')
|
||||
|
||||
import io
|
||||
|
||||
from .data import pi_info
|
||||
from ..exc import (
|
||||
PinInvalidFunction,
|
||||
PinSetInput,
|
||||
@@ -47,6 +50,7 @@ class Pin(object):
|
||||
* :meth:`_set_edges`
|
||||
* :meth:`_get_when_changed`
|
||||
* :meth:`_set_when_changed`
|
||||
* :meth:`pi_info`
|
||||
* :meth:`output_with_state`
|
||||
* :meth:`input_with_pull`
|
||||
|
||||
@@ -243,3 +247,48 @@ class Pin(object):
|
||||
property will raise :exc:`PinEdgeDetectUnsupported`.
|
||||
""")
|
||||
|
||||
@classmethod
|
||||
def pi_info(cls):
|
||||
"""
|
||||
Returns a :class:`PiBoardInfo` instance representing the Pi that
|
||||
instances of this pin class will be attached to.
|
||||
|
||||
If the pins represented by this class are not *directly* attached to a
|
||||
Pi (e.g. the pin is attached to a board attached to the Pi, or the pins
|
||||
are not on a Pi at all), this may return ``None``.
|
||||
"""
|
||||
return None
|
||||
|
||||
|
||||
class LocalPin(Pin):
|
||||
"""
|
||||
Abstract base class representing pins attached locally to a Pi. This forms
|
||||
the base class for local-only pin interfaces (:class:`RPiGPIOPin`,
|
||||
:class:`RPIOPin`, and :class:`NativePin`).
|
||||
"""
|
||||
_PI_REVISION = None
|
||||
|
||||
@classmethod
|
||||
def pi_info(cls):
|
||||
"""
|
||||
Returns a :class:`PiBoardInfo` instance representing the local Pi.
|
||||
The Pi's revision is determined by reading :file:`/proc/cpuinfo`. If
|
||||
no valid revision is found, returns ``None``.
|
||||
"""
|
||||
# Cache the result as we can reasonably assume it won't change during
|
||||
# runtime (this is LocalPin after all; descendents that deal with
|
||||
# remote Pis should inherit from Pin instead)
|
||||
if cls._PI_REVISION is None:
|
||||
with io.open('/proc/cpuinfo', 'r') as f:
|
||||
for line in f:
|
||||
if line.startswith('Revision'):
|
||||
revision = line.split(':')[1].strip().lower()
|
||||
overvolted = revision.startswith('100')
|
||||
if overvolted:
|
||||
revision = revision[-4:]
|
||||
cls._PI_REVISION = revision
|
||||
break
|
||||
if cls._PI_REVISION is None:
|
||||
return None # something weird going on
|
||||
return pi_info(cls._PI_REVISION)
|
||||
|
||||
|
||||
@@ -324,6 +324,12 @@ class PiBoardInfo(namedtuple('PiBoardInfo', (
|
||||
a tuple, it is strongly recommended that you use the following named
|
||||
attributes to access the data contained within.
|
||||
|
||||
.. automethod:: physical_pin
|
||||
|
||||
.. automethod:: physical_pins
|
||||
|
||||
.. automethod:: pulled_up
|
||||
|
||||
.. attribute:: revision
|
||||
|
||||
A string indicating the revision of the Pi. This is unique to each
|
||||
@@ -426,7 +432,7 @@ class PiBoardInfo(namedtuple('PiBoardInfo', (
|
||||
|
||||
def physical_pins(self, function):
|
||||
"""
|
||||
Return the physical pins supporting the specified *function* as a tuple
|
||||
Return the physical pins supporting the specified *function* as tuples
|
||||
of ``(header, pin_number)`` where *header* is a string specifying the
|
||||
header containing the *pin_number*. Note that the return value is a
|
||||
:class:`set` which is not indexable. Use :func:`physical_pin` if you
|
||||
@@ -483,19 +489,6 @@ class PiBoardInfo(namedtuple('PiBoardInfo', (
|
||||
return self.headers[header][number].pull_up
|
||||
|
||||
|
||||
_PI_REVISION = None
|
||||
def _get_pi_revision():
|
||||
with io.open('/proc/cpuinfo', 'r') as f:
|
||||
for line in f:
|
||||
if line.startswith('Revision'):
|
||||
revision = line.split(':')[1].strip().lower()
|
||||
overvolted = revision.startswith('1000')
|
||||
if overvolted:
|
||||
revision = revision[4:]
|
||||
return revision
|
||||
raise IOError('unable to locate Pi revision in /proc/cpuinfo')
|
||||
|
||||
|
||||
def _parse_pi_revision(revision):
|
||||
# For new-style revisions the value's bit pattern is as follows:
|
||||
#
|
||||
@@ -510,7 +503,7 @@ def _parse_pi_revision(revision):
|
||||
# TTTTTTTT - Type (0=A, 1=B, 2=A+, 3=B+, 4=2B, 5=Alpha (??), 6=CM, 8=3B, 9=Zero)
|
||||
# RRRR - Revision (0, 1, or 2)
|
||||
if not (revision & 0x800000):
|
||||
raise ValueError('cannot parse "%x"; this is not a new-style revision' % revision)
|
||||
raise PinUnknownPi('cannot parse "%x"; this is not a new-style revision' % revision)
|
||||
try:
|
||||
model = {
|
||||
0: 'A',
|
||||
@@ -538,7 +531,7 @@ def _parse_pi_revision(revision):
|
||||
'2B': '2015Q1',
|
||||
'CM': '2014Q2',
|
||||
'3B': '2016Q1',
|
||||
'Zero': '2015Q4',
|
||||
'Zero': '2015Q4' if pcb_revision == '1.0' else '2016Q2',
|
||||
}[model]
|
||||
soc = {
|
||||
0: 'BCM2835',
|
||||
@@ -580,7 +573,7 @@ def _parse_pi_revision(revision):
|
||||
'3B': True,
|
||||
}.get(model, False)
|
||||
csi = {
|
||||
'Zero': 0,
|
||||
'Zero': 0 if pcb_revision == '0.0' else 1,
|
||||
'CM': 2,
|
||||
}.get(model, 1)
|
||||
dsi = csi
|
||||
@@ -590,7 +583,7 @@ def _parse_pi_revision(revision):
|
||||
'CM': {'SODIMM': CM_SODIMM},
|
||||
}.get(model, {'P1': PLUS_P1})
|
||||
except KeyError:
|
||||
raise ValueError('unable to parse new-style revision "%x"' % revision)
|
||||
raise PinUnknownPi('unable to parse new-style revision "%x"' % revision)
|
||||
else:
|
||||
return (
|
||||
model,
|
||||
@@ -620,20 +613,26 @@ def pi_info(revision=None):
|
||||
or ``None`` (the default), then the library will attempt to determine
|
||||
the model of Pi it is running on and return information about that.
|
||||
"""
|
||||
# cache the result as we can reasonably assume the revision of the Pi isn't
|
||||
# going to change at runtime...
|
||||
if revision is None:
|
||||
global _PI_REVISION
|
||||
if _PI_REVISION is None:
|
||||
try:
|
||||
_PI_REVISION = _get_pi_revision()
|
||||
except IOError:
|
||||
_PI_REVISION = 'unknown'
|
||||
revision = _PI_REVISION
|
||||
try:
|
||||
revision_int = int(revision, base=16)
|
||||
except ValueError:
|
||||
raise PinUnknownPi('unknown RPi revision "%s"' % revision)
|
||||
# NOTE: This import is declared locally for two reasons. Firstly it
|
||||
# avoids a circular dependency (devices->pins->pins.data->devices).
|
||||
# Secondly, pin_factory is one global which might potentially be
|
||||
# re-written by a user's script at runtime hence we should re-import
|
||||
# here in case it's changed since initialization
|
||||
from ..devices import pin_factory
|
||||
result = pin_factory.pi_info()
|
||||
if result is None:
|
||||
raise PinUnknownPi('The default pin_factory is not attached to a Pi')
|
||||
else:
|
||||
return result
|
||||
else:
|
||||
if isinstance(revision, bytes):
|
||||
revision = revision.decode('ascii')
|
||||
if isinstance(revision, str):
|
||||
revision = int(revision, base=16)
|
||||
else:
|
||||
# be nice to people passing an int (or something numeric anyway)
|
||||
revision = int(revision)
|
||||
try:
|
||||
(
|
||||
model,
|
||||
@@ -650,27 +649,24 @@ def pi_info(revision=None):
|
||||
csi,
|
||||
dsi,
|
||||
headers,
|
||||
) = PI_REVISIONS[revision_int]
|
||||
) = PI_REVISIONS[revision]
|
||||
except KeyError:
|
||||
try:
|
||||
(
|
||||
model,
|
||||
pcb_revision,
|
||||
released,
|
||||
soc,
|
||||
manufacturer,
|
||||
memory,
|
||||
storage,
|
||||
usb,
|
||||
ethernet,
|
||||
wifi,
|
||||
bluetooth,
|
||||
csi,
|
||||
dsi,
|
||||
headers,
|
||||
) = _parse_pi_revision(revision_int)
|
||||
except ValueError:
|
||||
raise PinUnknownPi('unknown RPi revision "%s"' % revision)
|
||||
(
|
||||
model,
|
||||
pcb_revision,
|
||||
released,
|
||||
soc,
|
||||
manufacturer,
|
||||
memory,
|
||||
storage,
|
||||
usb,
|
||||
ethernet,
|
||||
wifi,
|
||||
bluetooth,
|
||||
csi,
|
||||
dsi,
|
||||
headers,
|
||||
) = _parse_pi_revision(revision)
|
||||
headers = {
|
||||
header: {
|
||||
number: PinInfo(number, function, pull_up)
|
||||
@@ -679,7 +675,7 @@ def pi_info(revision=None):
|
||||
for header, header_data in headers.items()
|
||||
}
|
||||
return PiBoardInfo(
|
||||
revision,
|
||||
'%04x' % revision,
|
||||
model,
|
||||
pcb_revision,
|
||||
released,
|
||||
|
||||
@@ -16,6 +16,7 @@ except ImportError:
|
||||
from ..compat import isclose
|
||||
|
||||
from . import Pin
|
||||
from .data import pi_info
|
||||
from ..exc import PinSetInput, PinPWMUnsupported, PinFixedPull
|
||||
|
||||
|
||||
@@ -32,6 +33,10 @@ class MockPin(Pin):
|
||||
def clear_pins(cls):
|
||||
cls._PINS.clear()
|
||||
|
||||
@classmethod
|
||||
def pi_info(cls):
|
||||
return pi_info('a21041') # Pretend we're a Pi 2B
|
||||
|
||||
def __new__(cls, number):
|
||||
if not (0 <= number < 54):
|
||||
raise ValueError('invalid pin %d specified (must be 0..53)' % number)
|
||||
|
||||
@@ -17,7 +17,7 @@ from time import sleep
|
||||
from threading import Thread, Event, Lock
|
||||
from collections import Counter
|
||||
|
||||
from . import Pin, PINS_CLEANUP
|
||||
from . import LocalPin, PINS_CLEANUP
|
||||
from .data import pi_info
|
||||
from ..exc import (
|
||||
PinInvalidPull,
|
||||
@@ -149,7 +149,7 @@ class GPIOFS(object):
|
||||
f.write(str(pin).encode('ascii'))
|
||||
|
||||
|
||||
class NativePin(Pin):
|
||||
class NativePin(LocalPin):
|
||||
"""
|
||||
Uses a built-in pure Python implementation to interface to the Pi's GPIO
|
||||
pins. This is the default pin implementation if no third-party libraries
|
||||
|
||||
@@ -69,7 +69,7 @@ class PiGPIOPin(Pin):
|
||||
.. _pigpio: http://abyz.co.uk/rpi/pigpio/
|
||||
"""
|
||||
|
||||
_CONNECTIONS = {}
|
||||
_CONNECTIONS = {} # maps (host, port) to (connection, pi_info)
|
||||
_PINS = {}
|
||||
|
||||
GPIO_FUNCTIONS = {
|
||||
@@ -99,18 +99,15 @@ class PiGPIOPin(Pin):
|
||||
GPIO_PULL_UP_NAMES = {v: k for (k, v) in GPIO_PULL_UPS.items()}
|
||||
GPIO_EDGES_NAMES = {v: k for (k, v) in GPIO_EDGES.items()}
|
||||
|
||||
def __new__(cls, number, host=os.getenv('PIGPIO_ADDR', 'localhost'), port=int(os.getenv('PIGPIO_PORT', 8888))):
|
||||
def __new__(
|
||||
cls, number, host=os.getenv('PIGPIO_ADDR', 'localhost'),
|
||||
port=int(os.getenv('PIGPIO_PORT', 8888))):
|
||||
try:
|
||||
return cls._PINS[(host, port, number)]
|
||||
except KeyError:
|
||||
self = super(PiGPIOPin, cls).__new__(cls)
|
||||
try:
|
||||
self._connection, self._pi_info = cls._CONNECTIONS[(host, port)]
|
||||
except KeyError:
|
||||
self._connection = pigpio.pi(host, port)
|
||||
revision = hex(self._connection.get_hardware_revision())[2:]
|
||||
self._pi_info = pi_info(revision)
|
||||
cls._CONNECTIONS[(host, port)] = (self._connection, self._pi_info)
|
||||
cls.pi_revision(host, port) # implicitly creates connection
|
||||
self._connection, self._pi_info = cls._CONNECTIONS[(host, port)]
|
||||
try:
|
||||
self._pi_info.physical_pin('GPIO%d' % number)
|
||||
except PinNoPins:
|
||||
@@ -259,3 +256,16 @@ class PiGPIOPin(Pin):
|
||||
self._number, self._edges,
|
||||
lambda gpio, level, tick: value())
|
||||
|
||||
@classmethod
|
||||
def pi_info(
|
||||
cls, host=os.getenv('PIGPIO_ADDR', 'localhost'),
|
||||
port=int(os.getenv('PIGPIO_PORT', 8888))):
|
||||
try:
|
||||
connection, info = cls._CONNECTIONS[(host, port)]
|
||||
except KeyError:
|
||||
connection = pigpio.pi(host, port)
|
||||
revision = '%04x' % connection.get_hardware_revision()
|
||||
info = pi_info(revision)
|
||||
cls._CONNECTIONS[(host, port)] = (connection, info)
|
||||
return info
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ str = type('')
|
||||
import warnings
|
||||
from RPi import GPIO
|
||||
|
||||
from . import Pin
|
||||
from . import LocalPin
|
||||
from .data import pi_info
|
||||
from ..exc import (
|
||||
PinInvalidFunction,
|
||||
@@ -24,7 +24,7 @@ from ..exc import (
|
||||
)
|
||||
|
||||
|
||||
class RPiGPIOPin(Pin):
|
||||
class RPiGPIOPin(LocalPin):
|
||||
"""
|
||||
Uses the `RPi.GPIO`_ library to interface to the Pi's GPIO pins. This is
|
||||
the default pin implementation if the RPi.GPIO library is installed.
|
||||
|
||||
@@ -12,7 +12,7 @@ import RPIO
|
||||
import RPIO.PWM
|
||||
from RPIO.Exceptions import InvalidChannelException
|
||||
|
||||
from . import Pin, PINS_CLEANUP
|
||||
from . import LocalPin, PINS_CLEANUP
|
||||
from .data import pi_info
|
||||
from ..exc import (
|
||||
PinInvalidFunction,
|
||||
@@ -27,7 +27,7 @@ from ..exc import (
|
||||
)
|
||||
|
||||
|
||||
class RPIOPin(Pin):
|
||||
class RPIOPin(LocalPin):
|
||||
"""
|
||||
Uses the `RPIO`_ library to interface to the Pi's GPIO pins. This is
|
||||
the default pin implementation if the RPi.GPIO library is not installed,
|
||||
|
||||
90
tests/test_pins_data.py
Normal file
90
tests/test_pins_data.py
Normal file
@@ -0,0 +1,90 @@
|
||||
from __future__ import (
|
||||
unicode_literals,
|
||||
absolute_import,
|
||||
print_function,
|
||||
division,
|
||||
)
|
||||
str = type('')
|
||||
|
||||
|
||||
import pytest
|
||||
from mock import patch, MagicMock
|
||||
|
||||
import gpiozero.devices
|
||||
import gpiozero.pins.data
|
||||
import gpiozero.pins.native
|
||||
from gpiozero.pins.data import pi_info
|
||||
from gpiozero import PinMultiplePins, PinNoPins, PinUnknownPi
|
||||
|
||||
|
||||
def test_pi_revision():
|
||||
save_factory = gpiozero.devices.pin_factory
|
||||
try:
|
||||
# Can't use MockPin for this as we want something that'll actually try
|
||||
# and read /proc/cpuinfo (MockPin simply parrots the 2B's data);
|
||||
# NativePin is used as we're guaranteed to be able to import it
|
||||
gpiozero.devices.pin_factory = gpiozero.pins.native.NativePin
|
||||
with patch('io.open') as m:
|
||||
m.return_value.__enter__.return_value = ['lots of irrelevant', 'lines', 'followed by', 'Revision: 0002', 'Serial: xxxxxxxxxxx']
|
||||
assert pi_info().revision == '0002'
|
||||
# LocalPin caches the revision (because realistically it isn't going to
|
||||
# change at runtime); we need to wipe it here though
|
||||
gpiozero.pins.native.NativePin._PI_REVISION = None
|
||||
m.return_value.__enter__.return_value = ['Revision: a21042']
|
||||
assert pi_info().revision == 'a21042'
|
||||
# Check over-volting result (some argument over whether this is 7 or
|
||||
# 8 character result; make sure both work)
|
||||
gpiozero.pins.native.NativePin._PI_REVISION = None
|
||||
m.return_value.__enter__.return_value = ['Revision: 1000003']
|
||||
assert pi_info().revision == '0003'
|
||||
gpiozero.pins.native.NativePin._PI_REVISION = None
|
||||
m.return_value.__enter__.return_value = ['Revision: 100003']
|
||||
assert pi_info().revision == '0003'
|
||||
with pytest.raises(PinUnknownPi):
|
||||
m.return_value.__enter__.return_value = ['nothing', 'relevant', 'at all']
|
||||
gpiozero.pins.native.NativePin._PI_REVISION = None
|
||||
pi_info()
|
||||
with pytest.raises(PinUnknownPi):
|
||||
pi_info('0fff')
|
||||
finally:
|
||||
gpiozero.devices.pin_factory = save_factory
|
||||
|
||||
def test_pi_info():
|
||||
r = pi_info('900011')
|
||||
assert r.model == 'B'
|
||||
assert r.pcb_revision == '1.0'
|
||||
assert r.memory == 512
|
||||
assert r.manufacturer == 'Sony'
|
||||
assert r.storage == 'SD'
|
||||
assert r.usb == 2
|
||||
assert not r.wifi
|
||||
assert not r.bluetooth
|
||||
assert r.csi == 1
|
||||
assert r.dsi == 1
|
||||
with pytest.raises(PinUnknownPi):
|
||||
pi_info('9000f1')
|
||||
|
||||
def test_pi_info_other_types():
|
||||
with pytest.raises(PinUnknownPi):
|
||||
pi_info(b'9000f1')
|
||||
with pytest.raises(PinUnknownPi):
|
||||
pi_info(0x9000f1)
|
||||
|
||||
def test_physical_pins():
|
||||
# Assert physical pins for some well-known Pi's; a21041 is a Pi2B
|
||||
assert pi_info('a21041').physical_pins('3V3') == {('P1', 1), ('P1', 17)}
|
||||
assert pi_info('a21041').physical_pins('GPIO2') == {('P1', 3)}
|
||||
assert pi_info('a21041').physical_pins('GPIO47') == set()
|
||||
|
||||
def test_physical_pin():
|
||||
with pytest.raises(PinMultiplePins):
|
||||
assert pi_info('a21041').physical_pin('GND')
|
||||
assert pi_info('a21041').physical_pin('GPIO3') == ('P1', 5)
|
||||
with pytest.raises(PinNoPins):
|
||||
assert pi_info('a21041').physical_pin('GPIO47')
|
||||
|
||||
def test_pulled_up():
|
||||
assert pi_info('a21041').pulled_up('GPIO2')
|
||||
assert not pi_info('a21041').pulled_up('GPIO4')
|
||||
assert not pi_info('a21041').pulled_up('GPIO47')
|
||||
|
||||
Reference in New Issue
Block a user