Source code for gort.devices.enclosure

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2023-06-18
# @Filename: enclosure.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio

from typing import TYPE_CHECKING, Literal

from lvmopstools import Retrier

from gort.devices.core import GortDevice, GortDeviceSet
from gort.enums import Event
from gort.exceptions import ErrorCode, GortEnclosureError, GortTelescopeError
from gort.gort import Gort


if TYPE_CHECKING:
    from gort import Gort


if TYPE_CHECKING:
    from gort import ActorReply


__all__ = ["Enclosure", "Lights", "Light"]


[docs] class Light: """An enclosure light.""" def __init__(self, enclosure: Enclosure, name: str): self.enclosure = enclosure self.name = name
[docs] @Retrier(max_attempts=3, delay=1) async def on(self): """Turns on the light.""" status = await self.status() if status is False: self.enclosure.write_to_log(f"Turning on light {self.name!r}.", "info") await self.enclosure.actor.commands.lights("on", self.name)
[docs] @Retrier(max_attempts=3, delay=1) async def off(self): """Turns on the light.""" status = await self.status() if status is True: self.enclosure.write_to_log(f"Turning off light {self.name!r}.", "info") await self.enclosure.actor.commands.lights("off", self.name)
[docs] @Retrier(max_attempts=3, delay=1) async def toggle(self): """Turns on the light.""" self.enclosure.write_to_log(f"Toggling {self.name!r}.", "info") await self.enclosure.actor.commands.lights("toggle", self.name)
[docs] @Retrier(max_attempts=3, delay=1) async def status(self) -> bool: """Returns a boolean with the light status.""" status = await self.enclosure.actor.commands.lights("status") labels = status.get("lights_labels") if labels is None: raise GortEnclosureError("Did not receive lights status.") return self.name.upper() in labels
[docs] class Lights: """Controls the enclosure lights.""" LIGHTS = [ "telescope_bright", "telescope_red", "spectrograph_room", "utilities_room", ] def __init__(self, enclosure: Enclosure): for light in self.LIGHTS: self.__setattr__(light, Light(enclosure, light)) def __repr__(self): return f"<Lights ({', '.join(self.LIGHTS)})>"
[docs] @Retrier(max_attempts=3, delay=1) async def dome_all_off(self): """Turns off all the lights in the dome.""" await asyncio.gather(self.telescope_bright.off(), self.telescope_red.off())
if TYPE_CHECKING: def __getattr__(self, light: str) -> Light: ...
class E_Stops: """Reports status of the emergency stops buttons and allows to reset the relays.""" def __init__(self, enclosure: Enclosure): self.enclosure = enclosure self.ecp = enclosure.actor self.gort = enclosure.gort @Retrier(max_attempts=3, delay=0.5) async def status(self): """Returns :obj:`True` if the emergency stops are pressed.""" status = await self.enclosure.status() labels = status.get("safety_status_labels", "").split(",") if labels is None: raise GortEnclosureError("Cannot determine the status of the e-stops.") return "E_STOP" in labels @Retrier(max_attempts=3, delay=0.5) async def trigger(self): """Triggers the e-stop relays.""" try: await self.ecp.commands.emergency_stop() except Exception as err: raise GortEnclosureError(f"Failed to trigger the e-stops: {err}") @Retrier(max_attempts=3, delay=0.5) async def reset(self): """Resets the e-stop relays after the e-stops have been released.""" try: await self.ecp.commands.engineering_mode.commands.reset_e_stops() except Exception as err: raise GortEnclosureError(f"Failed to reset the e-stops: {err}")
[docs] class Enclosure(GortDevice): """Class representing the LVM enclosure.""" __DEPLOYMENTS__ = ["lvmecp"] def __init__(self, gort: Gort, name: str, actor: str, **kwargs): super().__init__(gort, name, actor) self.lights = Lights(self) self.e_stops = E_Stops(self) self._dome_lock = asyncio.Lock()
[docs] async def restart(self): """Restarts the ``lvmecp`` deployment.""" await GortDeviceSet.restart(self) # type: ignore
[docs] @Retrier(max_attempts=3, delay=1) async def status(self, get_registers: bool = False): """Retrieves the status of the power outlet.""" reply: ActorReply = await self.actor.commands.status( no_registers=(not get_registers), timeout=5, ) return reply.flatten()
[docs] async def allowed_to_move(self): """Checks if the dome is allowed to move.""" status: ActorReply = await self.actor.commands.dome.commands.status(timeout=5) labels = status.get("dome_status_labels", default="").split(",") if "DRIVE_ERROR" in labels: return False if "UNKNOWN" in labels: return False if "DRIVE_AVAILABLE" not in labels: return False if await self.is_local(): return False return True
async def _park_telescopes(self): """Moves telescopes to park position before opening/closing the enclosure.""" telescopes = list(self.gort.telescopes) is_parked = await asyncio.gather( *[self.gort.telescopes[tel].is_parked() for tel in telescopes] ) if all(is_parked): return True self.write_to_log( "Moving telescopes to park before operating the dome.", "warning", ) # Check local only once here. if await self.gort.enclosure.is_local(): raise GortTelescopeError( "Cannot move telescope in local mode.", error_code=ErrorCode.CANNOT_MOVE_LOCAL_MODE, ) park_coros = [ self.gort.telescopes[tel].goto_named_position("park", force=True) for itel, tel in enumerate(telescopes) if not is_parked[itel] ] await asyncio.gather(*park_coros) async def _check_dome(self): """Checks if we can operate the dome or raises an exception.""" if await self.allowed_to_move(): return if await self.is_local(): raise GortEnclosureError( "Cannot open the enclosure while in local mode.", error_code=ErrorCode.ENCLOSURE_IN_LOCAL, ) else: raise GortEnclosureError( "Not allowed to move the dome.", error_code=ErrorCode.ENCLOSURE_ERROR, )
[docs] async def open(self, park_telescopes: bool = True): """Open the enclosure dome. Parameters ---------- park_telescopes Move the telescopes to the park position before opening the enclosure to prevent dust or other debris falling on them. """ async with self._dome_lock: if await self.is_open(): self.write_to_log("Dome is already open.", level="info") return await self._check_dome() if park_telescopes: await asyncio.wait_for(self._park_telescopes(), timeout=120) self.write_to_log("Opening the enclosure ...", level="info") await self.gort.notify_event(Event.DOME_OPENING) await self.actor.commands.dome.commands.open() self.write_to_log("Enclosure is now open.", level="info") await self.gort.notify_event(Event.DOME_OPEN)
[docs] async def close( self, park_telescopes: bool = True, force: bool = False, mode: Literal["normal", "overcurrent"] = "normal", ): """Close the enclosure dome. Parameters ---------- park_telescopes Move the telescopes to the park position before closing the enclosure to prevent dust or other debris falling on them. force Tries to closes the dome even if the system believes it is already closed. mode In ``'normal'`` mode the dome will be closed using a combination of rotary encoder and limit switches. With ``'overcurrent'`` the dome is closed until the VFD detects an overcurrent condition. ``'overcurrent'`` should only be used when the normal mode has failed. """ async with self._dome_lock: if not force and await self.is_closed(): self.write_to_log("Dome is already closed.", level="info") return await self._check_dome() if park_telescopes: try: await asyncio.wait_for(self._park_telescopes(), timeout=120) except Exception as err: self.write_to_log( f"Failed parking the telescopes: {err}", "error", exc_info=err, ) if force is False: raise GortEnclosureError( "Not closing without knowing where the telescopes are. " "If you really need to close call again with " "park_telescopes=False and force=True.", ) else: self.write_to_log("Closing dome because force=True", "warning") self.write_to_log("Closing the dome ...", level="info") await self.gort.notify_event(Event.DOME_CLOSING) await self.actor.commands.dome.commands.close( force=force, overcurrent=(mode == "overcurrent"), ) self.write_to_log("Enclosure is now closed.", level="info") await self.gort.notify_event(Event.DOME_CLOSED)
[docs] async def is_open(self): """Returns :obj:`True` if the enclosure is open.""" status = await self.status() labels = status["dome_status_labels"].split(",") return "OPEN" in labels and "MOVING" not in labels
[docs] async def is_closed(self): """Returns :obj:`True` if the enclosure is closed.""" status = await self.status() labels = status["dome_status_labels"].split(",") return "CLOSED" in labels and "MOVING" not in labels
[docs] @Retrier(max_attempts=2, delay=1) async def stop(self): """Stop the enclosure dome.""" self.write_to_log("Stoping the dome.", level="info") await self.actor.commands.dome.commands.stop()
[docs] async def is_local(self): """Returns :obj:`True` if the enclosure is in local mode.""" # This should generally not be on, but it's useful as a way of disabling # the local mode when the lock or door are not working. if self.gort.config["enclosure"].get("bypass_local_mode", False) is True: return False status = await self.status() safety_status_labels = status.get("safety_status_labels", None) if safety_status_labels is None: raise GortEnclosureError( "Cannot determine if enclosure is in local mode.", error_code=ErrorCode.ENCLOSURE_IN_LOCAL, ) return "LOCAL" in safety_status_labels
[docs] async def get_door_status(self): """Returns the status of the door and lock.""" status = await self.status() safety_status_labels = status.get("safety_status_labels", None) if safety_status_labels is None: raise GortEnclosureError( "Cannot determine door status.", error_code=ErrorCode.DOOR_STATUS_FAILED, ) reply = { "door_closed": "DOOR_CLOSED" in safety_status_labels, "door_locked": "DOOR_LOCKED" in safety_status_labels, "local": await self.is_local(), } return reply