diff --git a/gpiozero/__init__.py b/gpiozero/__init__.py index 255a83e..e4c0d48 100644 --- a/gpiozero/__init__.py +++ b/gpiozero/__init__.py @@ -19,6 +19,7 @@ from .exc import ( BadEventHandler, CompositeDeviceError, CompositeDeviceBadName, + CompositeDeviceBadOrder, SPIError, SPIBadArgs, EnergenieSocketMissing, diff --git a/gpiozero/compat.py b/gpiozero/compat.py index b21d0d3..8387038 100644 --- a/gpiozero/compat.py +++ b/gpiozero/compat.py @@ -49,5 +49,5 @@ def median(data): return data[n // 2] else: i = n // 2 - return (data[n - 1] + data[n]) / 2 + return (data[i - 1] + data[i]) / 2 diff --git a/gpiozero/devices.py b/gpiozero/devices.py index 5493664..c4cbd90 100644 --- a/gpiozero/devices.py +++ b/gpiozero/devices.py @@ -21,6 +21,8 @@ from .mixins import ( ) from .exc import ( DeviceClosed, + CompositeDeviceBadName, + CompositeDeviceBadOrder, GPIOPinMissing, GPIOPinInUse, GPIODeviceClosed, @@ -191,7 +193,7 @@ class GPIOBase(GPIOMeta(nstr('GPIOBase'), (), {})): method). Once a device is closed you can no longer use any other methods or properties to control or query the device. """ - return False + raise NotImplementedError def _check_open(self): if self.closed: @@ -223,7 +225,7 @@ class Device(ValuesMixin, GPIOBase): ranges (e.g. -1 to +1) and composite devices usually use tuples to return the states of all their subordinate components. """ - return 0 + raise NotImplementedError @property def is_active(self): @@ -261,8 +263,8 @@ class CompositeDevice(Device): if self._order is None: self._order = kwargs.keys() self._order = tuple(self._order) - for missing_name in set(self._order) - set(kwargs.keys()): - raise ValueError('%s missing from _order' % missing_name) + for missing_name in set(kwargs.keys()) - set(self._order): + raise CompositeDeviceBadOrder('%s missing from _order' % missing_name) super(CompositeDevice, self).__init__() for name in set(self._order) & set(dir(self)): raise CompositeDeviceBadName('%s is a reserved name' % name) @@ -372,9 +374,6 @@ class GPIODevice(Device): self._check_open() raise - def _fire_events(self): - pass - def close(self): super(GPIODevice, self).close() with _PINS_LOCK: diff --git a/gpiozero/exc.py b/gpiozero/exc.py index 96ab9a3..77e8e6d 100644 --- a/gpiozero/exc.py +++ b/gpiozero/exc.py @@ -22,6 +22,9 @@ class CompositeDeviceError(GPIOZeroError): class CompositeDeviceBadName(CompositeDeviceError, ValueError): "Error raised when a composite device is constructed with a reserved name" +class CompositeDeviceBadOrder(CompositeDeviceError, ValueError): + "Error raised when a composite device is constructed with an incomplete order" + class EnergenieSocketMissing(CompositeDeviceError, ValueError): "Error raised when socket number is not specified" diff --git a/gpiozero/input_devices.py b/gpiozero/input_devices.py index 11c325c..fc08de5 100644 --- a/gpiozero/input_devices.py +++ b/gpiozero/input_devices.py @@ -11,7 +11,7 @@ import warnings from time import sleep, time from threading import Event -from .exc import InputDeviceError, GPIODeviceError, GPIODeviceClosed +from .exc import InputDeviceError, GPIODeviceError, DeviceClosed from .devices import GPIODevice, CompositeDevice from .mixins import GPIOQueue, EventsMixin @@ -161,7 +161,7 @@ class SmoothedInputDevice(EventsMixin, InputDevice): def __repr__(self): try: self._check_open() - except GPIODeviceClosed: + except DeviceClosed: return super(SmoothedInputDevice, self).__repr__() else: if self.partial or self._queue.full.wait(0): @@ -649,7 +649,9 @@ class DistanceSensor(SmoothedInputDevice): return self.pin def _read(self): - # Make sure the echo event is clear + # Make sure the echo pin is low then ensure the echo event is clear + while self.pin.state: + sleep(0.00001) self._echo.clear() # Fire the trigger self._trigger.pin.state = True diff --git a/gpiozero/pins/mock.py b/gpiozero/pins/mock.py index 6cb0a0b..53b1023 100644 --- a/gpiozero/pins/mock.py +++ b/gpiozero/pins/mock.py @@ -8,7 +8,7 @@ str = type('') from collections import namedtuple -from time import time +from time import time, sleep from threading import Thread, Event try: from math import isclose @@ -182,7 +182,7 @@ class MockChargingPin(MockPin): """ def __init__(self, number): super(MockChargingPin, self).__init__() - self.charge_time = 0.01 + self.charge_time = 0.01 # dark charging time self._charge_stop = Event() self._charge_thread = None @@ -193,16 +193,51 @@ class MockChargingPin(MockPin): self._charge_stop.set() self._charge_thread.join() self._charge_stop.clear() - self._charge_thread = Thread(target=lambda: self._charged) + self._charge_thread = Thread(target=self._charge) self._charge_thread.start() elif value == 'output': - if self.charge_thread: + if self._charge_thread: self._charge_stop.set() self._charge_thread.join() - def _charged(self): + def _charge(self): if not self._charge_stop.wait(self.charge_time): - self.drive_high() + try: + self.drive_high() + except AssertionError: + # Charging pins are typically flipped between input and output + # repeatedly; if another thread has already flipped us to + # output ignore the assertion-error resulting from attempting + # to drive the pin high + pass + + +class MockTriggerPin(MockPin): + """ + This derivative of :class:`MockPin` is intended to be used with another + :class:`MockPin` to emulate a distance sensor. Set :attr:`echo_pin` to the + corresponding pin instance. When this pin is driven high it will trigger + the echo pin to drive high for the echo time. + """ + def __init__(self, number): + super(MockTriggerPin, self).__init__() + self.echo_pin = None + self.echo_time = 0.04 # longest echo time + self._echo_thread = None + + def _set_state(self, value): + super(MockTriggerPin, self)._set_state(value) + if value: + if self._echo_thread: + self._echo_thread.join() + self._echo_thread = Thread(target=self._echo) + self._echo_thread.start() + + def _echo(self): + sleep(0.001) + self.echo_pin.drive_high() + sleep(self.echo_time) + self.echo_pin.drive_low() class MockPWMPin(MockPin): diff --git a/gpiozero/source_tools.py b/gpiozero/source_tools.py index 4aa1b51..c7f6847 100644 --- a/gpiozero/source_tools.py +++ b/gpiozero/source_tools.py @@ -16,7 +16,7 @@ try: except ImportError: pass from itertools import count, cycle -from math import sin, cos, floor +from math import sin, cos, floor, radians try: from statistics import mean except ImportError: @@ -270,7 +270,7 @@ def sin_values(): If you require a wider range than 0 to 1, see :func:`scaled`. """ for d in cycle(range(360)): - yield sin(d) + yield sin(radians(d)) def cos_values(): @@ -292,5 +292,5 @@ def cos_values(): If you require a wider range than 0 to 1, see :func:`scaled`. """ for d in cycle(range(360)): - yield cos(d) + yield cos(radians(d)) diff --git a/gpiozero/threads.py b/gpiozero/threads.py index 805212d..8a07b8a 100644 --- a/gpiozero/threads.py +++ b/gpiozero/threads.py @@ -23,8 +23,8 @@ def _threads_shutdown(): class GPIOThread(Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): - super(GPIOThread, self).__init__(group, target, name, args, kwargs) self.stopping = Event() + super(GPIOThread, self).__init__(group, target, name, args, kwargs) self.daemon = True def start(self): diff --git a/tests/test_compat.py b/tests/test_compat.py new file mode 100644 index 0000000..a3e6e28 --- /dev/null +++ b/tests/test_compat.py @@ -0,0 +1,148 @@ +from __future__ import ( + unicode_literals, + absolute_import, + print_function, + division, + ) +str = type('') + + +import pytest +import random + +from gpiozero.compat import * + + +# ported from the official test cases; see +# https://github.com/python/cpython/blob/master/Lib/test/test_math.py for original + +NAN = float('nan') +INF = float('inf') +NINF = float('-inf') + +def test_isclose_negative_tolerances(): + with pytest.raises(ValueError): + isclose(1, 1, rel_tol=-1e-100) + with pytest.raises(ValueError): + isclose(1, 1, rel_tol=1e-100, abs_tol=-1e10) + +def test_isclose_identical(): + examples = [ + (2.0, 2.0), + (0.1e200, 0.1e200), + (1.123e-300, 1.123e-300), + (12345, 12345.0), + (0.0, -0.0), + (345678, 345678), + ] + for a, b in examples: + assert isclose(a, b, rel_tol=0.0, abs_tol=0.0) + +def test_isclose_eight_decimals(): + examples = [ + (1e8, 1e8 + 1), + (-1e-8, -1.000000009e-8), + (1.12345678, 1.12345679), + ] + for a, b in examples: + assert isclose(a, b, rel_tol=1e-8) + assert not isclose(a, b, rel_tol=1e-9) + +def test_isclose_near_zero(): + examples = [1e-9, -1e-9, -1e-150] + for a in examples: + assert not isclose(a, 0.0, rel_tol=0.9) + assert isclose(a, 0.0, abs_tol=1e-8) + +def test_isclose_inf(): + assert isclose(INF, INF) + assert isclose(INF, INF, abs_tol=0.0) + assert isclose(NINF, NINF) + assert isclose(NINF, NINF, abs_tol=0.0) + +def test_isclose_inf_ninf_nan(): + examples = [ + (NAN, NAN), + (NAN, 1e-100), + (1e-100, NAN), + (INF, NAN), + (NAN, INF), + (INF, NINF), + (INF, 1.0), + (1.0, INF), + (INF, 1e308), + (1e308, INF), + ] + for a, b in examples: + assert not isclose(a, b, abs_tol=0.999999999999999) + +def test_isclose_zero_tolerance(): + examples = [ + (1.0, 1.0), + (-3.4, -3.4), + (-1e-300, -1e-300), + ] + for a, b in examples: + assert isclose(a, b, rel_tol=0.0) + examples = [ + (1.0, 1.000000000000001), + (0.99999999999999, 1.0), + (1.0e200, .999999999999999e200), + ] + for a, b in examples: + assert not isclose(a, b, rel_tol=0.0) + +def test_isclose_assymetry(): + assert isclose(9, 10, rel_tol=0.1) + assert isclose(10, 9, rel_tol=0.1) + +def test_isclose_integers(): + examples = [ + (100000001, 100000000), + (123456789, 123456788), + ] + for a, b in examples: + assert isclose(a, b, rel_tol=1e-8) + assert not isclose(a, b, rel_tol=1e-9) + +# ported from the official test cases; see +# https://github.com/python/cpython/blob/master/Lib/test/test_statistics.py for +# original + +def test_mean(): + examples = [ + (4.8125, (0, 1, 2, 3, 3, 3, 4, 5, 5, 6, 7, 7, 7, 7, 8, 9)), + (22.015625, (17.25, 19.75, 20.0, 21.5, 21.75, 23.25, 25.125, 27.5)), + (INF, (1, 3, 5, 7, 9, INF)), + (NINF, (1, 3, 5, 7, 9, NINF)), + ] + for result, values in examples: + values = list(values) + random.shuffle(values) + assert mean(values) == result + +def test_mean_big_data(): + c = 1e9 + data = [3.4, 4.5, 4.9, 6.7, 6.8, 7.2, 8.0, 8.1, 9.4] + expected = mean(data) + c + assert expected != c + assert mean([x + c for x in data]) == expected + +def test_mean_doubled_data(): + data = [random.uniform(-3, 5) for _ in range(1000)] + expected = mean(data) + actual = mean(data * 2) + assert isclose(expected, actual) + +def test_mean_empty(): + with pytest.raises(ValueError): + mean(()) + +def test_median(): + assert median([1, 2, 3, 4, 5, 6]) == 3.5 + assert median([1, 2, 3, 4, 5, 6, 9]) == 4 + +def test_median_empty(): + with pytest.raises(ValueError): + median(()) + diff --git a/tests/test_devices.py b/tests/test_devices.py index 43b3b92..1db7fab 100644 --- a/tests/test_devices.py +++ b/tests/test_devices.py @@ -68,3 +68,60 @@ def test_device_repr_after_close(): device.close() assert repr(device) == '' +def test_device_unknown_attr(): + pin = MockPin(2) + device = GPIODevice(pin) + with pytest.raises(AttributeError): + device.foo = 1 + +def test_device_context_manager(): + pin = MockPin(2) + with GPIODevice(pin) as device: + assert not device.closed + assert device.closed + +def test_composite_device_sequence(): + device = CompositeDevice( + InputDevice(MockPin(2)), + InputDevice(MockPin(3)) + ) + assert len(device) == 2 + assert device[0].pin.number == 2 + assert device[1].pin.number == 3 + assert device.tuple._fields == ('_0', '_1') + +def test_composite_device_values(): + device = CompositeDevice( + InputDevice(MockPin(2)), + InputDevice(MockPin(3)) + ) + assert device.value == (0, 0) + assert not device.is_active + device[0].pin.drive_high() + assert device.value == (1, 0) + assert device.is_active + +def test_composite_device_named(): + device = CompositeDevice( + foo=InputDevice(MockPin(2)), + bar=InputDevice(MockPin(3)), + _order=('foo', 'bar') + ) + assert device.tuple._fields == ('foo', 'bar') + assert device.value == (0, 0) + assert not device.is_active + +def test_composite_device_bad_init(): + with pytest.raises(ValueError): + CompositeDevice(foo=1, bar=2, _order=('foo',)) + with pytest.raises(ValueError): + CompositeDevice(close=1) + +def test_composite_device_read_only(): + device = CompositeDevice( + foo=InputDevice(MockPin(2)), + bar=InputDevice(MockPin(3)) + ) + with pytest.raises(AttributeError): + device.foo = 1 + diff --git a/tests/test_inputs.py b/tests/test_inputs.py index ec01e56..1e92879 100644 --- a/tests/test_inputs.py +++ b/tests/test_inputs.py @@ -7,11 +7,16 @@ from __future__ import ( str = type('') +import sys import pytest -import mock from threading import Event -from gpiozero.pins.mock import MockPin, MockPulledUpPin, MockChargingPin +from gpiozero.pins.mock import ( + MockPin, + MockPulledUpPin, + MockChargingPin, + MockTriggerPin, + ) from gpiozero import * @@ -136,12 +141,43 @@ def test_input_motion_sensor(): assert sensor.wait_for_no_motion(1) assert not sensor.motion_detected -@pytest.mark.skipif(True, reason='Freezes') +@pytest.mark.skipif(hasattr(sys, 'pypy_version_info'), + reason='timing is too random on pypy') def test_input_light_sensor(): pin = MockChargingPin(2) sensor = LightSensor(pin) - pin.charge_time = 1 - assert not sensor.light_detected - pin.charge_time = 0 - assert sensor.light_detected + pin.charge_time = 0.1 + assert sensor.wait_for_dark(1) + pin.charge_time = 0.0 + assert sensor.wait_for_light(1) + +@pytest.mark.skipif(hasattr(sys, 'pypy_version_info'), + reason='timing is too random on pypy') +def test_input_distance_sensor(): + echo_pin = MockPin(2) + trig_pin = MockTriggerPin(3) + trig_pin.echo_pin = echo_pin + trig_pin.echo_time = 0.02 + with pytest.raises(ValueError): + DistanceSensor(echo_pin, trig_pin, max_distance=-1) + # normal queue len is large (because the sensor is *really* jittery) but + # we want quick tests and we've got precisely controlled pins :) + sensor = DistanceSensor(echo_pin, trig_pin, queue_len=5, max_distance=1) + assert sensor.max_distance == 1 + assert sensor.trigger is trig_pin + assert sensor.echo is echo_pin + assert sensor.wait_for_out_of_range(1) + assert not sensor.in_range + assert sensor.distance == 1.0 # should be waay before max-distance so this should work + trig_pin.echo_time = 0.0 + assert sensor.wait_for_in_range(1) + assert sensor.in_range + assert sensor.distance < sensor.threshold_distance # depending on speed of machine, may not reach 0 here + sensor.threshold_distance = 0.1 + assert sensor.threshold_distance == 0.1 + with pytest.raises(ValueError): + sensor.max_distance = -1 + sensor.max_distance = 20 + assert sensor.max_distance == 20 + assert sensor.threshold_distance == 0.1 diff --git a/tests/test_source_tools.py b/tests/test_source_tools.py new file mode 100644 index 0000000..9ba32cb --- /dev/null +++ b/tests/test_source_tools.py @@ -0,0 +1,88 @@ +from __future__ import ( + unicode_literals, + absolute_import, + print_function, + division, + ) +str = type('') + + +import pytest +from math import sin, cos, radians +from time import time + +from gpiozero import * +try: + from math import isclose +except ImportError: + from gpiozero.compat import isclose + + +def test_negated(): + assert list(negated(())) == [] + assert list(negated((True, True, False, False))) == [False, False, True, True] + +def test_inverted(): + assert list(inverted(())) == [] + assert list(inverted((1, 0, 0.1, 0.5))) == [0, 1, 0.9, 0.5] + +def test_scaled(): + assert list(scaled((0, 1, 0.5, 0.1), 0, 1)) == [0, 1, 0.5, 0.1] + assert list(scaled((0, 1, 0.5, 0.1), -1, 1)) == [-1, 1, 0.0, -0.8] + +def test_clamped(): + assert list(clamped((-1, 0, 1, 2))) == [0, 0, 1, 1] + +def test_quantized(): + assert list(quantized((0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1), 4)) == [ + 0.0, 0.0, 0.0, 0.25, 0.25, 0.5, 0.5, 0.5, 0.75, 0.75, 1.0] + assert list(quantized((0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1), 5)) == [ + 0.0, 0.0, 0.2, 0.2, 0.4, 0.4, 0.6, 0.6, 0.8, 0.8, 1.0] + +def test_conjunction(): + assert list(conjunction(())) == [] + assert list(conjunction((False, True))) == [False, True] + assert list(conjunction((0, 1, 0, 1), (0, 0, 0, 1))) == [0, 0, 0, 1] + +def test_disjunction(): + assert list(disjunction(())) == [] + assert list(disjunction((False, True))) == [False, True] + assert list(disjunction((0, 1, 0, 1), (0, 0, 0, 1))) == [0, 1, 0, 1] + +def test_averaged(): + assert list(averaged(())) == [] + assert list(averaged((0, 0.5, 1), (1, 1, 1))) == [0.5, 0.75, 1] + +def test_queued(): + assert list(queued((1, 2, 3, 4, 5), 5)) == [1] + assert list(queued((1, 2, 3, 4, 5, 6), 5)) == [1, 2] + +def test_pre_delayed(): + start = time() + for v in pre_delayed((0, 0, 0), 0.01): + assert v == 0 + assert time() - start >= 0.01 + start = time() + +def test_post_delayed(): + start = time() + for v in post_delayed((1, 2, 2), 0.01): + if v == 1: + assert time() - start < 0.01 + else: + assert time() - start >= 0.01 + start = time() + assert time() - start >= 0.01 + +def test_random_values(): + for i, v in zip(range(1000), random_values()): + assert 0 <= v <= 1 + +def test_sin_values(): + for i, v in zip(range(1000), sin_values()): + assert isclose(v, sin(radians(i)), abs_tol=1e-9) + +def test_cos_values(): + for i, v in zip(range(1000), cos_values()): + assert isclose(v, cos(radians(i)), abs_tol=1e-9) +