Source code for gort.exposure

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2023-08-11
# @Filename: exposure.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import json
import pathlib
import re
import warnings
from collections import defaultdict

from typing import TYPE_CHECKING, Any, Callable, Coroutine, Sequence

from astropy.io import fits
from astropy.time import Time
from rich.progress import BarColumn, MofNCompleteColumn, Progress, TextColumn

from sdsstools.time import get_sjd

from gort.exceptions import ErrorCode, GortSpecError
from gort.tools import (
    cancel_task,
    decap,
    get_md5sum,
    get_md5sum_from_spectro,
    insert_to_database,
    is_interactive,
    is_notebook,
    run_in_executor,
)


if TYPE_CHECKING:
    from gort.gort import Gort


# Names exported by ``from gort.exposure import *``.
__all__ = ["Exposure", "READOUT_TIME"]


# Nominal readout duration, in seconds. It is the default length of the
# "Reading ..." progress bar shown by ``Exposure.start_timer()``.
READOUT_TIME = 55

# Type of the hooks mapping: hook name -> list of coroutine functions.
HOOKS_TYPE = defaultdict[
    str,
    list[Callable[[Any], Coroutine]],
]


class Exposure(asyncio.Future["Exposure"]):
    """A class representing an exposure taken by a :obj:`.SpectrographSet`.

    The object is itself a :obj:`asyncio.Future` that is resolved (with the
    exposure instance as result) when the readout has completed or the
    exposure has failed. Check :obj:`.error` to tell both cases apart.

    Parameters
    ----------
    gort
        A `.Gort` instance to communicate with the actors.
    exp_no
        The exposure sequence number. If :obj:`None`, the next valid
        sequence number will be used.
    flavour
        The image type. Defaults to ``'object'``.
    object
        The object name to be added to the header.
    specs
        List the spectrographs to expose. Defaults to all.

    Attributes
    ----------
    hooks
        A dictionary of hooks to call in specific steps of the exposure. Each
        hook must be a list of coroutines to call in that specific situation.
        All coroutines for a given hook are called concurrently and depending
        on the hook they may be scheduled as a task and not awaited. Available
        hooks are:

        - ``'exposure-starts'`` which is awaited, without arguments, right
          before the expose command is sent to the spectrographs.
        - ``'pre-readout'`` which is called with the header before readout
          begins; the coroutine can modify the header in place or perform
          any tasks that should be complete at the end of integration.
        - ``'post-readout'`` called as a task (not awaited) after the readout
          is complete. Receives the :obj:`.Exposure` object.

        To add a coroutine to a hook ::

            async def update_header(header):
                header.update({'KEY': 1})

            exp = Exposure(g)
            exp.hooks['pre-readout'].append(update_header)

    """

    def __init__(
        self,
        gort: Gort,
        exp_no: int | None = None,
        flavour: str | None = "object",
        object: str | None = "",
        specs: Sequence[str] | None = None,
    ):
        self.gort = gort
        self.specs = gort.specs
        self.devices = specs
        self.exp_no = exp_no or self.specs.get_expno()
        self.flavour = flavour or "object"
        self.object = object or ""

        self.start_time = Time.now()

        self._exposure_time: float | None = None

        self.error: bool = False
        self.reading: bool = False

        self._timer_task: asyncio.Task | None = None
        self._progress: Progress | None = None

        # Strong references to the background tasks started by expose(). The
        # event loop only keeps weak references to tasks, so they must be
        # stored somewhere while they run.
        self._readout_task: asyncio.Task | None = None
        self._monitor_task: asyncio.Task | None = None

        self.hooks: HOOKS_TYPE = defaultdict(
            list,
            {
                "exposure-starts": [],
                "pre-readout": [],
                "post-readout": [],
            },
        )

        if self.flavour not in ["arc", "object", "flat", "bias", "dark"]:
            raise GortSpecError(
                "Invalid flavour type.",
                error_code=ErrorCode.USAGE_ERROR,
            )

        super().__init__()

        self.add_done_callback(self._when_done)

    def __repr__(self):
        return (
            f"<Exposure (exp_no={self.exp_no}, flavour={self.flavour}, "
            f"object={self.object!r}, error={self.error}, reading={self.reading}, "
            f"done={self.done()})>"
        )

    async def expose(
        self,
        exposure_time: float | None = None,
        header: dict | None = None,
        async_readout: bool = False,
        show_progress: bool | None = None,
        object: str | None = None,
        raise_on_error: bool = True,
    ):
        """Exposes the spectrograph.

        Parameters
        ----------
        exposure_time
            The exposure time. Required for all flavours except bias.
        header
            A dictionary with the extra header values.
        async_readout
            Returns after integration completes. Readout is initiated but
            handled asynchronously and can be awaited by awaiting the
            returned :obj:`.Exposure` object.
        show_progress
            Displays a progress bar with the elapsed exposure time. If
            :obj:`None` (the default), will show the progress bar only in
            interactive sessions.
        object
            The object name to be passed to the header.
        raise_on_error
            Whether to raise an error when the exposure is marked as errored.

        Returns
        -------
        exposure
            This same :obj:`.Exposure` object.

        Raises
        ------
        GortSpecError
            If the spectrographs are not idle or are errored, if the exposure
            time is missing for a non-bias flavour, or if the exposure fails
            and ``raise_on_error=True``.

        """

        log = self.specs.write_to_log

        if self.specs.last_exposure is not None and not self.specs.last_exposure.done():
            log("Waiting for previous exposure to read out.", "warning")
            await self.specs.last_exposure

        # Check that all specs are idle and not errored.
        status = await self.specs.status(simple=True)
        for spec_name, spec_status in status.items():
            if self.devices is not None and spec_name not in self.devices:
                continue

            if "IDLE" not in spec_status["status_names"]:
                raise GortSpecError(
                    "Some spectrographs are not IDLE.",
                    error_code=ErrorCode.SECTROGRAPH_NOT_IDLE,
                )

            if "ERROR" in spec_status["status_names"]:
                raise GortSpecError(
                    "Some spectrographs have ERROR status. "
                    "Solve this manually before exposing.",
                    error_code=ErrorCode.SECTROGRAPH_NOT_IDLE,
                )

        await self.specs.reset()

        header = header or {}

        # Set object name for header. An explicit OBJECT key in the header
        # always wins; otherwise fall back to the argument, the instance
        # attribute and finally the flavour (for non-object exposures).
        if "OBJECT" not in header:
            if object is not None:
                header.update({"OBJECT": object})
            elif self.object is not None and self.object != "":
                header.update({"OBJECT": self.object})
            elif self.flavour != "object":
                header.update({"OBJECT": self.flavour})
            else:
                header.update({"OBJECT": ""})

        if show_progress is None:
            show_progress = is_interactive() or is_notebook()

        if exposure_time is None and self.flavour != "bias":
            raise GortSpecError(
                "Exposure time required for all flavours except bias.",
                error_code=ErrorCode.USAGE_ERROR,
            )

        warnings.filterwarnings("ignore", message=".*cannot modify a done command.*")

        self.specs.last_exposure = self

        monitor_task: asyncio.Task | None = None

        self._exposure_time = exposure_time or 0.0

        log_msg = f"Starting integration on spectrograph exposure {self.exp_no} "
        if self.flavour == "bias":
            log_msg += f"({self.flavour})."
        else:
            log_msg += f"({self.flavour or object}, {exposure_time:.1f} s)."
        log(log_msg, "info")

        try:
            if show_progress:
                await self.start_timer(self._exposure_time)

            self.start_time = Time.now()

            await self._call_hook("exposure-starts")

            await self.specs.send_command_all(
                "expose",
                devices=self.devices,
                flavour=self.flavour,
                exposure_time=exposure_time,
                seqno=self.exp_no,
                readout=False,
                timeout=(exposure_time or 0) + 30,
            )

            # Call pre-readout tasks.
            await self._call_hook("pre-readout", header)

            # At this point we have integrated and are ready to read.
            self.reading = True

            log(f"Reading spectrograph exposure {self.exp_no} ...", "info")
            readout_task = asyncio.create_task(
                self.specs.send_command_all(
                    "read",
                    devices=self.devices,
                    header=json.dumps(header),
                    timeout=90,
                )
            )
            self._readout_task = readout_task

            # Launch the task that waits for the spectrographs to become idle,
            # verifies the files and marks this Future as done. With
            # async_readout=False we wait for it here; with async_readout=True
            # we return right away and the task resolves the Future when the
            # readout completes (readout is ongoing and does not need to be
            # launched again).
            monitor_task = asyncio.create_task(self._done_monitor())
            self._monitor_task = monitor_task

            if not async_readout:
                await readout_task
                await monitor_task
            else:
                self.stop_timer()

        except Exception as err:
            # Cancel the monitor task
            await cancel_task(monitor_task)

            self.error = True

            # The monitor may already have resolved the Future; setting the
            # result twice would raise InvalidStateError and mask the error.
            if not self.done():
                self.set_result(self)

            if raise_on_error:
                raise GortSpecError(
                    f"Exposure failed with error {err}",
                    error_code=ErrorCode.SECTROGRAPH_FAILED_EXPOSING,
                ) from err
            else:
                log(f"Exposure failed with error {err}", "warning")

        finally:
            self.stop_timer()

        return self

    async def start_timer(
        self,
        exposure_time: float,
        readout_time: float = READOUT_TIME,
    ):
        """Starts the rich timer.

        Parameters
        ----------
        exposure_time
            Length of the integration progress bar, in seconds.
        readout_time
            Length of the readout progress bar, in seconds.

        """

        self.stop_timer()

        self._progress = Progress(
            TextColumn(f"[yellow]({self.exp_no})"),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(bar_width=None),
            MofNCompleteColumn(),
            TextColumn("s"),
            expand=True,
            transient=True,
            auto_refresh=False,
            refresh_per_second=1,
            console=self.specs.gort._console,  # Need to use same console as logger.
        )

        exp_task = self._progress.add_task(
            "[blue] Integrating ...",
            total=int(exposure_time),
        )

        # Hidden until the integration bar completes.
        readout_task = self._progress.add_task(
            "[blue] Reading ...",
            total=int(readout_time),
            visible=False,
        )

        self._progress.start()

        async def update_timer():
            # Advances the bars once per second until stop_timer() clears
            # the progress object or cancels this task.
            elapsed = 0
            while True:
                if self._progress is None:
                    return
                elif elapsed < exposure_time:
                    self._progress.update(exp_task, advance=1)
                else:
                    self._progress.update(
                        exp_task,
                        description="[green] Integration complete",
                        completed=int(exposure_time),
                    )
                    self._progress.update(readout_task, advance=1, visible=True)

                # auto_refresh is disabled, so refresh explicitly.
                asyncio.create_task(run_in_executor(self._progress.refresh))

                await asyncio.sleep(1)
                elapsed += 1

        self._timer_task = asyncio.create_task(update_timer())

    def stop_timer(self):
        """Cancels the timer."""

        if self._timer_task and not self._timer_task.done():
            self._timer_task.cancel()
            self._timer_task = None

        if self._progress:
            for task in self._progress.task_ids:
                self._progress.remove_task(task)
            self._progress.stop()
            self._progress = None

    async def verify_files(self, *, retries: int = 3, delay: float = 3.0):
        """Checks that the files have been written and have the right contents.

        Parameters
        ----------
        retries
            Number of times to look for the files before giving up. At least
            one attempt is always made.
        delay
            Seconds to wait between attempts.

        Returns
        -------
        files
            The list of files found for this exposure.

        Raises
        ------
        RuntimeError
            If a critical header keyword is missing or the MD5 checksum of a
            file does not match the one reported by the spectrograph.

        """

        config = self.specs.gort.config["specs"]

        HEADERS_CRITICAL = config["verification"]["headers"]["critical"]
        HEADERS_WARNING = config["verification"]["headers"]["warning"]

        n_spec = len(self.devices) if self.devices else len(config["devices"])
        # Three files are expected per spectrograph (presumably one per
        # camera; see the [rbz] pattern below).
        n_files_expected = n_spec * 3

        # Very infrequently we have cases in which this check fails even if the
        # files are there. It may be a delay in the NFS disk or a race condition
        # somewhere. As a hotfix, we try several times with a delay before
        # giving up.
        attempts = max(1, retries)
        files: list[pathlib.Path] = []
        for attempt in range(attempts):
            files = self.get_files()

            if (n_files_found := len(files)) >= n_files_expected:
                break

            if attempt < attempts - 1:
                await asyncio.sleep(delay)
                continue

            # Not fatal: the files that do exist are still verified below.
            self.specs.write_to_log(
                f"Expected {n_files_expected} files but found {n_files_found}.",
                "warning",
            )

        for file in files:
            header = fits.getheader(str(file))

            for key in HEADERS_CRITICAL:
                if key not in header:
                    raise RuntimeError(f"Keyword {key} not present in {file!s}")

            for key in HEADERS_WARNING:
                if key not in header:
                    self.specs.write_to_log(
                        f"Keyword {key} not present in {file!s}",
                        "warning",
                    )

            md5sum_spectro = get_md5sum_from_spectro(file)
            md5sum = get_md5sum(file)
            if md5sum != md5sum_spectro:
                raise RuntimeError(f"MD5 checksum validation failed for file {file!r}")

        if len(files) > 0:
            pattern_path = re.sub("([rbz][1-3])", "*", str(files[0]))
            self.specs.write_to_log(f"Files saved to {pattern_path!r}.")

        return files

    def get_files(self):
        """Returns the files written by the exposure."""

        sjd = get_sjd("LCO")
        config = self.specs.gort.config
        data_path = pathlib.Path(config["specs"]["data_path"].format(SJD=sjd))

        # NOTE(review): this pattern also matches any exposure number that
        # merely ends with the digits of ``exp_no`` (e.g. 5 matches ...015).
        # Confirm the file naming scheme before tightening it.
        return list(data_path.glob(f"*-[0]*{self.exp_no}.fits.gz"))

    def _when_done(self, result):
        """Called when the future is done."""

        asyncio.create_task(self._call_hook("post-readout", self, as_task=True))

    async def _done_monitor(self):
        """Waits until the spectrographs are idle, and marks the Future done.

        If anything fails before the Future is resolved, the exposure is
        marked as errored and the Future is resolved anyway, so that callers
        awaiting an asynchronous readout do not wait forever. The exception
        is then re-raised for whoever awaits this task.

        """

        log = self.specs.write_to_log

        try:
            await self.specs.send_command_all("wait_until_idle", allow_errored=True)

            self.reading = False

            for spec in self.specs.values():
                reply = await spec.status(simple=True)
                if "ERROR" in reply["status_names"]:
                    self.error = True

            log(f"Spectrograph exposure {self.exp_no} has completed.", "info")

            files = await self.verify_files()

            # Failing to record the exposure in the database is not fatal.
            exposure_table = self.gort.config["services.database.tables.exposures"]
            try:
                async with asyncio.timeout(10):
                    await self.write_to_db(files, exposure_table)
            except asyncio.TimeoutError:
                log(
                    "Timeout writing to database. Data may not have been recorded.",
                    "warning",
                )
            except Exception as err:
                log(
                    f"Error writing to database: {decap(err)}. "
                    "Data may not have been recorded.",
                    "warning",
                )

        except Exception:
            self.error = True
            if not self.done():
                self.set_result(self)
            raise

        # Set the Future.
        if not self.done():
            self.set_result(self)

    @staticmethod
    async def write_to_db(files: list[pathlib.Path], table_name: str):
        """Records the exposures in ``gortdb.exposure``.

        Parameters
        ----------
        files
            The FITS files whose headers will be recorded, one row per file.
        table_name
            The name of the table in which to insert the rows.

        """

        column_data: list[dict[str, Any]] = []

        for file in files:
            header_ap = dict(await run_in_executor(fits.getheader, str(file)))
            header_ap.pop("COMMENT", None)

            # Upper-case the keywords and drop empty values.
            header = {kk.upper(): vv for kk, vv in header_ap.items() if vv is not None}

            exposure_no = header.get("EXPOSURE", None)
            image_type = header.get("IMAGETYP", None)
            exposure_time = header.get("EXPTIME", None)
            spec = header.get("SPEC", None)
            ccd = header.get("CCD", None)
            mjd = header.get("SMJD", None)
            start_time = header.get("INTSTART", None)
            tile_id = header.get("TILE_ID", None)
            reobserved = header.get("REOBS", False)
            ancillary = header.get("ANCILLRY", False)

            if start_time is not None:
                start_time = Time(start_time, format="isot").datetime

            column_data.append(
                {
                    "exposure_no": exposure_no,
                    "image_type": image_type,
                    "exposure_time": exposure_time,
                    "spec": spec,
                    "ccd": ccd,
                    "mjd": mjd,
                    "start_time": start_time,
                    "tile_id": tile_id,
                    "header": json.dumps(header),
                    "reobserved": reobserved,
                    "ancillary": ancillary,
                }
            )

        await run_in_executor(insert_to_database, table_name, column_data)

    async def _call_hook(self, hook_name: str, *args, as_task: bool = False, **kwargs):
        """Calls the coroutines associated with a hook.

        Parameters
        ----------
        hook_name
            The name of the hook. Must be a key of :obj:`.hooks`.
        as_task
            If :obj:`True`, the coroutines are scheduled but not awaited.
        args, kwargs
            Arguments passed to each one of the hook coroutines.

        Raises
        ------
        ValueError
            If the hook does not exist or its value is neither a list nor
            a coroutine function.

        """

        if hook_name not in self.hooks:
            raise ValueError(f"Invalid hook {hook_name!r}.")

        self.specs.write_to_log(f"Calling hook {hook_name!r} functions.")

        coros = self.hooks[hook_name]
        if not isinstance(coros, list) and asyncio.iscoroutinefunction(coros):
            # A single coroutine function was assigned instead of a list.
            task = asyncio.create_task(coros(*args, **kwargs))
        elif isinstance(coros, list):
            # gather() schedules all the coroutines concurrently right away.
            task = asyncio.gather(*[coro(*args, **kwargs) for coro in coros])
        else:
            raise ValueError(f"Invalid hook functions found for {hook_name!r}.")

        if not as_task:
            await task