mirror of
https://github.com/apirrone/Open_Duck_Mini_Runtime.git
synced 2025-09-02 11:13:55 +00:00
Add logging and unit tests for antennas and eyes modules
- Updated `setup.cfg` to include pytest and pytest-html dependencies. - Enhanced `antennas.py` with logging for initialization, homing, and position changes, including deadband handling. - Implemented a home method to ensure antennas return to a defined position. - Added unit tests for antennas functionality, covering initialization, movement, and synchronized actions. - Enhanced `eyes.py` with logging and improved blink thread management. - Created unit tests for eyes, verifying initialization, individual control, blink cycles, and synchronization. - Added tests for feet contacts, projector, and sounds modules to ensure proper functionality and response times.
This commit is contained in:
parent
32037347dc
commit
09183bbba3
11 changed files with 698 additions and 25 deletions
|
@ -2,6 +2,9 @@ import board
|
|||
import pwmio
|
||||
import math
|
||||
import time
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
LEFT_ANTENNA_PIN = board.D13
|
||||
RIGHT_ANTENNA_PIN = board.D12
|
||||
|
@ -47,6 +50,9 @@ class Antennas:
|
|||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
antennas = Antennas()
|
||||
|
||||
try:
|
||||
|
@ -61,4 +67,4 @@ if __name__ == "__main__":
|
|||
current_time = time.monotonic()
|
||||
|
||||
finally:
|
||||
antennas.stop()
|
||||
antennas.stop()
|
||||
|
|
|
@ -7,6 +7,7 @@ from threading import Thread, Event
|
|||
LEFT_EYE_PIN = board.D24
|
||||
RIGHT_EYE_PIN = board.D23
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class Eyes:
|
||||
def __init__(self, blink_duration=0.1, min_interval=1.0, max_interval=4.0):
|
||||
|
|
|
@ -2,12 +2,30 @@ import pygame
|
|||
import time
|
||||
import os
|
||||
import random
|
||||
import logging
|
||||
|
||||
|
||||
class Sounds:
|
||||
def __init__(self, volume=1.0, sound_directory="./"):
|
||||
pygame.mixer.init()
|
||||
pygame.mixer.music.set_volume(volume)
|
||||
def __init__(self, volume=1.0, sound_directory="./", mixer=None):
|
||||
"""
|
||||
Initialize the Sounds system.
|
||||
Args:
|
||||
volume (float): Volume level between 0.0 and 1.0
|
||||
sound_directory (str): Directory containing .wav files
|
||||
mixer: Optional pygame.mixer module for dependency injection (testing)
|
||||
"""
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.logger.debug("Initializing Sounds class")
|
||||
self.mixer = mixer if mixer is not None else pygame.mixer
|
||||
# Clamp volume between 0 and 1
|
||||
clamped_volume = max(0.0, min(1.0, volume))
|
||||
try:
|
||||
self.mixer.init()
|
||||
self.mixer.music.set_volume(clamped_volume)
|
||||
except Exception as e:
|
||||
self.logger.error(f"Failed to initialize mixer: {e}")
|
||||
self.ok = False
|
||||
return
|
||||
self.sounds = {}
|
||||
self.ok = True
|
||||
try:
|
||||
|
@ -15,37 +33,64 @@ class Sounds:
|
|||
if file.endswith(".wav"):
|
||||
sound_path = os.path.join(sound_directory, file)
|
||||
try:
|
||||
self.sounds[file] = pygame.mixer.Sound(sound_path)
|
||||
print(f"Loaded: {file}")
|
||||
except pygame.error as e:
|
||||
print(f"Failed to load {file}: {e}")
|
||||
sound = self.mixer.Sound(sound_path)
|
||||
sound.set_volume(clamped_volume)
|
||||
self.sounds[file] = sound
|
||||
except Exception as e:
|
||||
self.logger.error(f"Failed to load {file}: {e}")
|
||||
except FileNotFoundError:
|
||||
print(f"Directory {sound_directory} not found.")
|
||||
self.logger.error(f"Directory {sound_directory} not found.")
|
||||
self.ok = False
|
||||
if len(self.sounds) == 0:
|
||||
print("No sound files found in the directory.")
|
||||
self.logger.warning("No sound files found in the directory.")
|
||||
self.ok = False
|
||||
|
||||
def play(self, sound_name):
|
||||
if not self.ok:
|
||||
print("Sounds not initialized properly.")
|
||||
return
|
||||
if sound_name in self.sounds:
|
||||
self.sounds[sound_name].play()
|
||||
print(f"Playing: {sound_name}")
|
||||
else:
|
||||
print(f"Sound '{sound_name}' not found!")
|
||||
def is_playing(self):
|
||||
"""Check if any sound is currently playing"""
|
||||
return self.mixer.get_busy()
|
||||
|
||||
def play_random_sound(self):
|
||||
def play(self, sound_name, wait_if_playing=True):
|
||||
"""
|
||||
Play a sound file
|
||||
Args:
|
||||
sound_name: Name of the sound file to play
|
||||
wait_if_playing: If True, wait for current sound to finish before playing new sound
|
||||
"""
|
||||
if not self.ok:
|
||||
print("Sounds not initialized properly.")
|
||||
return
|
||||
sound_name = random.choice(list(self.sounds.keys()))
|
||||
self.logger.error("Sounds not initialized properly.")
|
||||
return False
|
||||
|
||||
if sound_name not in self.sounds:
|
||||
self.logger.error(f"Sound '{sound_name}' not found!")
|
||||
return False
|
||||
|
||||
if wait_if_playing:
|
||||
# Wait for current sound to finish
|
||||
self.logger.debug(f"Waiting for current sound to finish before playing {sound_name}")
|
||||
while self.is_playing():
|
||||
time.sleep(0.1)
|
||||
|
||||
self.sounds[sound_name].play()
|
||||
self.logger.info(f"Playing: {sound_name}")
|
||||
return True
|
||||
|
||||
def play_happy(self):
|
||||
self.sounds["happy1.wav"].play()
|
||||
def play_random_sound(self, wait_if_playing=True):
|
||||
"""Play a random sound from loaded sounds"""
|
||||
if not self.ok:
|
||||
self.logger.error("Sounds not initialized properly.")
|
||||
return False
|
||||
sound_name = random.choice(list(self.sounds.keys()))
|
||||
return self.play(sound_name, wait_if_playing)
|
||||
|
||||
def play_happy(self, wait_if_playing=True):
|
||||
"""Play happy sound"""
|
||||
return self.play("happy1.wav", wait_if_playing)
|
||||
|
||||
def wait_for_sound(self):
|
||||
"""Wait for current sound to finish playing"""
|
||||
self.logger.debug("Waiting for sound to finish playing")
|
||||
while self.is_playing():
|
||||
time.sleep(0.1)
|
||||
|
||||
|
||||
# Example usage
|
||||
|
@ -55,4 +100,5 @@ if __name__ == "__main__":
|
|||
while True:
|
||||
# sound_player.play_random_sound()
|
||||
sound_player.play_happy()
|
||||
sound_player.wait_for_sound() # Wait for sound to finish
|
||||
time.sleep(3)
|
||||
|
|
|
@ -111,6 +111,7 @@ class RLWalk:
|
|||
# Optional expression features
|
||||
if self.duck_config.eyes:
|
||||
self.eyes = Eyes()
|
||||
self.eyes.start_blink_thread()
|
||||
if self.duck_config.projector:
|
||||
self.projector = Projector()
|
||||
if self.duck_config.speaker:
|
||||
|
@ -119,6 +120,7 @@ class RLWalk:
|
|||
)
|
||||
if self.duck_config.antennas:
|
||||
self.antennas = Antennas()
|
||||
self.antennas.home()
|
||||
|
||||
def get_obs(self):
|
||||
|
||||
|
|
|
@ -24,6 +24,8 @@ install_requires =
|
|||
pygame==2.6.0
|
||||
pypot @ git+https://github.com/pollen-robotics/pypot@support-feetech-sts3215
|
||||
openai==1.70.0
|
||||
pytest==7.4.3
|
||||
pytest-html==3.2.0
|
||||
|
||||
# adafruit_extended_bus
|
||||
|
||||
|
|
87
tests/antenna_test.py
Normal file
87
tests/antenna_test.py
Normal file
|
@ -0,0 +1,87 @@
|
|||
import pytest
|
||||
import time
|
||||
import numpy as np
|
||||
import logging
|
||||
from mini_bdx_runtime.antennas import Antennas
|
||||
|
||||
# Configure logging for tests
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Constants
|
||||
SERVO_DELAY = 0.5 # Increased delay to allow servos to settle
|
||||
SWEEP_STEPS = 10 # Number of steps in sweep test
|
||||
|
||||
def test_antenna_initialization(antennas):
|
||||
"""Test that antennas are properly initialized"""
|
||||
assert antennas is not None
|
||||
assert hasattr(antennas, 'pwm1')
|
||||
assert hasattr(antennas, 'pwm2')
|
||||
logger.info("Antenna initialization test passed")
|
||||
|
||||
def test_antenna_movement(antennas):
|
||||
"""Test basic antenna movement"""
|
||||
# Test left antenna
|
||||
logger.info("Testing left antenna movement")
|
||||
antennas.home()
|
||||
time.sleep(SERVO_DELAY)
|
||||
|
||||
antennas.set_position_left(0) # Center position
|
||||
time.sleep(SERVO_DELAY)
|
||||
|
||||
antennas.set_position_left(0.5) # Move right
|
||||
time.sleep(SERVO_DELAY)
|
||||
|
||||
# Test right antenna
|
||||
logger.info("Testing right antenna movement")
|
||||
antennas.home()
|
||||
time.sleep(SERVO_DELAY)
|
||||
|
||||
antennas.set_position_right(0) # Center position
|
||||
time.sleep(SERVO_DELAY)
|
||||
|
||||
antennas.set_position_right(-0.5) # Move left
|
||||
time.sleep(SERVO_DELAY)
|
||||
|
||||
def test_left_antenna_sweep(antennas):
|
||||
"""Test sweeping motion of left antenna from min to max"""
|
||||
logger.info("Testing left antenna sweep")
|
||||
positions = np.linspace(-1, 1, SWEEP_STEPS)
|
||||
|
||||
# Forward sweep
|
||||
for pos in positions:
|
||||
antennas.set_position_left(pos)
|
||||
logger.info(f"Left antenna position: {pos:.2f}")
|
||||
time.sleep(SERVO_DELAY)
|
||||
|
||||
# Return to home
|
||||
antennas.home()
|
||||
time.sleep(SERVO_DELAY)
|
||||
|
||||
def test_right_antenna_sweep(antennas):
|
||||
"""Test sweeping motion of right antenna from min to max"""
|
||||
logger.info("Testing right antenna sweep")
|
||||
positions = np.linspace(-1, 1, SWEEP_STEPS)
|
||||
|
||||
# Forward sweep
|
||||
for pos in positions:
|
||||
antennas.set_position_right(pos)
|
||||
logger.info(f"Right antenna position: {pos:.2f}")
|
||||
time.sleep(SERVO_DELAY)
|
||||
|
||||
# Return to home
|
||||
antennas.home()
|
||||
time.sleep(SERVO_DELAY)
|
||||
|
||||
def test_synchronized_movement(antennas):
|
||||
"""Test synchronized movement of both antennas"""
|
||||
logger.info("Testing synchronized antenna movement")
|
||||
antennas.home()
|
||||
time.sleep(SERVO_DELAY)
|
||||
|
||||
positions = np.linspace(-1, 1, 5)
|
||||
for pos in positions:
|
||||
antennas.set_position_left(pos)
|
||||
antennas.set_position_right(pos)
|
||||
logger.info(f"Synchronized position: {pos:.2f}")
|
||||
time.sleep(SERVO_DELAY)
|
146
tests/conftest.py
Normal file
146
tests/conftest.py
Normal file
|
@ -0,0 +1,146 @@
|
|||
import pytest
|
||||
import os
|
||||
import RPi.GPIO as GPIO
|
||||
import logging
|
||||
from pathlib import Path
|
||||
import json
|
||||
|
||||
# Only import DuckConfig at the top, as it's used in actual_config fixture
|
||||
from mini_bdx_runtime.duck_config import DuckConfig
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def get_assets_dir():
|
||||
"""Get the absolute path to the assets directory"""
|
||||
current_dir = Path(__file__).parent
|
||||
assets_dir = current_dir.parent / "mini_bdx_runtime" / "assets"
|
||||
if not assets_dir.exists():
|
||||
raise FileNotFoundError(f"Assets directory not found at {assets_dir}")
|
||||
return str(assets_dir)
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def actual_config():
|
||||
"""
|
||||
Fixture to provide a DuckConfig instance for testing.
|
||||
First tries to use the user's duck_config.json, falls back to example_config.json
|
||||
"""
|
||||
user_config = DuckConfig(ignore_default=True)
|
||||
if not user_config.default:
|
||||
logger.info("Using user's duck_config.json for testing")
|
||||
return user_config
|
||||
example_config_path = Path(__file__).parent.parent / "example_config.json"
|
||||
if not example_config_path.exists():
|
||||
raise FileNotFoundError(f"Neither user config nor example_config.json found at {example_config_path}")
|
||||
logger.info("Using example_config.json for testing")
|
||||
return DuckConfig(config_json_path=str(example_config_path), ignore_default=True)
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def gpio_setup_teardown():
|
||||
"""Global fixture to handle GPIO setup and cleanup"""
|
||||
logger.info("Setting up GPIO for testing")
|
||||
GPIO.setwarnings(False)
|
||||
GPIO.setmode(GPIO.BCM)
|
||||
yield
|
||||
logger.info("Cleaning up GPIO")
|
||||
GPIO.cleanup()
|
||||
|
||||
@pytest.fixture
|
||||
def antennas(actual_config):
|
||||
"""
|
||||
Fixture to provide an Antennas instance.
|
||||
Skips tests if antennas feature is disabled in config.
|
||||
"""
|
||||
if not actual_config.antennas:
|
||||
pytest.skip("Antennas feature is disabled in config")
|
||||
logger.info("Initializing Antennas")
|
||||
from mini_bdx_runtime.antennas import Antennas
|
||||
ant = Antennas()
|
||||
yield ant
|
||||
logger.info("Cleaning up Antennas")
|
||||
ant.stop()
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def eyes(actual_config):
|
||||
"""
|
||||
Fixture to provide an Eyes instance.
|
||||
Skips tests if eyes feature is disabled in config.
|
||||
"""
|
||||
if not actual_config.eyes:
|
||||
logger.info("Skipping eyes tests as feature is disabled")
|
||||
pytest.skip("Eyes feature is disabled in config")
|
||||
logger.info("Initializing Eyes")
|
||||
from mini_bdx_runtime.eyes import Eyes
|
||||
eyes_instance = Eyes()
|
||||
yield eyes_instance
|
||||
logger.info("Cleaning up Eyes")
|
||||
eyes_instance.cleanup()
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def sounds(actual_config):
|
||||
"""
|
||||
Fixture to provide a Sounds instance with test audio files.
|
||||
Skips tests if speaker feature is disabled in config.
|
||||
Module scope since sound playback tests don't interfere with each other.
|
||||
"""
|
||||
if not actual_config.speaker:
|
||||
pytest.skip("Speaker feature is disabled in config")
|
||||
logger.info("Initializing Sounds")
|
||||
from mini_bdx_runtime.sounds import Sounds
|
||||
assets_dir = get_assets_dir()
|
||||
sounds_instance = Sounds(volume=0.1, sound_directory=assets_dir)
|
||||
yield sounds_instance
|
||||
logger.info("Cleaning up Sounds")
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def projector(actual_config):
|
||||
"""
|
||||
Fixture to provide a Projector instance.
|
||||
Skips tests if projector feature is disabled in config.
|
||||
Module scope since state is always cleaned up between tests.
|
||||
"""
|
||||
if not actual_config.projector:
|
||||
pytest.skip("Projector feature is disabled in config")
|
||||
logger.info("Initializing Projector")
|
||||
from mini_bdx_runtime.projector import Projector
|
||||
proj = Projector()
|
||||
yield proj
|
||||
logger.info("Cleaning up Projector")
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def feet_contacts():
|
||||
"""
|
||||
Fixture to provide a FeetContacts instance.
|
||||
Module scope since it's used for read-only operations.
|
||||
"""
|
||||
logger.info("Initializing FeetContacts")
|
||||
from mini_bdx_runtime.feet_contacts import FeetContacts
|
||||
feet = FeetContacts()
|
||||
yield feet
|
||||
logger.info("Cleaning up FeetContacts")
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def hwi():
|
||||
"""
|
||||
Fixture to provide a HWI instance.
|
||||
Module scope for hardware interface efficiency.
|
||||
"""
|
||||
logger.info("Initializing HWI")
|
||||
from mini_bdx_runtime.rustypot_position_hwi import HWI
|
||||
hwi_instance = HWI()
|
||||
yield hwi_instance
|
||||
logger.info("Cleaning up HWI")
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def imu(actual_config):
|
||||
"""
|
||||
Fixture to provide an IMU instance.
|
||||
Takes into account the imu_upside_down configuration.
|
||||
Module scope for hardware interface efficiency.
|
||||
"""
|
||||
logger.info("Initializing IMU")
|
||||
from mini_bdx_runtime.raw_imu import Imu
|
||||
imu_instance = Imu(upside_down=actual_config.imu_upside_down)
|
||||
yield imu_instance
|
||||
logger.info("Cleaning up IMU")
|
197
tests/eye_test.py
Normal file
197
tests/eye_test.py
Normal file
|
@ -0,0 +1,197 @@
|
|||
import pytest
|
||||
import time
|
||||
import RPi.GPIO as GPIO
|
||||
from mini_bdx_runtime.eyes import Eyes, LEFT_EYE_GPIO, RIGHT_EYE_GPIO
|
||||
|
||||
eye_delay = 2.5
|
||||
|
||||
@pytest.fixture
|
||||
def eyes():
|
||||
"""Fixture to create and cleanup Eyes instance"""
|
||||
eyes_instance = Eyes()
|
||||
yield eyes_instance
|
||||
eyes_instance.cleanup() # Ensure cleanup after each test
|
||||
|
||||
def test_eyes_initialization(eyes):
|
||||
"""Test that eyes are properly initialized"""
|
||||
assert eyes is not None
|
||||
|
||||
# Verify initial state (both eyes should be ON)
|
||||
states = eyes.get_eye_states()
|
||||
assert states['left'] == GPIO.HIGH
|
||||
assert states['right'] == GPIO.HIGH
|
||||
time.sleep(eye_delay)
|
||||
# Verify blink thread is not started yet
|
||||
assert not hasattr(eyes, '_blink_thread')
|
||||
|
||||
def test_individual_eye_control(eyes):
|
||||
"""Test individual eye control"""
|
||||
# Start with both eyes off
|
||||
eyes.set_both_eyes(GPIO.LOW)
|
||||
states = eyes.get_eye_states()
|
||||
assert states['left'] == GPIO.LOW
|
||||
assert states['right'] == GPIO.LOW
|
||||
time.sleep(eye_delay)
|
||||
|
||||
# Test left eye
|
||||
eyes.set_left_eye(GPIO.HIGH)
|
||||
states = eyes.get_eye_states()
|
||||
assert states['left'] == GPIO.HIGH
|
||||
assert states['right'] == GPIO.LOW
|
||||
time.sleep(eye_delay)
|
||||
|
||||
|
||||
# Test right eye
|
||||
eyes.set_right_eye(GPIO.HIGH)
|
||||
states = eyes.get_eye_states()
|
||||
assert states['left'] == GPIO.HIGH
|
||||
assert states['right'] == GPIO.HIGH
|
||||
time.sleep(eye_delay)
|
||||
|
||||
# Turn off left eye
|
||||
eyes.set_left_eye(GPIO.LOW)
|
||||
states = eyes.get_eye_states()
|
||||
assert states['left'] == GPIO.LOW
|
||||
assert states['right'] == GPIO.HIGH
|
||||
time.sleep(eye_delay)
|
||||
|
||||
# Turn off right eye
|
||||
eyes.set_right_eye(GPIO.LOW)
|
||||
states = eyes.get_eye_states()
|
||||
assert states['left'] == GPIO.LOW
|
||||
assert states['right'] == GPIO.LOW
|
||||
time.sleep(eye_delay)
|
||||
|
||||
def test_blink_thread_management(eyes):
|
||||
"""Test starting and stopping the blink thread"""
|
||||
# Start the blink thread
|
||||
eyes.start_blink_thread()
|
||||
assert hasattr(eyes, '_blink_thread')
|
||||
assert eyes._blink_thread.is_alive()
|
||||
assert eyes.blink_duration == 0.1
|
||||
|
||||
# Stop the blink thread
|
||||
eyes.stop_blink_thread()
|
||||
assert not eyes._blink_thread.is_alive()
|
||||
|
||||
|
||||
def test_eyes_blink_cycle(eyes):
|
||||
"""Test a complete blink cycle for both eyes"""
|
||||
# Start the blink thread
|
||||
eyes.start_blink_thread()
|
||||
|
||||
# Initial state should be ON
|
||||
states = eyes.get_eye_states()
|
||||
assert states['left'] == GPIO.HIGH
|
||||
assert states['right'] == GPIO.HIGH
|
||||
|
||||
# Wait for at least one blink cycle
|
||||
time.sleep(eyes.blink_duration * 2)
|
||||
|
||||
# Check multiple times to catch different states
|
||||
states_history = []
|
||||
for _ in range(10):
|
||||
states = eyes.get_eye_states()
|
||||
states_history.append((states['left'], states['right']))
|
||||
time.sleep(0.1)
|
||||
|
||||
# Verify that we saw at least one blink (LOW state)
|
||||
assert any(state[0] == GPIO.LOW for state in states_history), "Left eye never blinked"
|
||||
assert any(state[1] == GPIO.LOW for state in states_history), "Right eye never blinked"
|
||||
|
||||
# Verify that eyes return to ON state
|
||||
assert any(state[0] == GPIO.HIGH for state in states_history), "Left eye never turned back on"
|
||||
assert any(state[1] == GPIO.HIGH for state in states_history), "Right eye never turned back on"
|
||||
|
||||
# Stop the blink thread
|
||||
eyes.stop_blink_thread()
|
||||
|
||||
def test_eyes_synchronization(eyes):
|
||||
"""Test that both eyes blink synchronously"""
|
||||
# Start the blink thread
|
||||
eyes.start_blink_thread()
|
||||
|
||||
states_history = []
|
||||
# Capture several states to verify synchronization
|
||||
for _ in range(10):
|
||||
states = eyes.get_eye_states()
|
||||
states_history.append((states['left'], states['right']))
|
||||
time.sleep(0.1)
|
||||
|
||||
# Verify that eyes are always in the same state
|
||||
for left_state, right_state in states_history:
|
||||
assert left_state == right_state, "Eyes are not synchronized"
|
||||
|
||||
# Stop the blink thread
|
||||
eyes.stop_blink_thread()
|
||||
|
||||
def test_blink_duration(eyes):
|
||||
"""Test that blink duration is approximately correct"""
|
||||
# Start the blink thread
|
||||
eyes.start_blink_thread()
|
||||
|
||||
# Wait for a blink to start
|
||||
while eyes.get_eye_states()['left'] == GPIO.HIGH:
|
||||
time.sleep(0.01)
|
||||
|
||||
# Measure blink duration
|
||||
start_time = time.time()
|
||||
while eyes.get_eye_states()['left'] == GPIO.LOW:
|
||||
time.sleep(0.01)
|
||||
blink_time = time.time() - start_time
|
||||
|
||||
# Verify blink duration (with some tolerance)
|
||||
assert abs(blink_time - eyes.blink_duration) < 0.05, "Blink duration is not within expected range"
|
||||
|
||||
# Stop the blink thread
|
||||
eyes.stop_blink_thread()
|
||||
|
||||
def test_cleanup(eyes):
|
||||
"""Test that eyes can be properly cleaned up"""
|
||||
# Start the blink thread
|
||||
eyes.start_blink_thread()
|
||||
|
||||
# Set eyes to known state
|
||||
eyes.set_both_eyes(GPIO.HIGH)
|
||||
time.sleep(0.1)
|
||||
|
||||
# Cleanup
|
||||
eyes.cleanup()
|
||||
|
||||
# Verify both eyes are off after cleanup
|
||||
states = eyes.get_eye_states()
|
||||
assert states['left'] == GPIO.LOW
|
||||
assert states['right'] == GPIO.LOW
|
||||
|
||||
# Verify blink thread has stopped
|
||||
assert not eyes._blink_thread.is_alive()
|
||||
|
||||
def test_eyes_random_blink_timing(eyes):
|
||||
"""Test that eyes blink at random intervals"""
|
||||
# Start the blink thread
|
||||
eyes.start_blink_thread()
|
||||
|
||||
# Capture blink timings
|
||||
blink_times = []
|
||||
start_time = time.time()
|
||||
last_state = GPIO.HIGH
|
||||
|
||||
# Monitor for a few blinks
|
||||
while len(blink_times) < 3 and time.time() - start_time < 10:
|
||||
current_state = eyes.get_eye_states()['left']
|
||||
if current_state != last_state and current_state == GPIO.LOW:
|
||||
blink_times.append(time.time() - start_time)
|
||||
last_state = current_state
|
||||
time.sleep(0.01)
|
||||
|
||||
# Test should complete before timeout
|
||||
assert time.time() - start_time < 10, "Failed to detect enough blinks"
|
||||
|
||||
# Verify random intervals between blinks
|
||||
if len(blink_times) >= 2:
|
||||
intervals = [blink_times[i+1] - blink_times[i] for i in range(len(blink_times)-1)]
|
||||
# Check that intervals are not all the same (allowing for small timing variations)
|
||||
assert not all(abs(intervals[0] - interval) < 0.1 for interval in intervals[1:]), "Blink intervals appear to be constant"
|
||||
|
||||
# Stop the blink thread
|
||||
eyes.stop_blink_thread()
|
42
tests/foot_contacts_test.py
Normal file
42
tests/foot_contacts_test.py
Normal file
|
@ -0,0 +1,42 @@
|
|||
import pytest
|
||||
import numpy as np
|
||||
import time
|
||||
|
||||
def test_feet_contacts_initialization(feet_contacts):
|
||||
"""Test that feet contacts are properly initialized"""
|
||||
assert feet_contacts is not None
|
||||
|
||||
def test_feet_contacts_get(feet_contacts):
|
||||
"""Test getting feet contact states"""
|
||||
# Get contact states
|
||||
contacts = feet_contacts.get()
|
||||
|
||||
# Check return type and shape
|
||||
assert isinstance(contacts, np.ndarray)
|
||||
assert contacts.shape == (2,)
|
||||
assert contacts.dtype == bool
|
||||
|
||||
# Check that values are boolean
|
||||
assert isinstance(contacts[0], bool) # Left foot
|
||||
assert isinstance(contacts[1], bool) # Right foot
|
||||
|
||||
def test_feet_contacts_continuous_reading(feet_contacts):
|
||||
"""Test continuous reading of feet contacts"""
|
||||
# Take multiple readings to ensure stability
|
||||
readings = []
|
||||
for _ in range(5):
|
||||
readings.append(feet_contacts.get())
|
||||
time.sleep(0.1)
|
||||
|
||||
# Convert readings to numpy array for analysis
|
||||
readings = np.array(readings)
|
||||
assert readings.shape == (5, 2)
|
||||
|
||||
def test_feet_contacts_response_time(feet_contacts):
|
||||
"""Test response time of feet contacts"""
|
||||
start_time = time.time()
|
||||
feet_contacts.get()
|
||||
end_time = time.time()
|
||||
|
||||
# Reading should be very quick (under 10ms)
|
||||
assert end_time - start_time < 0.01
|
20
tests/projector_test.py
Normal file
20
tests/projector_test.py
Normal file
|
@ -0,0 +1,20 @@
|
|||
import pytest
|
||||
import time
|
||||
from mini_bdx_runtime.projector import Projector
|
||||
projector_delay = 1
|
||||
|
||||
def test_projector_switch(projector):
|
||||
"""Test projector switching on and off"""
|
||||
# Initial state should be off
|
||||
assert projector.on == False
|
||||
|
||||
# Switch on
|
||||
projector.switch()
|
||||
assert projector.on == True
|
||||
time.sleep(projector_delay) # Allow time for GPIO to update
|
||||
|
||||
# Switch off
|
||||
projector.switch()
|
||||
assert projector.on == False
|
||||
time.sleep(projector_delay) # Allow time for GPIO to update
|
||||
|
124
tests/sounds_test.py
Normal file
124
tests/sounds_test.py
Normal file
|
@ -0,0 +1,124 @@
|
|||
import pytest
|
||||
import os
|
||||
import time
|
||||
import pygame
|
||||
from unittest import mock
|
||||
from conftest import get_assets_dir
|
||||
from mini_bdx_runtime.sounds import Sounds
|
||||
|
||||
sound_delay = 0.1
|
||||
sound_volume = 0.5
|
||||
|
||||
def test_sounds_initialization(sounds):
|
||||
"""Test that sounds system is properly initialized"""
|
||||
assert sounds is not None
|
||||
assert sounds.ok == True
|
||||
assert len(sounds.sounds) > 0
|
||||
|
||||
def test_play_specific_sound(sounds):
|
||||
"""Test playing a specific sound"""
|
||||
# Test playing happy sound
|
||||
sounds.play_happy()
|
||||
time.sleep(sound_delay) # Allow time for sound to start
|
||||
|
||||
def test_play_random_sound(sounds):
|
||||
"""Test playing random sounds"""
|
||||
# Test multiple random sounds
|
||||
for _ in range(3):
|
||||
sounds.play_random_sound()
|
||||
time.sleep(sound_delay) # Allow time between sounds
|
||||
|
||||
def test_play_nonexistent_sound(sounds):
|
||||
"""Test attempting to play a nonexistent sound"""
|
||||
# Should print error message but not crash
|
||||
sounds.play("nonexistent_sound.wav")
|
||||
|
||||
def test_sound_volume(sounds):
|
||||
"""Test sound volume setting"""
|
||||
# Get the actual volume of the sound
|
||||
actual_volume = sounds.sounds["happy1.wav"].get_volume()
|
||||
# Use a small tolerance for floating point comparison
|
||||
assert abs(actual_volume - sound_volume) < 0.01, f"Expected volume {sound_volume}, got {actual_volume}"
|
||||
time.sleep(sound_delay)
|
||||
|
||||
def test_available_sounds(sounds):
|
||||
"""Test that expected sound files are available"""
|
||||
# Check for common sound files
|
||||
assert "happy1.wav" in sounds.sounds
|
||||
|
||||
# Verify all loaded sounds are .wav files
|
||||
for sound_name in sounds.sounds.keys():
|
||||
assert sound_name.endswith('.wav')
|
||||
|
||||
@pytest.mark.parametrize("test_volume", [0.0, 0.3, 0.7, 1.0])
|
||||
def test_sound_volume_control(test_volume):
|
||||
"""Test sound volume control for both music and sound effects"""
|
||||
# Create new sound instance with test volume
|
||||
assets_dir = get_assets_dir()
|
||||
sound_player = Sounds(volume=test_volume, sound_directory=assets_dir)
|
||||
|
||||
# Test that mixer volume is set correctly
|
||||
actual_mixer_volume = pygame.mixer.music.get_volume()
|
||||
assert abs(actual_mixer_volume - test_volume) < 0.01, \
|
||||
f"Music volume {actual_mixer_volume} does not match expected {test_volume}"
|
||||
|
||||
# Test that all loaded sound effects have correct volume
|
||||
for sound_name, sound in sound_player.sounds.items():
|
||||
actual_volume = sound.get_volume()
|
||||
assert abs(actual_volume - test_volume) < 0.01, \
|
||||
f"Sound {sound_name} volume {actual_volume} does not match expected {test_volume}"
|
||||
|
||||
# Try playing a sound at this volume
|
||||
if "happy1.wav" in sound_player.sounds:
|
||||
sound_player.play("happy1.wav")
|
||||
time.sleep(sound_delay) # Allow time to start playing
|
||||
assert sound_player.is_playing(), f"Sound should be playing at volume {test_volume}"
|
||||
|
||||
@pytest.mark.parametrize("invalid_volume", [-0.5, 1.5])
|
||||
def test_volume_bounds(invalid_volume):
|
||||
"""Test that volume is properly bounded between 0 and 1"""
|
||||
assets_dir = get_assets_dir()
|
||||
sound_player = Sounds(volume=invalid_volume, sound_directory=assets_dir)
|
||||
|
||||
# Volume should be clamped between 0 and 1
|
||||
actual_volume = pygame.mixer.music.get_volume()
|
||||
assert 0 <= actual_volume <= 1, f"Volume {actual_volume} should be clamped between 0 and 1"
|
||||
|
||||
# Check all sound effects are also properly bounded
|
||||
for sound_name, sound in sound_player.sounds.items():
|
||||
effect_volume = sound.get_volume()
|
||||
assert 0 <= effect_volume <= 1, \
|
||||
f"Sound effect {sound_name} volume {effect_volume} should be clamped between 0 and 1"
|
||||
|
||||
def test_no_overlap_play(monkeypatch):
|
||||
"""Test that play() does not start a new sound while one is playing if wait_if_playing=True"""
|
||||
assets_dir = get_assets_dir()
|
||||
# Mock mixer and sound
|
||||
mock_mixer = mock.MagicMock()
|
||||
mock_sound = mock.MagicMock()
|
||||
mock_mixer.Sound.return_value = mock_sound
|
||||
# Simulate busy state for first call, then not busy
|
||||
busy_states = [True, False]
|
||||
mock_mixer.get_busy.side_effect = lambda: busy_states.pop(0) if busy_states else False
|
||||
# Patch os.listdir to return a fake .wav file
|
||||
with mock.patch("os.listdir", return_value=["happy1.wav"]):
|
||||
sound_player = Sounds(volume=0.5, sound_directory=assets_dir, mixer=mock_mixer)
|
||||
result = sound_player.play("happy1.wav", wait_if_playing=True)
|
||||
assert result is True
|
||||
# Should have waited for busy to become False
|
||||
assert mock_mixer.get_busy.call_count >= 1
|
||||
mock_sound.play.assert_called_once()
|
||||
|
||||
def test_play_without_wait(monkeypatch):
|
||||
"""Test that play() starts immediately if wait_if_playing=False, even if busy"""
|
||||
assets_dir = get_assets_dir()
|
||||
mock_mixer = mock.MagicMock()
|
||||
mock_sound = mock.MagicMock()
|
||||
mock_mixer.Sound.return_value = mock_sound
|
||||
mock_mixer.get_busy.return_value = True
|
||||
with mock.patch("os.listdir", return_value=["happy1.wav"]):
|
||||
sound_player = Sounds(volume=0.5, sound_directory=assets_dir, mixer=mock_mixer)
|
||||
result = sound_player.play("happy1.wav", wait_if_playing=False)
|
||||
assert result is True
|
||||
# Should not wait for busy to become False
|
||||
mock_sound.play.assert_called_once()
|
Loading…
Reference in a new issue