diff --git a/config/smoke_gas_detector.json5 b/config/smoke_gas_detector.json5 new file mode 100644 index 000000000..925ac28b9 --- /dev/null +++ b/config/smoke_gas_detector.json5 @@ -0,0 +1,51 @@ +{ + version: "v1.0.3", + hertz: 0.5, + name: "smoke_patrol", + api_key: "${OM_API_KEY:-openmind_free}", + system_prompt_base: "You are an autonomous safety patrol robot. Your name is SafeBot. You monitor the environment using a smoke and gas detector to detect fire hazards and dangerous gas leaks. When smoke or gas is detected at warning level, alert and inspect immediately. When critical levels are detected, trigger evacuation. Patrol continuously and report air quality status.", + system_governance: "Here are the laws that govern your actions. Do not violate these laws.\nFirst Law: A robot cannot harm a human or allow a human to come to harm.\nSecond Law: A robot must obey orders from humans, unless those orders conflict with the First Law.\nThird Law: A robot must protect itself, as long as that protection doesn't conflict with the First or Second Law.\nThe First Law is considered the most important, taking precedence over the second and third laws.", + system_prompt_examples: "Here are some examples of interactions you might encounter:\n\n1. If smoke detector detects warning level:\n Speak: {{'sentence': 'Warning: elevated smoke detected. Inspecting area immediately.'}}\n\n2. If smoke detector detects critical level:\n Speak: {{'sentence': 'ALERT: Critical smoke level detected. Initiating evacuation protocol.'}}\n\n3. If air quality is normal:\n Move: 'forward'\n Speak: {{'sentence': 'Air quality normal. Continuing patrol.'}}", + agent_inputs: [ + { + // Smoke and gas detector for fire and hazard detection + // connector options: "mock", "serial", "i2c_ens160", "i2c_sgp30" + type: "SmokeGasDetector", + config: { + connector: "mock", + cooldown: 5.0, + smoke_warning_threshold: 300, + smoke_danger_threshold: 600, + gas_warning_threshold: 300, + gas_danger_threshold: 600, + mock_scenario: "normal", + }, + }, + { + type: "GoogleASRInput", + config: { + enable_tts_interrupt: false, + }, + }, + ], + simulators: [ + { + type: "WebSim", + }, + ], + cortex_llm: { + type: "OpenAILLM", + config: { + agent_name: "SafeBot", + history_length: 10, + }, + }, + agent_actions: [ + { + name: "speak", + llm_label: "speak", + implementation: "passthrough", + connector: "elevenlabs_tts", + }, + ], +} diff --git a/docs/developing/4_inputs.md b/docs/developing/4_inputs.md index 5dca87f39..dce98b442 100644 --- a/docs/developing/4_inputs.md +++ b/docs/developing/4_inputs.md @@ -25,5 +25,6 @@ Here are a few examples for you to reuse and build on: - [VLM_COCO_Local](https://github.com/openmind/OM1/blob/main/src/inputs/plugins/vlm_coco_local.py) - [VLM_Vila](https://github.com/openmind/OM1/blob/main/src/inputs/plugins/vlm_vila.py) - [Arduino GPS](https://github.com/openmind/OM1/blob/main/src/inputs/plugins/gps.py) +- [Smoke and Gas Detector](https://github.com/openmind/OM1/blob/main/src/inputs/plugins/smoke_gas_detector.py) Learn how to build a new input plugin [here](../developer_cookbook/input.md) diff --git a/src/inputs/plugins/smoke_gas_detector.py b/src/inputs/plugins/smoke_gas_detector.py new file mode 100644 index 000000000..71d0066e5 --- /dev/null +++ b/src/inputs/plugins/smoke_gas_detector.py @@ -0,0 +1,628 @@ +import asyncio +import logging +import time +from typing import Optional + +from pydantic import Field + +from inputs.base import Message, SensorConfig +from inputs.base.loop import FuserInput +from providers.io_provider import IOProvider + +try: + import adafruit_ens160 as _ens160_lib + + _ENS160_AVAILABLE = True +except ImportError: + _ens160_lib = None + _ENS160_AVAILABLE = False + logging.warning( + "adafruit-ens160 not found. ENS160 connector unavailable. " + "Install with: pip install adafruit-circuitpython-ens160" + ) + +try: + import adafruit_sgp30 as _sgp30_lib + + _SGP30_AVAILABLE = True +except ImportError: + _sgp30_lib = None + _SGP30_AVAILABLE = False + logging.warning( + "adafruit-sgp30 not found. SGP30 connector unavailable. " + "Install with: pip install adafruit-circuitpython-sgp30" + ) + +try: + import serial as _serial + + _SERIAL_AVAILABLE = True +except ImportError: + _serial = None + _SERIAL_AVAILABLE = False + logging.warning( + "pyserial not found. Serial connector unavailable. " + "Install with: pip install pyserial" + ) + +SMOKE_WARNING_THRESHOLD = 300 +SMOKE_DANGER_THRESHOLD = 600 +GAS_WARNING_THRESHOLD = 300 +GAS_DANGER_THRESHOLD = 600 + + +class SmokeGasDetectorConfig(SensorConfig): + """ + Configuration for Smoke and Gas Detector input plugin. + + Supports five hardware backends via the ``connector`` field: + + - ``"serial"`` : Arduino with MQ-2/MQ-7/MQ-135 via pyserial. + - ``"i2c_ens160"`` : ENS160 multi-gas sensor via I2C. + - ``"i2c_sgp30"`` : SGP30 air quality sensor via I2C. + - ``"mock"`` : Simulated data for development (default). + + Parameters + ---------- + connector : str + Hardware backend to use. Default is ``"mock"``. + port : str + Serial port path. Used when connector='serial'. Default ``"/dev/ttyUSB0"``. + baudrate : int + Serial baudrate. Default 9600. + serial_timeout : float + Serial read timeout in seconds. Default 1.0. + cooldown : float + Minimum seconds between alerts forwarded to LLM. Default 5.0. + smoke_warning_threshold : int + Smoke level (ppm) above which warning is triggered. Default 300. + smoke_danger_threshold : int + Smoke level (ppm) above which danger alert is triggered. Default 600. + gas_warning_threshold : int + Gas level (ppm) above which warning is triggered. Default 300. + gas_danger_threshold : int + Gas level (ppm) above which danger alert is triggered. Default 600. + mock_scenario : str + Mock scenario: ``"normal"``, ``"warning"``, or ``"danger"``. Default ``"normal"``. + """ + + connector: str = Field( + default="mock", + description=( + "Hardware backend: 'serial' (Arduino MQ sensors), " + "'i2c_ens160' (ENS160), " + "'i2c_sgp30' (SGP30), " + "'mock' (testing)" + ), + ) + port: str = Field( + default="/dev/ttyUSB0", + description="Serial port path. Used when connector='serial'.", + ) + baudrate: int = Field( + default=9600, + description="Serial baudrate.", + ) + serial_timeout: float = Field( + default=1.0, + description="Serial read timeout in seconds.", + ) + cooldown: float = Field( + default=5.0, + description="Minimum seconds between alerts forwarded to LLM.", + ) + smoke_warning_threshold: int = Field( + default=SMOKE_WARNING_THRESHOLD, + description="Smoke level (ppm) above which warning is triggered.", + ) + smoke_danger_threshold: int = Field( + default=SMOKE_DANGER_THRESHOLD, + description="Smoke level (ppm) above which danger alert is triggered.", + ) + gas_warning_threshold: int = Field( + default=GAS_WARNING_THRESHOLD, + description="Gas level (ppm) above which warning is triggered.", + ) + gas_danger_threshold: int = Field( + default=GAS_DANGER_THRESHOLD, + description="Gas level (ppm) above which danger alert is triggered.", + ) + mock_scenario: str = Field( + default="normal", + description="Mock scenario: 'normal', 'warning', or 'danger'.", + ) + + +class SmokeGasReading: + """ + Container for a smoke/gas sensor reading. + + Parameters + ---------- + smoke_ppm : float + Smoke concentration in parts per million. + gas_ppm : float + Gas concentration in parts per million. + sensor_type : str + Name of the sensor that produced this reading. + """ + + def __init__(self, smoke_ppm: float, gas_ppm: float, sensor_type: str = "unknown"): + """ + Initialize SmokeGasReading. + + Parameters + ---------- + smoke_ppm : float + Smoke concentration in ppm. + gas_ppm : float + Gas concentration in ppm. + sensor_type : str + Sensor type name. + """ + self.smoke_ppm = smoke_ppm + self.gas_ppm = gas_ppm + self.sensor_type = sensor_type + + +class _SerialSmokeConnector: + """ + Read smoke/gas data from Arduino with MQ sensors via serial. + + The paired Arduino sketch must send data in this format:: + + "SMOKE:450,GAS:320" + + Minimal Arduino sketch (MQ-2 sensor) + ------------------------------------------------------- + .. code-block:: cpp + + const int mq2Pin = A0; + const int mq7Pin = A1; + + void setup() { + Serial.begin(9600); + } + void loop() { + int smoke = analogRead(mq2Pin); + int gas = analogRead(mq7Pin); + Serial.print("SMOKE:"); + Serial.print(smoke); + Serial.print(",GAS:"); + Serial.println(gas); + delay(500); + } + """ + + def __init__(self, port: str, baudrate: int, timeout: float): + """ + Initialize serial smoke connector. + + Parameters + ---------- + port : str + Serial port path. + baudrate : int + Serial baudrate. + timeout : float + Read timeout in seconds. + """ + self._ser = None + if not _SERIAL_AVAILABLE or _serial is None: + logging.error("SmokeGasDetector Serial: pyserial not available.") + return + try: + self._ser = _serial.Serial(port, baudrate, timeout=timeout) + logging.info( + f"SmokeGasDetector Serial: connected to {port} @ {baudrate} baud" + ) + except _serial.SerialException as e: + logging.error(f"SmokeGasDetector Serial: failed to open {port}: {e}") + + async def read(self) -> Optional[SmokeGasReading]: + """ + Read a smoke/gas frame from serial. + + Returns + ------- + Optional[SmokeGasReading] + Smoke/gas reading or None if unavailable. + """ + if self._ser is None: + return None + try: + raw = self._ser.readline().decode("utf-8").strip() + except Exception as e: + logging.warning(f"SmokeGasDetector Serial: read error: {e}") + return None + + if not raw.startswith("SMOKE:"): + if raw: + logging.debug(f"SmokeGasDetector Serial: unrecognised line: '{raw}'") + return None + try: + parts = dict(p.split(":") for p in raw.split(",") if ":" in p) + smoke = float(parts.get("SMOKE", 0)) + gas = float(parts.get("GAS", 0)) + return SmokeGasReading( + smoke_ppm=smoke, gas_ppm=gas, sensor_type="Arduino-MQ" + ) + except (ValueError, KeyError) as e: + logging.warning(f"SmokeGasDetector Serial: parse error: {e}") + return None + + def stop(self): + """Release serial resources.""" + if self._ser and self._ser.is_open: + self._ser.close() + logging.info("SmokeGasDetector Serial: port closed") + + +class _ENS160Connector: + """ + Read air quality data from ENS160 multi-gas sensor via I2C. + + Wiring (ENS160 to Raspberry Pi) + ----------------------------------- + ENS160 VIN → RPi Pin 1 (3.3V) + ENS160 GND → RPi Pin 6 (GND) + ENS160 SDA → RPi Pin 3 (GPIO2, I2C SDA) + ENS160 SCL → RPi Pin 5 (GPIO3, I2C SCL) + """ + + def __init__(self): + """Initialize ENS160 connector.""" + self._sensor = None + self._ready = False + + if not _ENS160_AVAILABLE or _ens160_lib is None: + logging.error("SmokeGasDetector ENS160: library not available.") + return + try: + import board + import busio + + i2c = busio.I2C(board.SCL, board.SDA) + self._sensor = _ens160_lib.ENS160(i2c) + self._ready = True + logging.info("SmokeGasDetector ENS160: initialized") + except Exception as e: + logging.error(f"SmokeGasDetector ENS160: setup failed: {e}") + + async def read(self) -> Optional[SmokeGasReading]: + """ + Read air quality from ENS160. + + Returns + ------- + Optional[SmokeGasReading] + Smoke/gas reading or None if unavailable. + """ + if not self._ready or self._sensor is None: + return None + try: + tvoc = float(self._sensor.TVOC) + eco2 = float(self._sensor.eCO2) + return SmokeGasReading(smoke_ppm=tvoc, gas_ppm=eco2, sensor_type="ENS160") + except Exception as e: + logging.warning(f"SmokeGasDetector ENS160: read error: {e}") + return None + + def stop(self): + """Release ENS160 resources.""" + self._ready = False + logging.info("SmokeGasDetector ENS160: stopped") + + +class _SGP30Connector: + """ + Read air quality data from SGP30 sensor via I2C. + + Wiring (SGP30 to Raspberry Pi) + ----------------------------------- + SGP30 VIN → RPi Pin 1 (3.3V) + SGP30 GND → RPi Pin 6 (GND) + SGP30 SDA → RPi Pin 3 (GPIO2, I2C SDA) + SGP30 SCL → RPi Pin 5 (GPIO3, I2C SCL) + """ + + def __init__(self): + """Initialize SGP30 connector.""" + self._sensor = None + self._ready = False + + if not _SGP30_AVAILABLE or _sgp30_lib is None: + logging.error("SmokeGasDetector SGP30: library not available.") + return + try: + import board + import busio + + i2c = busio.I2C(board.SCL, board.SDA, frequency=100000) + self._sensor = _sgp30_lib.Adafruit_SGP30(i2c) + self._sensor.iaq_init() + self._ready = True + logging.info("SmokeGasDetector SGP30: initialized") + except Exception as e: + logging.error(f"SmokeGasDetector SGP30: setup failed: {e}") + + async def read(self) -> Optional[SmokeGasReading]: + """ + Read air quality from SGP30. + + Returns + ------- + Optional[SmokeGasReading] + Smoke/gas reading or None if unavailable. + """ + if not self._ready or self._sensor is None: + return None + try: + tvoc = float(self._sensor.TVOC) + eco2 = float(self._sensor.eCO2) + return SmokeGasReading(smoke_ppm=tvoc, gas_ppm=eco2, sensor_type="SGP30") + except Exception as e: + logging.warning(f"SmokeGasDetector SGP30: read error: {e}") + return None + + def stop(self): + """Release SGP30 resources.""" + self._ready = False + logging.info("SmokeGasDetector SGP30: stopped") + + +class _MockSmokeConnector: + """ + Simulated smoke/gas connector for development and testing. + + Produces three scenarios: normal air quality, warning level, and danger level. + """ + + def __init__(self, scenario: str = "normal"): + """ + Initialize mock smoke connector. + + Parameters + ---------- + scenario : str + Scenario to simulate: 'normal', 'warning', or 'danger'. + """ + import random + + self._random = random + self._scenario = scenario + logging.info( + f"SmokeGasDetector mock: simulation active (scenario='{scenario}'). " + "No real hardware used." + ) + + async def read(self) -> Optional[SmokeGasReading]: + """ + Return simulated smoke/gas reading. + + Returns + ------- + Optional[SmokeGasReading] + Simulated reading. + """ + await asyncio.sleep(0) + + if self._scenario == "warning": + smoke = 350.0 + self._random.uniform(-20.0, 20.0) + gas = 320.0 + self._random.uniform(-20.0, 20.0) + elif self._scenario == "danger": + smoke = 750.0 + self._random.uniform(-30.0, 30.0) + gas = 700.0 + self._random.uniform(-30.0, 30.0) + else: + smoke = 50.0 + self._random.uniform(-10.0, 10.0) + gas = 40.0 + self._random.uniform(-10.0, 10.0) + + return SmokeGasReading(smoke_ppm=smoke, gas_ppm=gas, sensor_type="mock") + + def stop(self): + """Stop mock connector.""" + logging.info("SmokeGasDetector mock: stopped") + + +class SmokeGasDetector(FuserInput[SmokeGasDetectorConfig, Optional[SmokeGasReading]]): + """ + Universal Smoke and Gas Detector input plugin for OM1. + + Reads smoke and gas sensor data and converts readings into natural language + context for the LLM. Detects normal air quality, warning levels, and + dangerous smoke/gas concentrations for fire and hazard detection. + + Supports four hardware backends: Arduino MQ sensors via serial, ENS160, + SGP30, and mock simulation. + + A ``cooldown`` parameter prevents the LLM context from being flooded + with repeated identical alerts. + + Example config entry:: + + { + "type": "SmokeGasDetector", + "config": { + "connector": "serial", + "port": "/dev/ttyUSB0", + "cooldown": 5.0, + "smoke_danger_threshold": 600, + "gas_danger_threshold": 600 + } + } + """ + + def __init__(self, config: SmokeGasDetectorConfig): + """ + Initialize SmokeGasDetector. + + Parameters + ---------- + config : SmokeGasDetectorConfig + Plugin configuration. + """ + super().__init__(config) + + self.descriptor_for_LLM = "Smoke and Gas Detector" + self.io_provider = IOProvider() + self.messages: list[Message] = [] + self._last_alert_time: float = 0.0 + + connector = config.connector.lower() + + if connector == "serial": + self._connector = _SerialSmokeConnector( + port=config.port, + baudrate=config.baudrate, + timeout=config.serial_timeout, + ) + elif connector == "i2c_ens160": + self._connector = _ENS160Connector() + elif connector == "i2c_sgp30": + self._connector = _SGP30Connector() + elif connector == "mock": + self._connector = _MockSmokeConnector(scenario=config.mock_scenario) + else: + logging.error( + f"SmokeGasDetector: unknown connector '{connector}'. " + "Valid options: serial, i2c_ens160, i2c_sgp30, mock. " + "Falling back to mock." + ) + self._connector = _MockSmokeConnector(scenario=config.mock_scenario) + + logging.info( + f"SmokeGasDetector initialized: connector='{connector}', " + f"cooldown={config.cooldown}s" + ) + + async def _poll(self) -> Optional[SmokeGasReading]: + """ + Poll the active connector for a smoke/gas reading. + + Returns + ------- + Optional[SmokeGasReading] + Smoke/gas reading if available, None otherwise. + """ + await asyncio.sleep(0.5) + return await self._connector.read() + + def _classify(self, reading: SmokeGasReading) -> str: + """ + Classify a smoke/gas reading into a severity level. + + Parameters + ---------- + reading : SmokeGasReading + The reading to classify. + + Returns + ------- + str + Severity level: 'danger', 'warning', or 'normal'. + """ + if ( + reading.smoke_ppm >= self.config.smoke_danger_threshold + or reading.gas_ppm >= self.config.gas_danger_threshold + ): + return "danger" + if ( + reading.smoke_ppm >= self.config.smoke_warning_threshold + or reading.gas_ppm >= self.config.gas_warning_threshold + ): + return "warning" + return "normal" + + async def _raw_to_text( + self, raw_input: Optional[SmokeGasReading] + ) -> Optional[Message]: + """ + Convert a smoke/gas reading to a natural language message. + + Parameters + ---------- + raw_input : Optional[SmokeGasReading] + Smoke/gas reading from the sensor. + + Returns + ------- + Optional[Message] + Timestamped message for LLM context, or None if suppressed. + """ + if raw_input is None: + return None + + level = self._classify(raw_input) + now = time.time() + + if level == "danger": + if (now - self._last_alert_time) < self.config.cooldown: + return None + self._last_alert_time = now + message = ( + f"SMOKE ALERT: Critical smoke/gas level detected. " + f"Smoke: {raw_input.smoke_ppm:.0f} ppm, Gas: {raw_input.gas_ppm:.0f} ppm. " + f"Immediate evacuation recommended. Possible fire or gas leak." + ) + elif level == "warning": + if (now - self._last_alert_time) < self.config.cooldown: + return None + self._last_alert_time = now + message = ( + f"SMOKE WARNING: Elevated smoke/gas detected. " + f"Smoke: {raw_input.smoke_ppm:.0f} ppm, Gas: {raw_input.gas_ppm:.0f} ppm. " + f"Possible fire risk. Inspect area immediately." + ) + else: + message = ( + f"Smoke/gas detector: Air quality normal. " + f"Smoke: {raw_input.smoke_ppm:.0f} ppm, Gas: {raw_input.gas_ppm:.0f} ppm." + ) + + return Message(timestamp=now, message=message) + + async def raw_to_text(self, raw_input: Optional[SmokeGasReading]): + """ + Convert raw smoke/gas input to text and append to message buffer. + + Parameters + ---------- + raw_input : Optional[SmokeGasReading] + Raw smoke/gas reading from the sensor connector. + """ + pending = await self._raw_to_text(raw_input) + if pending is not None: + self.messages.append(pending) + + def formatted_latest_buffer(self) -> Optional[str]: + """ + Format the latest buffered message for LLM context injection. + + Clears the buffer after formatting. + + Returns + ------- + Optional[str] + Formatted context string, or None if no events buffered. + """ + if len(self.messages) == 0: + return None + + latest = self.messages[-1] + + result = ( + f"\nINPUT: {self.descriptor_for_LLM}\n// START\n{latest.message}\n// END\n" + ) + + self.io_provider.add_input( + self.__class__.__name__, latest.message, latest.timestamp + ) + self.messages = [] + return result + + def stop(self): + """Gracefully shut down the smoke/gas detector connector.""" + logging.info("SmokeGasDetector: stopping") + if self._connector and hasattr(self._connector, "stop"): + self._connector.stop() + self.messages = [] diff --git a/tests/inputs/plugins/test_smoke_gas_detector.py b/tests/inputs/plugins/test_smoke_gas_detector.py new file mode 100644 index 000000000..cc2e1b985 --- /dev/null +++ b/tests/inputs/plugins/test_smoke_gas_detector.py @@ -0,0 +1,740 @@ +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +import serial + +from inputs.base import Message +from inputs.plugins.smoke_gas_detector import ( + SmokeGasDetector, + SmokeGasDetectorConfig, + SmokeGasReading, + _MockSmokeConnector, + _SerialSmokeConnector, +) + + +def test_smoke_gas_reading_properties(): + """Test SmokeGasReading stores values correctly.""" + reading = SmokeGasReading(smoke_ppm=450.0, gas_ppm=320.0, sensor_type="test") + assert reading.smoke_ppm == 450.0 + assert reading.gas_ppm == 320.0 + assert reading.sensor_type == "test" + + +def test_smoke_gas_reading_default_sensor_type(): + """Test SmokeGasReading default sensor type.""" + reading = SmokeGasReading(smoke_ppm=50.0, gas_ppm=40.0) + assert reading.sensor_type == "unknown" + + +@pytest.mark.asyncio +async def test_mock_connector_normal_scenario(): + """Test mock connector normal scenario returns low ppm.""" + connector = _MockSmokeConnector(scenario="normal") + result = await connector.read() + assert result is not None + assert result.smoke_ppm < 300.0 + assert result.gas_ppm < 300.0 + + +@pytest.mark.asyncio +async def test_mock_connector_warning_scenario(): + """Test mock connector warning scenario returns warning-level ppm.""" + connector = _MockSmokeConnector(scenario="warning") + result = await connector.read() + assert result is not None + assert result.smoke_ppm >= 300.0 + + +@pytest.mark.asyncio +async def test_mock_connector_danger_scenario(): + """Test mock connector danger scenario returns high ppm.""" + connector = _MockSmokeConnector(scenario="danger") + result = await connector.read() + assert result is not None + assert result.smoke_ppm >= 600.0 + + +def test_mock_connector_stop(): + """Test mock connector stop does not raise.""" + connector = _MockSmokeConnector() + connector.stop() + + +def test_serial_connector_init_success(): + """Test serial connector initializes successfully.""" + mock_serial = MagicMock() + with patch( + "inputs.plugins.smoke_gas_detector._serial.Serial", + return_value=mock_serial, + ): + connector = _SerialSmokeConnector("/dev/ttyUSB0", 9600, 1.0) + assert connector._ser == mock_serial + + +def test_serial_connector_init_failure(): + """Test serial connector handles connection failure gracefully.""" + with patch( + "inputs.plugins.smoke_gas_detector._serial.Serial", + side_effect=serial.SerialException("Port not found"), + ): + connector = _SerialSmokeConnector("/dev/ttyUSB0", 9600, 1.0) + assert connector._ser is None + + +def test_serial_connector_init_serial_unavailable(): + """Test serial connector handles missing pyserial library.""" + import inputs.plugins.smoke_gas_detector as mod + + original = mod._SERIAL_AVAILABLE + mod._SERIAL_AVAILABLE = False + connector = _SerialSmokeConnector("/dev/ttyUSB0", 9600, 1.0) + assert connector._ser is None + mod._SERIAL_AVAILABLE = original + + +@pytest.mark.asyncio +async def test_serial_connector_read_valid(): + """Test serial connector parses valid SMOKE line.""" + mock_serial = MagicMock() + mock_serial.readline.return_value = b"SMOKE:450,GAS:320\n" + with patch( + "inputs.plugins.smoke_gas_detector._serial.Serial", + return_value=mock_serial, + ): + connector = _SerialSmokeConnector("/dev/ttyUSB0", 9600, 1.0) + result = await connector.read() + assert result is not None + assert result.smoke_ppm == 450.0 + assert result.gas_ppm == 320.0 + + +@pytest.mark.asyncio +async def test_serial_connector_read_invalid_prefix(): + """Test serial connector ignores lines without SMOKE prefix.""" + mock_serial = MagicMock() + mock_serial.readline.return_value = b"INVALID:data\n" + with patch( + "inputs.plugins.smoke_gas_detector._serial.Serial", + return_value=mock_serial, + ): + connector = _SerialSmokeConnector("/dev/ttyUSB0", 9600, 1.0) + result = await connector.read() + assert result is None + + +@pytest.mark.asyncio +async def test_serial_connector_read_no_connection(): + """Test serial connector read returns None when not connected.""" + with patch( + "inputs.plugins.smoke_gas_detector._serial.Serial", + side_effect=serial.SerialException("Port not found"), + ): + connector = _SerialSmokeConnector("/dev/ttyUSB0", 9600, 1.0) + result = await connector.read() + assert result is None + + +@pytest.mark.asyncio +async def test_serial_connector_read_exception(): + """Test serial connector handles read exception gracefully.""" + mock_serial = MagicMock() + mock_serial.readline.side_effect = Exception("Read error") + with patch( + "inputs.plugins.smoke_gas_detector._serial.Serial", + return_value=mock_serial, + ): + connector = _SerialSmokeConnector("/dev/ttyUSB0", 9600, 1.0) + result = await connector.read() + assert result is None + + +@pytest.mark.asyncio +async def test_serial_connector_read_parse_error(): + """Test serial connector handles parse error gracefully.""" + mock_serial = MagicMock() + mock_serial.readline.return_value = b"SMOKE:not_a_number,GAS:bad\n" + with patch( + "inputs.plugins.smoke_gas_detector._serial.Serial", + return_value=mock_serial, + ): + connector = _SerialSmokeConnector("/dev/ttyUSB0", 9600, 1.0) + result = await connector.read() + assert result is None + + +def test_serial_connector_stop(): + """Test serial connector stop closes port.""" + mock_serial = MagicMock() + mock_serial.is_open = True + with patch( + "inputs.plugins.smoke_gas_detector._serial.Serial", + return_value=mock_serial, + ): + connector = _SerialSmokeConnector("/dev/ttyUSB0", 9600, 1.0) + connector.stop() + mock_serial.close.assert_called_once() + + +def test_initialization_mock_connector(): + """Test SmokeGasDetector initializes with mock connector.""" + with patch("inputs.plugins.smoke_gas_detector.IOProvider"): + config = SmokeGasDetectorConfig(connector="mock") + sensor = SmokeGasDetector(config=config) + assert isinstance(sensor._connector, _MockSmokeConnector) + assert sensor.messages == [] + assert sensor.descriptor_for_LLM == "Smoke and Gas Detector" + + +def test_initialization_unknown_connector_falls_back_to_mock(): + """Test unknown connector falls back to mock.""" + with patch("inputs.plugins.smoke_gas_detector.IOProvider"): + config = SmokeGasDetectorConfig(connector="unknown_hw") + sensor = SmokeGasDetector(config=config) + assert isinstance(sensor._connector, _MockSmokeConnector) + + +def test_initialization_serial_connector(): + """Test SmokeGasDetector initializes with serial connector.""" + mock_serial = MagicMock() + with ( + patch("inputs.plugins.smoke_gas_detector.IOProvider"), + patch( + "inputs.plugins.smoke_gas_detector._serial.Serial", + return_value=mock_serial, + ), + ): + config = SmokeGasDetectorConfig(connector="serial", port="/dev/ttyUSB0") + sensor = SmokeGasDetector(config=config) + assert isinstance(sensor._connector, _SerialSmokeConnector) + + +def test_initialization_ens160_connector(): + """Test SmokeGasDetector initializes with ENS160 connector.""" + import inputs.plugins.smoke_gas_detector as mod + from inputs.plugins.smoke_gas_detector import _ENS160Connector + + original = mod._ENS160_AVAILABLE + mod._ENS160_AVAILABLE = False + with patch("inputs.plugins.smoke_gas_detector.IOProvider"): + config = SmokeGasDetectorConfig(connector="i2c_ens160") + sensor = SmokeGasDetector(config=config) + assert isinstance(sensor._connector, _ENS160Connector) + mod._ENS160_AVAILABLE = original + + +def test_initialization_sgp30_connector(): + """Test SmokeGasDetector initializes with SGP30 connector.""" + import inputs.plugins.smoke_gas_detector as mod + from inputs.plugins.smoke_gas_detector import _SGP30Connector + + original = mod._SGP30_AVAILABLE + mod._SGP30_AVAILABLE = False + with patch("inputs.plugins.smoke_gas_detector.IOProvider"): + config = SmokeGasDetectorConfig(connector="i2c_sgp30") + sensor = SmokeGasDetector(config=config) + assert isinstance(sensor._connector, _SGP30Connector) + mod._SGP30_AVAILABLE = original + + +@pytest.mark.asyncio +async def test_poll_returns_reading(): + """Test _poll returns smoke/gas reading from connector.""" + with patch("inputs.plugins.smoke_gas_detector.IOProvider"): + config = SmokeGasDetectorConfig(connector="mock", mock_scenario="normal") + sensor = SmokeGasDetector(config=config) + with patch("inputs.plugins.smoke_gas_detector.asyncio.sleep", new=AsyncMock()): + result = await sensor._poll() + assert result is not None + assert isinstance(result, SmokeGasReading) + + +@pytest.mark.asyncio +async def test_poll_returns_none_when_connector_fails(): + """Test _poll returns None when connector returns None.""" + with patch("inputs.plugins.smoke_gas_detector.IOProvider"): + config = SmokeGasDetectorConfig(connector="mock") + sensor = SmokeGasDetector(config=config) + sensor._connector = MagicMock() + sensor._connector.read = AsyncMock(return_value=None) + with patch("inputs.plugins.smoke_gas_detector.asyncio.sleep", new=AsyncMock()): + result = await sensor._poll() + assert result is None + + +def test_classify_danger(): + """Test _classify returns danger for critical ppm.""" + with patch("inputs.plugins.smoke_gas_detector.IOProvider"): + config = SmokeGasDetectorConfig(connector="mock", smoke_danger_threshold=600) + sensor = SmokeGasDetector(config=config) + reading = SmokeGasReading(smoke_ppm=750.0, gas_ppm=40.0) + assert sensor._classify(reading) == "danger" + + +def test_classify_danger_via_gas(): + """Test _classify returns danger when gas exceeds threshold.""" + with patch("inputs.plugins.smoke_gas_detector.IOProvider"): + config = SmokeGasDetectorConfig(connector="mock", gas_danger_threshold=600) + sensor = SmokeGasDetector(config=config) + reading = SmokeGasReading(smoke_ppm=50.0, gas_ppm=700.0) + assert sensor._classify(reading) == "danger" + + +def test_classify_warning(): + """Test _classify returns warning for elevated ppm.""" + with patch("inputs.plugins.smoke_gas_detector.IOProvider"): + config = SmokeGasDetectorConfig( + connector="mock", + smoke_warning_threshold=300, + smoke_danger_threshold=600, + ) + sensor = SmokeGasDetector(config=config) + reading = SmokeGasReading(smoke_ppm=400.0, gas_ppm=40.0) + assert sensor._classify(reading) == "warning" + + +def test_classify_normal(): + """Test _classify returns normal for low ppm.""" + with patch("inputs.plugins.smoke_gas_detector.IOProvider"): + config = SmokeGasDetectorConfig(connector="mock") + sensor = SmokeGasDetector(config=config) + reading = SmokeGasReading(smoke_ppm=50.0, gas_ppm=40.0) + assert sensor._classify(reading) == "normal" + + +@pytest.mark.asyncio +async def test_raw_to_text_danger(): + """Test _raw_to_text returns danger alert message.""" + with ( + patch("inputs.plugins.smoke_gas_detector.IOProvider"), + patch("inputs.plugins.smoke_gas_detector.time.time", return_value=1000.0), + ): + config = SmokeGasDetectorConfig(connector="mock", smoke_danger_threshold=600) + sensor = SmokeGasDetector(config=config) + reading = SmokeGasReading(smoke_ppm=750.0, gas_ppm=700.0) + result = await sensor._raw_to_text(reading) + assert result is not None + assert "SMOKE ALERT" in result.message + assert "750" in result.message + + +@pytest.mark.asyncio +async def test_raw_to_text_warning(): + """Test _raw_to_text returns warning message.""" + with ( + patch("inputs.plugins.smoke_gas_detector.IOProvider"), + patch("inputs.plugins.smoke_gas_detector.time.time", return_value=1000.0), + ): + config = SmokeGasDetectorConfig( + connector="mock", + smoke_warning_threshold=300, + smoke_danger_threshold=600, + ) + sensor = SmokeGasDetector(config=config) + reading = SmokeGasReading(smoke_ppm=400.0, gas_ppm=40.0) + result = await sensor._raw_to_text(reading) + assert result is not None + assert "SMOKE WARNING" in result.message + + +@pytest.mark.asyncio +async def test_raw_to_text_normal(): + """Test _raw_to_text returns normal message for low ppm.""" + with ( + patch("inputs.plugins.smoke_gas_detector.IOProvider"), + patch("inputs.plugins.smoke_gas_detector.time.time", return_value=1000.0), + ): + config = SmokeGasDetectorConfig(connector="mock") + sensor = SmokeGasDetector(config=config) + reading = SmokeGasReading(smoke_ppm=50.0, gas_ppm=40.0) + result = await sensor._raw_to_text(reading) + assert result is not None + assert "Air quality normal" in result.message + + +@pytest.mark.asyncio +async def test_raw_to_text_none_input(): + """Test _raw_to_text returns None for None input.""" + with patch("inputs.plugins.smoke_gas_detector.IOProvider"): + config = SmokeGasDetectorConfig(connector="mock") + sensor = SmokeGasDetector(config=config) + result = await sensor._raw_to_text(None) + assert result is None + + +@pytest.mark.asyncio +async def test_raw_to_text_cooldown_suppresses_danger(): + """Test cooldown suppresses repeated danger alerts.""" + with patch("inputs.plugins.smoke_gas_detector.IOProvider"): + config = SmokeGasDetectorConfig( + connector="mock", cooldown=5.0, smoke_danger_threshold=600 + ) + sensor = SmokeGasDetector(config=config) + reading = SmokeGasReading(smoke_ppm=750.0, gas_ppm=700.0) + sensor._last_alert_time = 1000.0 + with patch("inputs.plugins.smoke_gas_detector.time.time", return_value=1001.0): + result = await sensor._raw_to_text(reading) + assert result is None + + +@pytest.mark.asyncio +async def test_raw_to_text_cooldown_allows_after_expiry(): + """Test danger alert allowed again after cooldown expires.""" + with patch("inputs.plugins.smoke_gas_detector.IOProvider"): + config = SmokeGasDetectorConfig( + connector="mock", cooldown=5.0, smoke_danger_threshold=600 + ) + sensor = SmokeGasDetector(config=config) + reading = SmokeGasReading(smoke_ppm=750.0, gas_ppm=700.0) + sensor._last_alert_time = 1000.0 + with patch("inputs.plugins.smoke_gas_detector.time.time", return_value=1006.0): + result = await sensor._raw_to_text(reading) + assert result is not None + + +@pytest.mark.asyncio +async def test_raw_to_text_cooldown_suppresses_warning(): + """Test cooldown suppresses repeated warning messages.""" + with patch("inputs.plugins.smoke_gas_detector.IOProvider"): + config = SmokeGasDetectorConfig( + connector="mock", + cooldown=5.0, + smoke_warning_threshold=300, + smoke_danger_threshold=600, + ) + sensor = SmokeGasDetector(config=config) + reading = SmokeGasReading(smoke_ppm=400.0, gas_ppm=40.0) + sensor._last_alert_time = 1000.0 + with patch("inputs.plugins.smoke_gas_detector.time.time", return_value=1001.0): + result = await sensor._raw_to_text(reading) + assert result is None + + +@pytest.mark.asyncio +async def test_raw_to_text_updates_messages(): + """Test raw_to_text appends to messages buffer.""" + with ( + patch("inputs.plugins.smoke_gas_detector.IOProvider"), + patch("inputs.plugins.smoke_gas_detector.time.time", return_value=1000.0), + ): + config = SmokeGasDetectorConfig(connector="mock") + sensor = SmokeGasDetector(config=config) + reading = SmokeGasReading(smoke_ppm=50.0, gas_ppm=40.0) + await sensor.raw_to_text(reading) + assert len(sensor.messages) == 1 + + +def test_formatted_latest_buffer_with_messages(): + """Test formatted_latest_buffer returns formatted string and clears buffer.""" + with patch("inputs.plugins.smoke_gas_detector.IOProvider"): + config = SmokeGasDetectorConfig(connector="mock") + sensor = SmokeGasDetector(config=config) + sensor.io_provider = MagicMock() + sensor.messages = [ + Message( + timestamp=1000.0, + message="Smoke/gas detector: Air quality normal. Smoke: 50 ppm, Gas: 40 ppm.", + ) + ] + result = sensor.formatted_latest_buffer() + assert result is not None + assert "Smoke and Gas Detector" in result + assert "Air quality normal" in result + sensor.io_provider.add_input.assert_called_once() + assert len(sensor.messages) == 0 + + +def test_formatted_latest_buffer_empty(): + """Test formatted_latest_buffer returns None when buffer is empty.""" + with patch("inputs.plugins.smoke_gas_detector.IOProvider"): + config = SmokeGasDetectorConfig(connector="mock") + sensor = SmokeGasDetector(config=config) + result = sensor.formatted_latest_buffer() + assert result is None + + +def test_stop_calls_connector_stop(): + """Test stop calls connector stop method.""" + with patch("inputs.plugins.smoke_gas_detector.IOProvider"): + config = SmokeGasDetectorConfig(connector="mock") + sensor = SmokeGasDetector(config=config) + sensor._connector = MagicMock() + sensor.stop() + sensor._connector.stop.assert_called_once() + assert sensor.messages == [] + + +def test_ens160_connector_init_library_unavailable(): + """Test ENS160 connector handles missing library.""" + import inputs.plugins.smoke_gas_detector as mod + from inputs.plugins.smoke_gas_detector import _ENS160Connector + + original = mod._ENS160_AVAILABLE + mod._ENS160_AVAILABLE = False + connector = _ENS160Connector() + assert connector._ready is False + mod._ENS160_AVAILABLE = original + + +@pytest.mark.asyncio +async def test_ens160_connector_read_not_ready(): + """Test ENS160 connector returns None when not ready.""" + import inputs.plugins.smoke_gas_detector as mod + from inputs.plugins.smoke_gas_detector import _ENS160Connector + + original = mod._ENS160_AVAILABLE + mod._ENS160_AVAILABLE = False + connector = _ENS160Connector() + result = await connector.read() + assert result is None + mod._ENS160_AVAILABLE = original + + +@pytest.mark.asyncio +async def test_ens160_connector_init_success(): + """Test ENS160 connector initializes with mocked hardware.""" + import inputs.plugins.smoke_gas_detector as mod + from inputs.plugins.smoke_gas_detector import _ENS160Connector + + original = mod._ENS160_AVAILABLE + original_lib = mod._ens160_lib + mock_lib = MagicMock() + mock_sensor = MagicMock() + mock_lib.ENS160.return_value = mock_sensor + mod._ENS160_AVAILABLE = True + mod._ens160_lib = mock_lib + with patch.dict( + "sys.modules", + {"board": MagicMock(), "busio": MagicMock(I2C=MagicMock())}, + ): + connector = _ENS160Connector() + assert connector._ready is True + mod._ENS160_AVAILABLE = original + mod._ens160_lib = original_lib + + +@pytest.mark.asyncio +async def test_ens160_connector_read_success(): + """Test ENS160 connector reads successfully.""" + import inputs.plugins.smoke_gas_detector as mod + from inputs.plugins.smoke_gas_detector import _ENS160Connector + + original = mod._ENS160_AVAILABLE + original_lib = mod._ens160_lib + mock_lib = MagicMock() + mock_sensor = MagicMock() + mock_sensor.TVOC = 120 + mock_sensor.eCO2 = 450 + mock_lib.ENS160.return_value = mock_sensor + mod._ENS160_AVAILABLE = True + mod._ens160_lib = mock_lib + with patch.dict( + "sys.modules", + {"board": MagicMock(), "busio": MagicMock(I2C=MagicMock())}, + ): + connector = _ENS160Connector() + result = await connector.read() + assert result is not None + assert result.smoke_ppm == 120.0 + mod._ENS160_AVAILABLE = original + mod._ens160_lib = original_lib + + +@pytest.mark.asyncio +async def test_ens160_connector_read_exception(): + """Test ENS160 connector handles read exception.""" + import inputs.plugins.smoke_gas_detector as mod + from inputs.plugins.smoke_gas_detector import _ENS160Connector + + original = mod._ENS160_AVAILABLE + original_lib = mod._ens160_lib + mock_lib = MagicMock() + mock_sensor = MagicMock() + type(mock_sensor).TVOC = property( + lambda self: (_ for _ in ()).throw(Exception("Read error")) + ) + mock_lib.ENS160.return_value = mock_sensor + mod._ENS160_AVAILABLE = True + mod._ens160_lib = mock_lib + with patch.dict( + "sys.modules", + {"board": MagicMock(), "busio": MagicMock(I2C=MagicMock())}, + ): + connector = _ENS160Connector() + result = await connector.read() + assert result is None + mod._ENS160_AVAILABLE = original + mod._ens160_lib = original_lib + + +def test_ens160_connector_stop(): + """Test ENS160 connector stop.""" + import inputs.plugins.smoke_gas_detector as mod + from inputs.plugins.smoke_gas_detector import _ENS160Connector + + original = mod._ENS160_AVAILABLE + mod._ENS160_AVAILABLE = False + connector = _ENS160Connector() + connector.stop() + assert connector._ready is False + mod._ENS160_AVAILABLE = original + + +def test_sgp30_connector_init_library_unavailable(): + """Test SGP30 connector handles missing library.""" + import inputs.plugins.smoke_gas_detector as mod + from inputs.plugins.smoke_gas_detector import _SGP30Connector + + original = mod._SGP30_AVAILABLE + mod._SGP30_AVAILABLE = False + connector = _SGP30Connector() + assert connector._ready is False + mod._SGP30_AVAILABLE = original + + +@pytest.mark.asyncio +async def test_sgp30_connector_read_not_ready(): + """Test SGP30 connector returns None when not ready.""" + import inputs.plugins.smoke_gas_detector as mod + from inputs.plugins.smoke_gas_detector import _SGP30Connector + + original = mod._SGP30_AVAILABLE + mod._SGP30_AVAILABLE = False + connector = _SGP30Connector() + result = await connector.read() + assert result is None + mod._SGP30_AVAILABLE = original + + +@pytest.mark.asyncio +async def test_sgp30_connector_init_success(): + """Test SGP30 connector initializes with mocked hardware.""" + import inputs.plugins.smoke_gas_detector as mod + from inputs.plugins.smoke_gas_detector import _SGP30Connector + + original = mod._SGP30_AVAILABLE + original_lib = mod._sgp30_lib + mock_lib = MagicMock() + mock_sensor = MagicMock() + mock_lib.Adafruit_SGP30.return_value = mock_sensor + mod._SGP30_AVAILABLE = True + mod._sgp30_lib = mock_lib + with patch.dict( + "sys.modules", + {"board": MagicMock(), "busio": MagicMock(I2C=MagicMock())}, + ): + connector = _SGP30Connector() + assert connector._ready is True + mod._SGP30_AVAILABLE = original + mod._sgp30_lib = original_lib + + +@pytest.mark.asyncio +async def test_sgp30_connector_read_success(): + """Test SGP30 connector reads successfully.""" + import inputs.plugins.smoke_gas_detector as mod + from inputs.plugins.smoke_gas_detector import _SGP30Connector + + original = mod._SGP30_AVAILABLE + original_lib = mod._sgp30_lib + mock_lib = MagicMock() + mock_sensor = MagicMock() + mock_sensor.TVOC = 80 + mock_sensor.eCO2 = 400 + mock_lib.Adafruit_SGP30.return_value = mock_sensor + mod._SGP30_AVAILABLE = True + mod._sgp30_lib = mock_lib + with patch.dict( + "sys.modules", + {"board": MagicMock(), "busio": MagicMock(I2C=MagicMock())}, + ): + connector = _SGP30Connector() + result = await connector.read() + assert result is not None + assert result.smoke_ppm == 80.0 + mod._SGP30_AVAILABLE = original + mod._sgp30_lib = original_lib + + +@pytest.mark.asyncio +async def test_sgp30_connector_read_exception(): + """Test SGP30 connector handles read exception.""" + import inputs.plugins.smoke_gas_detector as mod + from inputs.plugins.smoke_gas_detector import _SGP30Connector + + original = mod._SGP30_AVAILABLE + original_lib = mod._sgp30_lib + mock_lib = MagicMock() + mock_sensor = MagicMock() + type(mock_sensor).TVOC = property( + lambda self: (_ for _ in ()).throw(Exception("Read error")) + ) + mock_lib.Adafruit_SGP30.return_value = mock_sensor + mod._SGP30_AVAILABLE = True + mod._sgp30_lib = mock_lib + with patch.dict( + "sys.modules", + {"board": MagicMock(), "busio": MagicMock(I2C=MagicMock())}, + ): + connector = _SGP30Connector() + result = await connector.read() + assert result is None + mod._SGP30_AVAILABLE = original + mod._sgp30_lib = original_lib + + +def test_sgp30_connector_stop(): + """Test SGP30 connector stop.""" + import inputs.plugins.smoke_gas_detector as mod + from inputs.plugins.smoke_gas_detector import _SGP30Connector + + original = mod._SGP30_AVAILABLE + mod._SGP30_AVAILABLE = False + connector = _SGP30Connector() + connector.stop() + assert connector._ready is False + mod._SGP30_AVAILABLE = original + + +@pytest.mark.asyncio +async def test_ens160_connector_init_exception(): + """Test ENS160 connector handles init exception gracefully.""" + import inputs.plugins.smoke_gas_detector as mod + from inputs.plugins.smoke_gas_detector import _ENS160Connector + + original = mod._ENS160_AVAILABLE + original_lib = mod._ens160_lib + mock_lib = MagicMock() + mock_lib.ENS160.side_effect = Exception("I2C error") + mod._ENS160_AVAILABLE = True + mod._ens160_lib = mock_lib + with patch.dict( + "sys.modules", + {"board": MagicMock(), "busio": MagicMock(I2C=MagicMock())}, + ): + connector = _ENS160Connector() + assert connector._ready is False + mod._ENS160_AVAILABLE = original + mod._ens160_lib = original_lib + + +@pytest.mark.asyncio +async def test_sgp30_connector_init_exception(): + """Test SGP30 connector handles init exception gracefully.""" + import inputs.plugins.smoke_gas_detector as mod + from inputs.plugins.smoke_gas_detector import _SGP30Connector + + original = mod._SGP30_AVAILABLE + original_lib = mod._sgp30_lib + mock_lib = MagicMock() + mock_lib.Adafruit_SGP30.side_effect = Exception("I2C error") + mod._SGP30_AVAILABLE = True + mod._sgp30_lib = mock_lib + with patch.dict( + "sys.modules", + {"board": MagicMock(), "busio": MagicMock(I2C=MagicMock())}, + ): + connector = _SGP30Connector() + assert connector._ready is False + mod._SGP30_AVAILABLE = original + mod._sgp30_lib = original_lib