Source code for gort.devices.spec

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2023-06-15
# @Filename: spec.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio

from typing import TYPE_CHECKING, Sequence

from gort.devices.core import GortDevice, GortDeviceSet
from gort.exceptions import ErrorCode, GortError, GortSpecError
from gort.exposure import Exposure


if TYPE_CHECKING:
    from gort.gort import Gort
    from gort.remote import ActorReply


__all__ = ["Spectrograph", "SpectrographSet", "IEB"]


[docs] class IEB(GortDevice): """A class representing an Instrument Electronics Box.""" def __init__(self, gort: Gort, name: str, actor: str): super().__init__(gort, name, actor) self.spec_name = self.name.split(".")[1]
[docs] async def status(self): """Returns the status of the IEB.""" replies: list[ActorReply] = await asyncio.gather( self.actor.commands.shutter.commands.status(), self.actor.commands.hartmann.commands.status(), self.actor.commands.transducer.commands.status(), self.actor.commands.wago.commands.status(), self.actor.commands.wago.commands.getpower(), return_exceptions=True, ) status = {} for reply in replies: if isinstance(reply, Exception): self.write_to_log(str(reply), "warning") continue flat_reply = reply.flatten() if "transducer" in flat_reply: flat_reply = {f"{self.spec_name}_pressures": flat_reply["transducer"]} status.update(flat_reply) return status
[docs] async def power(self, devices: str | list[str], on: bool = True): """Powers on/off the shutter or Hartmann doors. Parameters ---------- device The device to power on/off. Either ``'shutter'``, ``'hartmann_left'``, or ``'hartmann_right'``. Can be a list of devices to modify. on If :obj:`True` powers on the device; otherwise powers it down. """ if isinstance(devices, str): devices = [devices] tasks = [] for device in devices: if device in ["hl", "left"]: device = "hartmann_left" elif device in ["hr", "right"]: device = "hartmann_right" if device not in ["shutter", "hartmann_left", "hartmann_right"]: raise GortSpecError( f"Invalid device {device}.", error_code=ErrorCode.USAGE_ERROR, ) self.write_to_log(f"Powering {'on' if on else 'off'} {device}.", "info") tasks.append( self.actor.commands.wago.commands.setpower( device, action="ON" if on else "OFF", ) ) await asyncio.gather(*tasks)
[docs] async def do(self, devices: str | list[str], action: str): """Performs an action on a device. Powers the device if needed. Parameters ---------- device The device to act on. Either ``'shutter'``, ``'hartmann_left'``, or ``'hartmann_right'``. Can be a list of devices to modify. action The action to perform. Can be ``'open'``, ``'close'``, ``'home'``, or ``'init'``. """ if isinstance(devices, str): devices = [devices] if action not in ["open", "close", "home", "init"]: raise GortSpecError( f"Invalid action {action}.", error_code=ErrorCode.USAGE_ERROR, ) status = await self.status() tasks = [] hartmann_done = False for device in devices: if device in ["hl", "left"]: device = "hartmann_left" elif device in ["hr", "right"]: device = "hartmann_right" if device not in ["shutter", "hartmann_left", "hartmann_right"]: raise GortSpecError( f"Invalid device {device}.", error_code=ErrorCode.USAGE_ERROR, ) self.write_to_log(f"Performing {action!r} on {device}.", "info") if status[f"{self.spec_name}_relays"][device] is False: self.write_to_log(f"Device {device} is off. Powering it on.", "warning") await self.power(device) if "hartmann" in device: command = getattr(self.actor.commands.hartmann.commands, action) if action in ["home", "init"]: if hartmann_done: # Avoid homing/initialising the HD twice. continue tasks.append(command()) hartmann_done = True else: tasks.append(command(side=device.split("_")[1])) else: command = getattr(self.actor.commands.shutter.commands, action) tasks.append(command()) await asyncio.gather(*tasks)
[docs] async def open(self, devices: str | list[str]): """Opens a device or list of devices. Parameters ---------- device The device to act on. Either ``'shutter'``, ``'hartmann_left'``, or ``'hartmann_right'``. Can be a list of devices to open. """ await self.do(devices, "open")
[docs] async def close(self, devices: str | list[str]): """Closes a device or list of devices. Parameters ---------- device The device to act on. Either ``'shutter'``, ``'hartmann_left'``, or ``'hartmann_right'``. Can be a list of devices to close. """ await self.do(devices, "close")
[docs] async def home(self, devices: str | list[str]): """Homes a device or list of devices. Parameters ---------- device The device to act on. Either ``'shutter'``, ``'hartmann_left'``, or ``'hartmann_right'``. Can be a list of devices to home. """ await self.do(devices, "home")
[docs] async def init(self, devices: str | list[str], home: bool = True): """Initialises a device or list of devices. Parameters ---------- device The device to act on. Either ``'shutter'``, ``'hartmann_left'``, or ``'hartmann_right'``. Can be a list of devices to initialise. home If :obj:`True` homes the devices after initialising them. """ await self.do(devices, "init") if home: await self.do(devices, "home")
[docs] class Spectrograph(GortDevice): """Class representing an LVM spectrograph functionality.""" def __init__(self, gort: Gort, name: str, actor: str, **kwargs): super().__init__(gort, name, actor) self.nps = self.gort.nps[name] self.ieb = IEB(gort, f"ieb.{self.name}", f"lvmieb.{self.name}")
[docs] async def status(self, simple: bool = False): """Retrieves the status of the telescope. Parameters ---------- simple If :obj:`True` returns a short version of the status. """ reply: ActorReply = await self.actor.commands.status(simple=simple) flatten_reply = reply.flatten() return flatten_reply.get("status", {})
[docs] async def is_idle(self): """Returns :obj:`True` if the spectrograph is idle and ready to expose.""" status = await self.status(simple=True) names = status["status_names"] for status in ["READOUT_PENDING", "ERROR"]: if status in names: return False return "IDLE" in names
[docs] async def is_errored(self): """Returns :obj:`True` if the spectrograph is in error.""" status = await self.status(simple=True) return "ERROR" in status["status_names"]
[docs] async def is_exposing(self): """Returns :obj:`True` if the spectrograph is exposing.""" status = await self.status(simple=True) return "EXPOSING" in status["status_names"]
[docs] async def is_reading(self): """Returns :obj:`True` if the spectrograph is idle and ready to expose.""" status = await self.status(simple=True) return "READING" in status["status_names"]
[docs] async def initialise(self): """Initialises the spectrograph and flashes the ACF configuration file.""" self.write_to_log("Initialising spectrograph and flashing ACF.", "info") await self.actor.commands.init()
[docs] async def abort(self): """Aborts an ongoing exposure.""" self.write_to_log("Aborting exposures.", "warning") try: await self.actor.commands.abort() except GortError: pass await self.actor.commands.reset() self.write_to_log("Closing shutter.") await self.ieb.close("shutter")
[docs] async def reset(self, full: bool = False): """Resets the spectrograph to a valid state.""" await self.actor.commands.reset() mech_status = await self.ieb.status() # Always try to close the shutter. If this fails, try a full reset in # part to give the shutter another chance but also because it could be # that it failed because the power to the motor controllers is off. if full is False and mech_status[f"{self.name}_shutter"]["open"]: self.write_to_log(f"Closing {self.name} shutter.", "warning") try: await self.ieb.close("shutter") except Exception as ee: self.write_to_log(f"Failed closing the shutter: {ee}", "warning") self.write_to_log("Trying a full reset.", "warning") full = True if full: relays = mech_status[f"{self.name}_relays"] if not all(relays.values()): self.write_to_log(f"Powering on {self.name} relays.", "warning") await self.ieb.power(list(relays.keys())) mech_status = await self.ieb.status() if mech_status[f"{self.name}_shutter"]["open"]: self.write_to_log(f"Closing {self.name} shutter.", "warning") await self.ieb.close("shutter") for side in ["left", "right"]: if not mech_status[f"{self.name}_hartmann_{side}"]["open"]: self.write_to_log(f"Opening {self.name} {side} HD.", "warning") await self.ieb.open(f"hartmann_{side}")
[docs] async def expose(self, **kwargs): """Exposes the spectrograph.""" if not (await self.is_idle()): raise GortSpecError( "Spectrographs is not idle. Cannot expose.", error_code=ErrorCode.SECTROGRAPH_FAILED_EXPOSING, ) self.write_to_log(f"Exposing spectrograph {self.name}.") await self.actor.commands.expose(**kwargs)
[docs] class SpectrographSet(GortDeviceSet[Spectrograph]): """A set of LVM spectrographs.""" __DEVICE_CLASS__ = Spectrograph __DEPLOYMENTS__ = ["lvmscp"] def __init__(self, gort: Gort, data: dict[str, dict], **kwargs): super().__init__(gort, data, **kwargs) self.last_exposure: Exposure | None = None
[docs] async def status(self, simple: bool = False) -> dict[str, dict]: """Collects the status of each spectrograph. Parameters ---------- simple If :obj:`True` returns a short version of the status. """ names = list(self) statuses = await self.call_device_method(Spectrograph.status, simple=simple) return dict(zip(names, statuses))
[docs] def get_expno(self): """Returns the next exposure sequence number.""" next_exposure_number_path = self.gort.config["specs"]["nextExposureNumber"] with open(next_exposure_number_path, "r") as fd: data = fd.read().strip() expno = int(data) if data != "" else 1 return expno
[docs] async def are_idle(self): """Returns :obj:`True` if all the spectrographs are idle and ready to expose.""" return all(await self.call_device_method(Spectrograph.is_idle))
[docs] async def are_reading(self): """Returns :obj:`True` if any of the spectrographs are reading.""" return any(await self.call_device_method(Spectrograph.is_reading))
[docs] async def are_exposing(self): """Returns :obj:`True` if any of the spectrographs are exposing.""" return any(await self.call_device_method(Spectrograph.is_exposing))
[docs] async def are_errored(self): """Returns :obj:`True` if any of the spectrographs are errored.""" return any(await self.call_device_method(Spectrograph.is_errored))
[docs] async def expose( self, exposure_time: float | None = None, flavour: str | None = None, header: dict | None = None, show_progress: bool | None = None, async_readout: bool = False, count: int = 1, object: str | None = None, specs: Sequence[str] | None = None, **kwargs, ) -> Exposure | list[Exposure]: """Exposes the spectrographs. Parameters ---------- exposure_time The exposure time. If not set, assumes this must be a bias. flavour The exposure type, either ``'object'``, ``'arc'``, ``'flat'``, ``'dark'``, or ``'bias'`` header Additional data to add to the headers. show_progress Displays a progress bar with the elapsed exposure time. If :obj:`None` (the default), will show the progress bar only in interactive sessions. async_readout Returns after integration completes. Readout is initiated but handled asynchronously and can be await by awaiting the returned :obj:`.Exposure` object. count The number of exposures to take. object A string that will be stored in the ``OBJECT`` header keyword. specs List the spectrographs to expose. Defaults to all. Returns ------- exp_nos The numbers of the exposed frames. If ``count`` is greater than one, returns a list of exposures. """ if self.last_exposure is not None and not self.last_exposure.done(): self.write_to_log("Waiting for previous exposure to read out.", "warning") await self.last_exposure if not (await self.are_idle()): raise GortSpecError( "Spectrographs are not idle. Cannot expose.", error_code=ErrorCode.SECTROGRAPH_NOT_IDLE, ) if count <= 0: raise GortSpecError("Invalid count.", error_code=ErrorCode.USAGE_ERROR) if exposure_time is None or exposure_time == 0.0: flavour = "bias" exposure_time = 0.0 exposures: list[Exposure] = [] for nexp in range(int(count)): exposure = Exposure(self.gort, flavour=flavour, object=object, specs=specs) async_readout_this_exp = async_readout if nexp == int(count) - 1 else False await exposure.expose( exposure_time=exposure_time, header=header, async_readout=async_readout_this_exp, show_progress=show_progress, **kwargs, ) exposures.append(exposure) if async_readout: self.write_to_log("Returning with async readout ongoing.") if len(exposures) == 1: return exposures[0] return exposures
[docs] async def reset(self, full: bool = False): """Reset the spectrographs.""" await self.call_device_method(Spectrograph.reset, full=full) self.last_exposure = None
[docs] async def initialise(self): """Initialises the spectrographs and flashes the ACF configuration file.""" await self.call_device_method(Spectrograph.initialise)
[docs] async def abort(self): """Aborts an ongoing exposure.""" await self.call_device_method(Spectrograph.abort) self.last_exposure = None