mirror of
https://github.com/apirrone/Open_Duck_Mini_Runtime.git
synced 2025-09-02 03:04:02 +00:00
better controller
This commit is contained in:
parent
cef6c0a2ea
commit
0002523b0c
2 changed files with 104 additions and 138 deletions
|
@ -8,8 +8,11 @@ Design:
|
|||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import atexit
|
||||
import signal
|
||||
from threading import Lock
|
||||
from typing import Tuple, Optional, Union, Dict
|
||||
from typing import Tuple, Optional, Union
|
||||
|
||||
# Direct hardware imports (we assume we're running on-device)
|
||||
import board
|
||||
|
@ -19,15 +22,21 @@ import neopixel
|
|||
PIXEL_PIN = board.D10
|
||||
NUM_PIXELS = 3
|
||||
|
||||
# The order of the pixel colors - RGB or GRB. Some NeoPixels have red and green reversed!
|
||||
# For RGBW NeoPixels, simply change the ORDER to RGBW or GRBW.
|
||||
ORDER = neopixel.RGBW
|
||||
# Allow configuration of pixel order.
|
||||
# Default to GRBW (common on many RGBW strips). You can override via:
|
||||
# - env var ODUCK_LED_ORDER, e.g. "RGB", "GRB", "RGBW", "GRBW"
|
||||
# - duck_config.LED_ORDER (string matching neopixel constants)
|
||||
try:
|
||||
from open_duck_mini_runtime.duck_config import LED_ORDER as _CFG_LED_ORDER # type: ignore
|
||||
except Exception:
|
||||
_CFG_LED_ORDER = None
|
||||
|
||||
_ORDER_NAME = os.getenv("ODUCK_LED_ORDER", _CFG_LED_ORDER or "GRBW").upper()
|
||||
ORDER = getattr(neopixel, _ORDER_NAME, neopixel.GRBW)
|
||||
|
||||
# Brightness can be tuned via env
|
||||
BRIGHTNESS = float(os.getenv("ODUCK_LED_BRIGHTNESS", "1.0"))
|
||||
|
||||
# Brightness and a single shared NeoPixel instance
|
||||
BRIGHTNESS = 1
|
||||
pixels = neopixel.NeoPixel(
|
||||
PIXEL_PIN, NUM_PIXELS, brightness=BRIGHTNESS, auto_write=False, pixel_order=ORDER
|
||||
)
|
||||
|
||||
class LedController:
|
||||
def __init__(self) -> None:
|
||||
|
@ -35,12 +44,15 @@ class LedController:
|
|||
self._pixels = None # type: ignore
|
||||
self._deinited = False
|
||||
|
||||
# Use the module-level NeoPixel configured above
|
||||
self._pixels = pixels
|
||||
# Lazily create the NeoPixel instance (avoid creating it at import-time)
|
||||
self._pixels = neopixel.NeoPixel(
|
||||
PIXEL_PIN, NUM_PIXELS, brightness=BRIGHTNESS, auto_write=False, pixel_order=ORDER
|
||||
)
|
||||
|
||||
# Cache simple color tuples (use 4-tuple for RGBW strips)
|
||||
# Colors are stored in logical (R, G, B, W) regardless of ORDER.
|
||||
self.OFF = (0, 0, 0, 0)
|
||||
# On RGBW strips, "white" uses the W channel for best white.
|
||||
self.WHITE = (0, 0, 0, 255)
|
||||
self.RED = (255, 0, 0, 0)
|
||||
self.GREEN = (0, 255, 0, 0)
|
||||
|
@ -67,6 +79,9 @@ class LedController:
|
|||
# Ensure an initial known state
|
||||
self._apply()
|
||||
|
||||
# Ensure cleanup on interpreter shutdown
|
||||
atexit.register(self.deinit)
|
||||
|
||||
# Internal helper to apply current states to pixels
|
||||
def _apply(
|
||||
self,
|
||||
|
@ -98,8 +113,6 @@ class LedController:
|
|||
"""
|
||||
r, g, b, w = color_rgba
|
||||
try:
|
||||
# neopixel library generally accepts (r,g,b) or (r,g,b,w) depending on pixel type
|
||||
# We map to the declared ORDER for clarity when direct indexing is used.
|
||||
if ORDER in (neopixel.RGB, neopixel.GRB):
|
||||
mapping = {
|
||||
neopixel.RGB: (r, g, b),
|
||||
|
@ -114,7 +127,7 @@ class LedController:
|
|||
}
|
||||
return mapping.get(ORDER, (r, g, b, w))
|
||||
except Exception:
|
||||
# Fallback: return RGBA or RGB as-is
|
||||
# Fallback
|
||||
return (r, g, b, w)
|
||||
|
||||
# Eyes API
|
||||
|
@ -185,15 +198,18 @@ class LedController:
|
|||
self._apply()
|
||||
|
||||
def deinit(self) -> None:
|
||||
if self._deinited or self._pixels is None:
|
||||
if self._deinited:
|
||||
return
|
||||
with self._lock:
|
||||
try:
|
||||
self.all_off()
|
||||
# Turn everything off before releasing the driver
|
||||
if self._pixels is not None:
|
||||
self._pixels.fill(self._to_order(self.OFF))
|
||||
self._pixels.show()
|
||||
except Exception:
|
||||
# Ignore if showing fails on teardown
|
||||
pass
|
||||
try:
|
||||
if self._pixels is not None:
|
||||
self._pixels.deinit()
|
||||
finally:
|
||||
self._deinited = True
|
||||
|
@ -208,3 +224,17 @@ def get_controller() -> LedController:
|
|||
if _controller is None:
|
||||
_controller = LedController()
|
||||
return _controller
|
||||
|
||||
|
||||
# Ensure graceful cleanup on Ctrl+C / SIGTERM without forcing controller creation.
|
||||
def _shutdown_handler(signum, frame):
|
||||
global _controller
|
||||
if _controller is not None:
|
||||
_controller.deinit()
|
||||
|
||||
try:
|
||||
signal.signal(signal.SIGINT, _shutdown_handler)
|
||||
signal.signal(signal.SIGTERM, _shutdown_handler)
|
||||
except Exception:
|
||||
# Not all environments allow setting signals (e.g., some threads)
|
||||
pass
|
|
@ -1,130 +1,66 @@
|
|||
import pygame
|
||||
import argparse
|
||||
import time
|
||||
import os
|
||||
import random
|
||||
from threading import Thread, Lock
|
||||
from pathlib import Path
|
||||
|
||||
from open_duck_mini_runtime.led_controller import get_controller
|
||||
import pygame
|
||||
|
||||
|
||||
class Sounds:
|
||||
def __init__(self, volume=1.0, sound_directory="", default_color: str = "white", sound_color_map: dict | None = None):
|
||||
def find_assets_dir() -> Path:
|
||||
# assets/ is a sibling of the package directory
|
||||
pkg_dir = Path(__file__).resolve().parent
|
||||
src_dir = pkg_dir.parent
|
||||
assets = src_dir / "assets"
|
||||
if not assets.exists():
|
||||
raise FileNotFoundError(f"Assets directory not found: {assets}")
|
||||
return assets
|
||||
|
||||
|
||||
def list_wavs(assets_dir: Path) -> list[Path]:
|
||||
return sorted(assets_dir.glob("*.wav"))
|
||||
|
||||
|
||||
def play_sound(path: Path, volume: float = 1.0) -> None:
|
||||
snd = pygame.mixer.Sound(str(path))
|
||||
snd.set_volume(max(0.0, min(1.0, volume)))
|
||||
ch = snd.play()
|
||||
while ch.get_busy():
|
||||
time.sleep(0.05)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Step through sounds in assets/")
|
||||
parser.add_argument("--auto", action="store_true", help="Automatically advance without waiting for Enter")
|
||||
parser.add_argument("--delay", type=float, default=0.5, help="Delay between sounds when --auto is set")
|
||||
parser.add_argument("--volume", type=float, default=1.0, help="Playback volume (0.0 - 1.0)")
|
||||
args = parser.parse_args()
|
||||
|
||||
assets = find_assets_dir()
|
||||
files = list_wavs(assets)
|
||||
if not files:
|
||||
print(f"No .wav files found in {assets}")
|
||||
return
|
||||
|
||||
pygame.mixer.init()
|
||||
pygame.mixer.music.set_volume(volume)
|
||||
self.sounds = {}
|
||||
self.ok = True
|
||||
self._ctrl = get_controller()
|
||||
self._default_color = default_color
|
||||
self._sound_color_map = sound_color_map or {}
|
||||
self._color_token = 0
|
||||
self._lock = Lock()
|
||||
try:
|
||||
for file in os.listdir(sound_directory):
|
||||
if file.endswith(".wav"):
|
||||
sound_path = os.path.join(sound_directory, file)
|
||||
try:
|
||||
self.sounds[file] = pygame.mixer.Sound(sound_path)
|
||||
print(f"Loaded: {file}")
|
||||
except pygame.error as e:
|
||||
print(f"Failed to load {file}: {e}")
|
||||
except FileNotFoundError:
|
||||
print(f"Directory {sound_directory} not found.")
|
||||
self.ok = False
|
||||
if len(self.sounds) == 0:
|
||||
print("No sound files found in the directory.")
|
||||
self.ok = False
|
||||
print(f"pygame {pygame.version.ver}")
|
||||
print(f"Found {len(files)} wav files in {assets}")
|
||||
|
||||
# Initialize color map defaults (cycle through a limited palette)
|
||||
if self.ok and not self._sound_color_map:
|
||||
palette = ["red", "green", "blue", "white"]
|
||||
for idx, name in enumerate(sorted(self.sounds.keys())):
|
||||
self._sound_color_map[name] = palette[idx % len(palette)]
|
||||
|
||||
# Ensure LEDs are default color initially (without changing on/off state)
|
||||
try:
|
||||
self._ctrl.set_all_color(self._default_color)
|
||||
for i, wav in enumerate(files, 1):
|
||||
print(f"[{i}/{len(files)}] Playing: {wav.name}")
|
||||
play_sound(wav, volume=args.volume)
|
||||
if args.auto:
|
||||
time.sleep(max(0.0, args.delay))
|
||||
else:
|
||||
input("Press Enter for next...")
|
||||
except KeyboardInterrupt:
|
||||
print("\nInterrupted.")
|
||||
finally:
|
||||
try:
|
||||
pygame.mixer.stop()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def play(self, sound_name):
|
||||
if not self.ok:
|
||||
print("Sounds not initialized properly.")
|
||||
return
|
||||
if sound_name in self.sounds:
|
||||
chan = self.sounds[sound_name].play()
|
||||
print(f"Playing: {sound_name}")
|
||||
|
||||
# Change LEDs to mapped color (supports string for all or dict per pixel)
|
||||
mapping_val = self._sound_color_map.get(sound_name, self._default_color)
|
||||
try:
|
||||
if isinstance(mapping_val, dict):
|
||||
left = mapping_val.get("left", self._default_color)
|
||||
right = mapping_val.get("right", self._default_color)
|
||||
proj = mapping_val.get("projector", self._default_color)
|
||||
self._ctrl.set_left_eye_color(left)
|
||||
self._ctrl.set_right_eye_color(right)
|
||||
self._ctrl.set_projector_color(proj)
|
||||
else:
|
||||
self._ctrl.set_all_color(mapping_val)
|
||||
except Exception as e:
|
||||
print(f"LED color set failed: {e}")
|
||||
|
||||
# Schedule reset to default when the sound finishes
|
||||
with self._lock:
|
||||
self._color_token += 1
|
||||
token = self._color_token
|
||||
|
||||
def _reset_when_done(channel, expected_token):
|
||||
try:
|
||||
# If no channel returned, fallback to sleep by length
|
||||
if channel is None:
|
||||
duration = self.sounds[sound_name].get_length()
|
||||
time.sleep(duration)
|
||||
else:
|
||||
# Wait until the channel is no longer busy
|
||||
while channel.get_busy():
|
||||
time.sleep(0.02)
|
||||
# Only reset if no newer sound changed the color
|
||||
with self._lock:
|
||||
if expected_token == self._color_token:
|
||||
self._ctrl.set_all_color(self._default_color)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
Thread(target=_reset_when_done, args=(chan, token), daemon=True).start()
|
||||
else:
|
||||
print(f"Sound '{sound_name}' not found!")
|
||||
|
||||
def play_random_sound(self):
|
||||
if not self.ok:
|
||||
print("Sounds not initialized properly.")
|
||||
return
|
||||
sound_name = random.choice(list(self.sounds.keys()))
|
||||
self.play(sound_name)
|
||||
|
||||
def play_happy(self):
|
||||
self.play("happy1.wav")
|
||||
|
||||
# API helpers
|
||||
def set_default_color(self, color: str):
|
||||
self._default_color = color
|
||||
try:
|
||||
self._ctrl.set_all_color(self._default_color)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def set_sound_color(self, sound_name: str, color: str):
|
||||
self._sound_color_map[sound_name] = color
|
||||
|
||||
def set_sound_color_map(self, mapping: dict):
|
||||
self._sound_color_map.update(mapping)
|
||||
pygame.quit()
|
||||
|
||||
|
||||
|
||||
# Example usage
|
||||
if __name__ == "__main__":
|
||||
sound_player = Sounds(1.0, "../assets/")
|
||||
time.sleep(1)
|
||||
while True:
|
||||
sound_player.play_random_sound()
|
||||
time.sleep(5)
|
||||
main()
|
Loading…
Reference in a new issue