Source code for gort.observer

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2023-07-09
# @Filename: observer.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import logging
import re
import signal
from contextlib import contextmanager
from dataclasses import dataclass
from functools import partial, wraps
from time import time

from typing import TYPE_CHECKING, Any, Callable, Coroutine, TypedDict

from astropy.time import Time

from sdsstools.utils import GatheringTaskGroup

from gort.enums import Event, GuiderStatus, ObserverStageStatus
from gort.exceptions import (
    ErrorCode,
    GortError,
    GortObserverCancelledError,
    GortObserverError,
)
from gort.exposure import Exposure
from gort.tile import Coordinates, Tile
from gort.tools import (
    GuiderMonitor,
    OverheadDict,
    cancel_task,
    decap,
    handle_signals,
    record_overheads,
    register_observation,
    run_in_executor,
)
from gort.transforms import fibre_slew_coordinates, wrap_pa_hex


if TYPE_CHECKING:
    from gort.gort import Gort


__all__ = ["GortObserver"]


class InterrupHandlerHelper:
    """Helper for handling interrupts.

    Holds a reference to the active observer and to an optional cleanup
    callback. :meth:`run_callback` is the function invoked when one of the
    interrupt signals is received.

    """

    def __init__(self):
        # Observer whose current exposure is flagged when an interrupt arrives.
        self.observer: GortObserver | None = None

        # Callback to run on interrupt. ``None`` disables the handler.
        self._callback: Callable | None = None

    def run_callback(self):
        """Marks the current exposure as failed and runs the cleanup callback.

        Does nothing if no callback has been registered.

        """

        if self._callback is None:
            return

        if self.observer:
            exposure = self.observer._current_exposure

            # Only resolve exposures that are still pending. The observer keeps
            # a reference to its last exposure after it completes, and calling
            # ``set_result()`` on an exposure that is already done would raise
            # (presumably ``asyncio.InvalidStateError``, since exposures are
            # awaited like futures) and prevent the cleanup callback from
            # running.
            if exposure is not None and not exposure.done():
                exposure.error = True
                exposure.set_result(exposure)
                exposure.stop_timer()

            self.observer.gort.log.warning("Running cleanup due to keyboard interrupt.")

        self._callback()

    def set_callback(self, cb: Callable | None):
        """Sets the callback to run on interrupt. ``None`` disables it."""

        self._callback = cb


# Module-level helper shared by all observers. ``GortObserver.__init__`` stores
# itself and its ``on_interrupt`` callback here, and the observer stage
# coroutines pass ``interrupt_helper.run_callback`` to ``handle_signals``.
interrupt_helper = InterrupHandlerHelper()

# Signals passed to ``handle_signals`` together with the callback above.
interrupt_signals = [signal.SIGINT, signal.SIGTERM]


class StagesDict(TypedDict):
    """A dictionary of observer stages.

    Maps each stage name to its current :obj:`.ObserverStageStatus`. The keys
    match the names of the ``GortObserver`` coroutines decorated with
    ``register_stage_status``, which uses the coroutine name as the key.

    """

    slew: ObserverStageStatus
    acquire: ObserverStageStatus
    expose: ObserverStageStatus


def register_stage_status(coro):
    """Decorator that tracks the status of an observer stage.

    The wrapped coroutine must be a method of ``GortObserver``. Its name is
    used as the key in ``GortObserver.stages``. The stage is marked as running
    before the coroutine is awaited and as done or failed afterwards, and an
    event is emitted on each transition. If the observer is cancelling, the
    stage is not started and ``GortObserverCancelledError`` is raised.

    """

    @wraps(coro)
    async def wrapper(*args, **kwargs):
        observer: GortObserver = args[0]
        stage_name = coro.__name__

        # Tile information is optional; both values are null without a tile.
        current_tile = observer._tile
        if current_tile:
            tile_id = current_tile.tile_id
            dither = observer.dither_position
        else:
            tile_id = None
            dither = None

        payload = dict(tile_id=tile_id, dither_position=dither, stage=stage_name)

        if observer.cancelling:
            raise GortObserverCancelledError()

        async def transition(status, event):
            # Record the new status first, then let listeners know.
            observer.stages[stage_name] = status
            await observer.gort.notify_event(event, payload=payload)

        await transition(
            ObserverStageStatus.RUNNING,
            Event.OBSERVER_STAGE_RUNNING,
        )

        try:
            outcome = await coro(*args, **kwargs)
        except Exception:
            await transition(
                ObserverStageStatus.FAILED,
                Event.OBSERVER_STAGE_FAILED,
            )
            raise

        await transition(ObserverStageStatus.DONE, Event.OBSERVER_STAGE_DONE)

        return outcome

    return wrapper


class GortObserver:
    """A class to handle tile observations.

    Parameters
    ----------
    gort
        The instance of :obj:`.Gort` used to communicate with the devices.
    tile
        The :obj:`.Tile` with the information about the observation.
    mask_positions_pattern
        The ``spec`` fibre mask positions to use.
    on_interrupt
        Callback to be called when the observation is interrupted.

    """

    # Telescopes used when ``slew()`` or ``acquire()`` are called without an
    # explicit list.
    _ALL_TELESCOPES: tuple[str, ...] = ("sci", "skye", "skyw", "spec")

    def __init__(
        self,
        gort: Gort,
        tile: Tile | None = None,
        mask_positions_pattern: str = "P1-*",
        on_interrupt: Callable | None = None,
    ):
        self.gort = gort
        self._tile = tile

        self.mask_positions = self._get_mask_positions(mask_positions_pattern)

        self.guide_task: asyncio.Future | None = None
        self.guider_monitor = GuiderMonitor(self.gort)

        self.standards: Standards | None = None

        self.dither_position: int = 0

        self._current_exposure: Exposure | None = None

        self.overheads: dict[str, dict[str, int | float]] = {}

        self.stages: StagesDict = self._new_stages()

        self.cancelling: bool = False

        # Strong references to fire-and-forget tasks created by write_to_log().
        # The event loop only keeps weak references to tasks.
        self._background_tasks: set[asyncio.Task] = set()

        interrupt_helper.set_callback(on_interrupt)
        interrupt_helper.observer = self

        # Necessary because this sets the standards
        # and dither position if the tile is not null.
        self.reset(tile)

    @staticmethod
    def _new_stages() -> StagesDict:
        """Returns a stages dictionary with all the stages set to waiting."""

        return {
            "slew": ObserverStageStatus.WAITING,
            "acquire": ObserverStageStatus.WAITING,
            "expose": ObserverStageStatus.WAITING,
        }

    def reset(
        self,
        tile: Tile | None = None,
        on_interrupt: Callable | None = None,
        reset_stages: bool = True,
    ):
        """Resets the observer.

        Parameters
        ----------
        tile
            The new tile to observe, if any. The standards and dither position
            are updated from it.
        on_interrupt
            If not :obj:`None`, replaces the interrupt callback.
        reset_stages
            Whether to set all the stages back to waiting.

        """

        self._tile = tile
        self.standards = Standards(self, tile) if tile else None
        self.dither_position = tile.sci_coords.dither_position if tile else 0

        self.guide_task = None
        self.guider_monitor.reset()

        self._current_exposure = None

        self.overheads = {}

        self.cancelling = False

        if reset_stages:
            self.stages: StagesDict = self._new_stages()

        if on_interrupt is not None:
            interrupt_helper.set_callback(on_interrupt)

    @property
    def tile(self) -> Tile:
        """Returns the current tile being observed.

        Raises
        ------
        GortObserverError
            If no tile has been set.

        """

        if not self._tile:
            raise GortObserverError("No tile has been set.")

        return self._tile

    def set_tile(self, tile: Tile):
        """Sets the current tile. Implies reset."""

        self.reset(tile)

    def is_running(self):
        """Returns :obj:`True` if the observer is running."""

        return self.get_running_stage() is not None

    def get_running_stage(self):
        """Returns the running stage, or :obj:`None` if no stage is running."""

        for stage, status in self.stages.items():
            if status == ObserverStageStatus.RUNNING:
                return stage

        return None

    def __repr__(self):
        tile_id = self._tile.tile_id if self._tile else "none"
        return f"<GortObserver (tile_id={tile_id})>"

    @property
    def has_standards(self):
        """Returns :obj:`True` if standards will be observed."""

        if not self.standards:
            return False

        return len(self.standards.standards) > 0

    async def observe_tile(
        self,
        tile: Tile | int | None = None,
        ra: float | None = None,
        dec: float | None = None,
        pa: float = 0.0,
        use_scheduler: bool = True,
        dither_position: int | None = None,
        exposure_time: float = 900.0,
        n_exposures: int = 1,
        async_readout: bool = False,
        keep_guiding: bool = False,
        skip_slew_when_acquired: bool = True,
        guide_tolerance: float | None = None,
        acquisition_timeout: float | None = None,
        show_progress: bool | None = None,
        run_cleanup: bool = True,
        adjust_focus: bool = True,
        cleanup_on_interrupt: bool = True,
    ) -> tuple[bool, list[Exposure]]:
        """Performs all the operations necessary to observe a tile.

        Parameters
        ----------
        tile
            The ``tile_id`` to observe, or a :obj:`.Tile` object. If not
            provided, observes the next tile suggested by the scheduler
            (requires ``use_scheduler=True``).
        ra,dec
            The RA and Dec where to point the science telescopes. The other
            telescopes are pointed to calibrators that fit the science pointing.
            Cannot be used with ``tile``.
        pa
            Position angle of the IFU. Defaults to PA=0.
        use_scheduler
            Whether to use the scheduler to determine the ``tile_id`` or
            select calibrators.
        dither_position
            The dither position to use. If not provided, uses the tile default
            dither position or zero.
        exposure_time
            The length of the exposure in seconds.
        n_exposures
            Number of exposures to take while guiding.
        async_readout
            Whether to wait for the readout to complete or return as soon as
            the readout begins. If :obj:`True`, the last exposure is returned
            without waiting for its readout to finish. This should be
            :obj:`True` during normal science operations to allow the
            following acquisition to occur during readout.
        keep_guiding
            If :obj:`True`, keeps the guider running after the last exposure.
            This should be :obj:`False` during normal science operations.
        skip_slew_when_acquired
            If the tile has been acquired and is guiding, skips the slew,
            modifies the dither position, waits until the new position has been
            acquired, and starts exposing.
        guide_tolerance
            The guide tolerance in arcsec. A telescope will not be considered
            to be guiding if its separation to the commanded field is larger
            than this value. If :obj:`None`, default values from the
            configuration file are used.
        acquisition_timeout
            The maximum time allowed for acquisition. In case of timeout
            the acquired fields are evaluated and an exception is
            raised if the acquisition failed. If :obj:`None`, uses the default
            value from the configuration file.
        show_progress
            Displays a progress bar with the elapsed exposure time.
        run_cleanup
            Whether to run the cleanup routine.
        adjust_focus
            Adjusts the focuser positions based on temperature drift before
            starting the observation. This works best if the focus has been
            initially determined using a focus sweep.
        cleanup_on_interrupt
            If ``True``, registers a signal handler to catch interrupts and
            run the cleanup routine.

        Returns
        -------
        result
            A tuple with a boolean that is :obj:`True` if the observation was
            not marked as failed, and the list of exposures taken.

        """

        write_log = self.write_to_log

        # Create tile.
        if isinstance(tile, Tile):
            pass
        elif tile is not None or (tile is None and ra is None and dec is None):
            if use_scheduler:
                tile = await run_in_executor(Tile.from_scheduler, tile_id=tile)
            else:
                raise GortError("Not enough information to create a tile.")
        elif ra is not None and dec is not None:
            if use_scheduler:
                tile = await run_in_executor(Tile.from_scheduler, ra=ra, dec=dec, pa=pa)
            else:
                tile = await run_in_executor(Tile.from_coordinates, ra, dec, pa=pa)
        else:
            raise GortError("Not enough information to create a tile.")

        assert isinstance(tile, Tile)

        if dither_position is not None:
            tile.set_dither_position(dither_position)
        elif tile.sci_coords.dither_position is None:
            self.write_to_log(
                "No dither position defined. Using dither_position=0.",
                "warning",
            )
            tile.set_dither_position(0)

        if cleanup_on_interrupt:
            interrupt_cb = partial(self.gort.run_script_sync, "cleanup")
        else:
            interrupt_cb = None

        is_acquired: bool = False

        # The guider status may be None (set_dither_position() guards against
        # it), in which case the telescope is not guiding.
        sci_status = self.gort.guiders.sci.status
        is_guiding = sci_status is not None and bool(sci_status & GuiderStatus.GUIDING)

        # We require the tile to be acquired and guiding to skip the slew. The
        # current tile must match the tile_id of the new tile requested and
        # acquisition must have been completed.
        if (
            skip_slew_when_acquired
            and self._tile is not None
            and tile.tile_id
            and self._tile.tile_id == tile.tile_id
            and is_guiding
            and self.stages["acquire"] == ObserverStageStatus.DONE
        ):
            is_acquired = True

        # Reset the tile
        self.reset(tile, on_interrupt=interrupt_cb, reset_stages=not is_acquired)

        # Run the cleanup routine to be extra sure.
        if run_cleanup:
            await self.gort.cleanup(turn_lamps_off=False)

        # Wrap the PA to the range -30 to 30.
        current_pa = tile.sci_coords.pa
        new_pa = wrap_pa_hex(tile.sci_coords.pa)
        if new_pa != current_pa:
            write_log(f"Wrapping sci PA from {current_pa:.3f} to {new_pa:.3f}.")
            tile.sci_coords.pa = new_pa

        if tile.tile_id is not None:
            extra_info = ""
            if tile.reobserved:
                extra_info += " (reobserving)"
            elif tile.ancillary:
                extra_info += " (ancillary tile)"

            write_log(
                f"Observing tile_id={tile.tile_id} on "
                f"dither position #{self.dither_position}{extra_info}.",
                "info",
            )

        if adjust_focus:
            await self.gort.guiders.adjust_focus()

        await self.gort.notify_event(
            Event.OBSERVER_NEW_TILE,
            payload={
                "tile_id": tile.tile_id,
                "dither_position": self.dither_position,
                "reobserved": tile.reobserved,
                "ancillary": tile.ancillary,
            },
        )

        exposures: list[Exposure] = []
        failed: bool = False

        try:
            if not is_acquired:
                # Slew telescopes and move fibsel mask.
                await self.slew()

                # Start guiding.
                await self.acquire(
                    guide_tolerance=guide_tolerance,
                    timeout=acquisition_timeout,
                )

            else:
                write_log(f"Acquiring dither position #{self.dither_position}", "info")

                async with GatheringTaskGroup() as group:
                    group.create_task(self.set_dither_position(self.dither_position))
                    if self.standards:
                        group.create_task(self.standards.reacquire_first())

                # Need to restart the guider monitor so that the new exposure
                # gets the range of guider frames that correspond to this
                # dither. GortObserver.expose() doesn't do this because we ask
                # for a single exposure.
                self.guider_monitor.reset()

            # Exposing
            _exposure = await self.expose(
                exposure_time=exposure_time,
                show_progress=show_progress,
                count=n_exposures,
                async_readout=async_readout,
                keep_guiding=keep_guiding,
                dither_position=self.dither_position,
            )
            if not isinstance(_exposure, list):
                exposures = [_exposure]
            else:
                exposures = _exposure

            if self.cancelling:
                await self.gort.guiders.stop()
                write_log("Reading exposure before cancelling.", "warning")
                await asyncio.gather(*exposures)
                raise GortObserverCancelledError()

        except GortObserverCancelledError:
            write_log("Observation cancelled.", "warning")
            # A cancelled observation that produced exposures is not a failure.
            failed = len(exposures) == 0

        except KeyboardInterrupt:
            write_log("Observation interrupted by user.", "warning")
            failed = True

        finally:
            # Finish observation.
            await self.finish_observation(keep_guiding=keep_guiding and not failed)

        return (not failed, exposures)

    @handle_signals(interrupt_signals, interrupt_helper.run_callback)
    @register_stage_status
    async def slew(self, telescopes: list[str] | None = None):
        """Slew to the telescope fields.

        Parameters
        ----------
        telescopes
            The list of telescopes to slew. By default all the telescopes
            (``sci``, ``skye``, ``skyw``, and ``spec``) are slewed.

        """

        if telescopes is None:
            telescopes = list(self._ALL_TELESCOPES)

        cotasks = []

        sci: tuple[float, float, float] | None = None
        spec: tuple[float, float] | None = None
        sky: dict[str, tuple[float, float]] = {}

        with self.register_overhead("slew:stop-guiders"):
            # Stops guiders.
            await self.gort.guiders.stop()

        # Slew telescopes.
        self.write_to_log(f"Slewing to tile_id={self.tile.tile_id}.", level="info")

        if "sci" in telescopes:
            sci = (
                self.tile.sci_coords.ra,
                self.tile.sci_coords.dec,
                self.tile.sci_coords.pa,
            )
            self.write_to_log(f"Science: {str(self.tile.sci_coords)}")

        if "spec" in telescopes:
            if self.tile.spec_coords and len(self.tile.spec_coords) > 0:
                first_spec = self.tile.spec_coords[0]

                # For spec we slew to the fibre with which we'll observe first.
                # This should save a bit of time converging.
                spec = fibre_slew_coordinates(
                    first_spec.ra,
                    first_spec.dec,
                    self.mask_positions[0],
                    derotated=False,
                )

                self.write_to_log(f"Spec: {first_spec} on {self.mask_positions[0]}")

        for skytel in ["SkyE", "SkyW"]:
            skytel_l = skytel.lower()
            if skytel_l in self.tile.sky_coords and skytel_l in telescopes:
                sky_coords_tel = self.tile.sky_coords[skytel_l]
                if sky_coords_tel is not None:
                    sky[skytel_l] = (sky_coords_tel.ra, sky_coords_tel.dec)
                    self.write_to_log(f"{skytel}: {sky_coords_tel}")

        # For sci we want to slew the k-mirror so that we can apply small
        # positive offsets without backlash. So we slew to the tile
        # PA-stop_degs_before.
        kmirror_config = self.gort.config["telescopes"]["kmirror"]
        stop_degs_before = kmirror_config.get("stop_degs_before", {}).get("sci", 0.0)

        cotasks.append(
            self.gort.telescopes.goto(
                sci=sci,
                spec=spec,
                skye=sky.get("skye", None),
                skyw=sky.get("skyw", None),
                sci_km_stop_degs_before=stop_degs_before,
            )
        )

        # Move fibsel to first position.
        fibsel = self.gort.telescopes.spec.fibsel
        cotasks.append(fibsel.move_to_position(self.mask_positions[0], rehome=True))

        # Execute.
        with self.register_overhead("slew:slew"):
            await asyncio.gather(*cotasks)

    @handle_signals(interrupt_signals, interrupt_helper.run_callback)
    @register_stage_status
    async def acquire(
        self,
        guide_tolerance: float | None = None,
        timeout: float | None = None,
        telescopes: list[str] | None = None,
    ):
        """Acquires the field in all the telescopes. Blocks until then.

        Parameters
        ----------
        guide_tolerance
            The guide tolerance in arcsec. A telescope will not be considered
            to be guiding if its separation to the commanded field is larger
            than this value. If `None`, default values from the configuration
            file are used.
        timeout
            The maximum time allowed for acquisition. In case of timeout
            the acquired fields are evaluated and an exception is
            raised if the acquisition failed. If :obj:`None`, uses the default
            value from the configuration file.
        telescopes
            The list of telescopes to acquire. By default all telescopes are
            acquired.

        Raises
        ------
        GortObserverError
            If the acquisition failed or the minimum required telescopes are
            not guiding.

        """

        if telescopes is None:
            telescopes = list(self._ALL_TELESCOPES)

        timeout = timeout or self.gort.config["observer.acquisition_timeout"]

        with self.register_overhead("acquisition:stop-guiders"):
            # Make sure we are not guiding or that the previous is on.
            if self.guide_task is not None and not self.guide_task.done():
                await self.gort.guiders.stop()
                await cancel_task(self.guide_task)

        # Determine telescopes on which to guide.
        guide_coros = []
        guide_on_telescopes: list[str] = []
        n_skies = 0
        for tel in telescopes:
            coords = self.tile[tel]

            if coords is None or (isinstance(coords, list) and len(coords) == 0):
                continue

            if tel == "spec" and isinstance(coords, list):
                coords = coords[0]

            assert isinstance(coords, Coordinates)

            guide_on_telescopes.append(tel)

            if "sky" in tel:
                n_skies += 1

            # Pixel in the MF on which to guide. Always None/central pixel
            # except for sci if defined. For spec we guide on the pixel of the
            # first fibre/mask position.
            pixel = coords._mf_pixel if tel != "spec" else self.mask_positions[0]

            guide_tolerance_tel = self.gort.config["observer"]["guide_tolerance"][tel]
            guide_tolerance_tel = guide_tolerance or guide_tolerance_tel

            self.write_to_log(
                f"Starting guider on {tel} with guide tolerance "
                f"{guide_tolerance_tel:.1f} arcsec."
            )

            guide_coros.append(
                self.gort.guiders[tel].guide(
                    ra=coords.ra,
                    dec=coords.dec,
                    pa=-coords.pa,  # -1 since k-mirror handiness is opposite to tile PA
                    guide_tolerance=guide_tolerance_tel,
                    pixel=pixel,
                )
            )

        if "spec" not in guide_on_telescopes:
            with self.register_overhead("acquisition:move-fibsel-500"):
                self.write_to_log("Not using spec: blocking fibre mask.", "warning")
                await self.gort.telescopes.spec.fibsel.move_relative(500)

        if n_skies == 0:
            self.write_to_log("No sky positions defined.", "warning")

        with self.register_overhead("acquisition:start-guide-loop"):
            # Start guide loop.
            self.guider_monitor.start_monitoring()
            self.guide_task = asyncio.gather(*guide_coros)

        with self.register_overhead("acquisition:acquire"):
            await asyncio.sleep(2)

            # Wait until convergence.
            self.write_to_log("Waiting for guiders to converge.")
            guide_status = await asyncio.gather(
                *[
                    self.gort.guiders[tel].wait_until_guiding(timeout=timeout)
                    for tel in guide_on_telescopes
                ]
            )

        # The fourth element of each status is used as the timed-out flag.
        has_timedout = any(gs[3] for gs in guide_status)
        if has_timedout:
            self.write_to_log("Some acquisitions timed out.", "warning")

        # TODO: we need to stop guiders that timed out.

        try:
            for tel, tel_status in zip(guide_on_telescopes, guide_status):
                is_guiding = tel_status[0]

                if tel == "sci" and not is_guiding:
                    raise GortObserverError(
                        "Science telescope is not guiding.",
                        error_code=ErrorCode.ACQUISITION_FAILED,
                        payload={
                            "observer": True,
                            "tile_id": self.tile.tile_id,
                            "telescope": "sci",
                        },
                    )

                if tel == "spec":
                    if not is_guiding:
                        raise GortObserverError(
                            "Spec telescope is not guiding.",
                            error_code=ErrorCode.ACQUISITION_FAILED,
                            payload={
                                "observer": True,
                                "tile_id": self.tile.tile_id,
                                "telescope": "spec",
                                "fibsel_position": self.mask_positions[0],
                            },
                        )
                    else:
                        await self.gort.guiders.spec.apply_corrections(False)

                # Sky telescopes that are not guiding are not a fatal error.
                if "sky" in tel and not is_guiding:
                    self.write_to_log(f"{tel} telescope is not guiding.", "warning")

        except Exception:
            self.write_to_log("Stopping guide loops.", "warning")
            await self.gort.guiders.stop()

            raise

        self.write_to_log("All telescopes are now guiding.")

    @handle_signals(interrupt_signals, interrupt_helper.run_callback)
    @register_stage_status
    async def expose(
        self,
        exposure_time: float = 900.0,
        show_progress: bool | None = None,
        count: int = 1,
        async_readout: bool = False,
        keep_guiding: bool = True,
        object: str | None = None,
        dither_position: int | None = None,
        exposure_starts_callback: Callable[[Any], Coroutine] | None = None,
    ):
        """Starts exposing the spectrographs.

        Parameters
        ----------
        exposure_time
            The length of the exposure in seconds.
        show_progress
            Displays a progress bar with the elapsed exposure time.
        count
            Number of exposures. If the observer has standards, the iteration
            over standards is restarted for each exposure.
        async_readout
            Whether to wait for the readout to complete or return as soon as
            the readout begins. If :obj:`True`, the last exposure is returned
            without waiting for its readout to finish.
        keep_guiding
            If :obj:`True`, keeps the guider running after the last exposure.
        object
            The object name to be added to the header.
        dither_position
            The dither position. If :obj:`None`, uses the first dither
            position in the tile. Only relevant for exposure registration.
        exposure_starts_callback
            A callback to be called when the exposure starts.

        Returns
        -------
        exposures
            Either a single `.Exposure` or a list of exposures if ``count>1``.

        """

        tile_id = self.tile.tile_id

        # Compare against None explicitly: zero is a valid dither position.
        if dither_position is None:
            dither_position = self.tile.sci_coords.dither_position

        if object is None:
            if self.tile.object:
                object = self.tile.object
            elif tile_id is not None and tile_id > 0:
                object = f"tile_id={tile_id}"

        last_exposure = self.gort.specs.last_exposure
        if last_exposure is not None and not last_exposure.done():
            self.write_to_log("Waiting for previous exposure to read out.", "warning")
            await last_exposure

        # Last chance to bail out before the exposure.
        if self.cancelling:
            raise GortObserverCancelledError()

        exposures: list[Exposure] = []

        for nexp in range(1, count + 1):
            is_last = nexp == count

            self.write_to_log(
                f"Starting {exposure_time:.1f} s exposure ({nexp}/{count}).",
                "info",
            )

            with self.register_overhead(f"expose:pre-exposure-{nexp}"):
                # Refresh guider data for this exposure.
                if nexp > 1:
                    self.guider_monitor.reset()

                # Move fibre selector to the first position. Should be there
                # unless count > 1 in which case we need to move it back.
                if self.standards:
                    await self.standards.start_iterating(exposure_time)

                exposure = Exposure(self.gort, flavour="object", object=object)
                self._current_exposure = exposure

                exposure.hooks["pre-readout"].append(self._pre_readout)
                exposure.hooks["post-readout"].append(self._post_readout)

                if exposure_starts_callback:
                    exposure.hooks["exposure-starts"].append(exposure_starts_callback)

            # TODO: this is a bit dangerous because we are registering the
            # exposure before it's actually written to disk. Maybe we should
            # wait until _post_readout() to register, but then the scheduler
            # needs to be changed to not return the same tile twice. We are also
            # scheduling the registration before the exposure ends because the
            # AS endpoint that register the observation can take up to 5 seconds
            # and I want to avoid waiting.
            registration_task = asyncio.create_task(
                self.register_exposure(
                    exposure,
                    nexp=nexp,
                    tile_id=tile_id,
                    dither_position=dither_position,
                    delay=exposure_time - 5.0,
                )
            )

            with self.register_overhead(f"expose:integration-{nexp}"):
                try:
                    await exposure.expose(
                        exposure_time=exposure_time,
                        show_progress=show_progress,
                        async_readout=True,
                    )
                except Exception:
                    await cancel_task(registration_task)
                    raise
                else:
                    await registration_task

            if is_last and not keep_guiding:
                with self.register_overhead(f"expose:stop-guiders-{nexp}"):
                    await self.gort.guiders.stop()

            # With async_readout the last exposure is handed back while it is
            # still reading out. It is still added to the list so that earlier
            # exposures are not dropped from the result when count > 1.
            if not (is_last and async_readout):
                await exposure

            exposures.append(exposure)

        if len(exposures) == 1:
            return exposures[0]

        return exposures

    @handle_signals(interrupt_signals, interrupt_helper.run_callback)
    async def finish_observation(self, keep_guiding: bool = False):
        """Finishes the observation, stops the guiders, etc.

        Parameters
        ----------
        keep_guiding
            If :obj:`True`, the guiders are not stopped.

        """

        self.write_to_log("Finishing observation.", "info")

        with self.register_overhead("finish-observation"):
            # Should have been cancelled in _update_header(), but just in case.
            if self.standards is not None:
                await self.standards.cancel()

            if (
                not keep_guiding
                and self.guide_task is not None
                and not self.guide_task.done()
            ):
                await self.gort.guiders.stop()
                await self.guide_task

        # Only positive tile_ids are stored with the overheads.
        tile_id = self.tile.tile_id
        if tile_id is None or tile_id <= 0:
            tile_id = None

        # Write overheads to database.
        payload: list[OverheadDict] = [
            {
                "observer_id": id(self),
                "tile_id": tile_id,
                "dither_position": int(value["dither_position"]),
                "stage": name,
                "start_time": value["t0"],
                "end_time": value["t0"] + value["elapsed"],
                "duration": value["elapsed"],
            }
            for name, value in self.overheads.items()
        ]

        try:
            record_overheads(payload)
        except Exception as err:
            # Failing to save the overheads should not fail the observation.
            self.write_to_log(
                f"Failed saving overheads to database: {decap(err)}",
                "error",
            )

    async def register_exposure(
        self,
        exposure: Exposure,
        nexp: int = 1,
        tile_id: int | None = None,
        dither_position: int = 0,
        delay: float = 0.0,
    ):
        """Registers the exposure in the database.

        Parameters
        ----------
        exposure
            The exposure to register. Only ``object`` exposures are registered.
        nexp
            The exposure number in the sequence. Used to name the overhead.
        tile_id
            The ``tile_id`` to register. Not included in the payload if
            :obj:`None`.
        dither_position
            The dither position to register.
        delay
            Seconds to wait before registering the exposure.

        Raises
        ------
        GortObserverError
            If the exposure time of the exposure is not set.

        """

        overhead_name = f"exposure:register-exposure-{nexp}"

        # Because of the delay argument we cannot use the register_overhead
        # context manager here, so we do it manually.
        self.overheads[overhead_name] = {
            "dither_position": self.tile.sci_coords.dither_position,
            "t0": time(),
            "elapsed": 0.0,
        }

        # Delay registering the observation. This is useful to be able to call
        # this coroutine immediately after the exposure starts, but have it
        # register the exposure only after a delay.
        await asyncio.sleep(delay)

        t0 = time()

        if exposure._exposure_time is None:
            raise GortObserverError("Exposure time cannot be 'None'.")

        if exposure.flavour != "object":
            return

        standards: list[int] = []
        if self.standards:
            for stdn, std_data in self.standards.standards.items():
                if std_data.observed == 1:
                    pk = self.tile.spec_coords[stdn - 1].pk
                    if pk is not None:
                        standards.append(pk)

        # Sky coordinates can be None (slew() checks for it), so skip those.
        skies = [
            sky.pk
            for sky in self.tile.sky_coords.values()
            if sky is not None and sky.pk is not None
        ]

        self.write_to_log("Registering observation.", "info")
        registration_payload = {
            "exposure_no": exposure.exp_no,
            "dither": dither_position,
            "exposure_time": exposure._exposure_time,
            "jd": float(exposure.start_time.jd),
            "seeing": -999.0,
            "standards": standards,
            "skies": skies,
        }

        if tile_id is not None:
            registration_payload["tile_id"] = tile_id

        self.write_to_log(f"Registration payload {registration_payload}")

        try:
            await register_observation(registration_payload)
        except Exception as err:
            self.write_to_log(f"Failed registering exposure: {decap(err)}", "error")
        else:
            self.write_to_log("Registration complete.")

        # The elapsed time does not include the initial delay.
        self.overheads[overhead_name]["elapsed"] = time() - t0

    async def set_dither_position(self, dither: int):
        """Reacquire science telescope for a new dither position.

        Parameters
        ----------
        dither
            The new dither position.

        Raises
        ------
        GortObserverError
            If the ``sci`` guider is not guiding, drifting, or acquiring.

        """

        valid_status = (
            GuiderStatus.GUIDING | GuiderStatus.DRIFTING | GuiderStatus.ACQUIRING
        )

        sci_status = self.gort.guiders.sci.status
        if sci_status is None or not (sci_status & valid_status):
            raise GortObserverError("sci guider must be active to set dither position.")

        self.tile.set_dither_position(dither)
        await self.gort.guiders.sci.set_pixel(self.tile.sci_coords._mf_pixel)

        # Wait until converges.
        with self.register_overhead(f"set-dither-position:acquire-{dither}"):
            await asyncio.sleep(2)

            self.write_to_log("Waiting for 'sci' guider to converge.")
            await self.gort.guiders.sci.wait_until_guiding(timeout=120)

    def write_to_log(
        self,
        message: str,
        level: str = "debug",
        header: str | None = None,
        event: Event | None = None,
        extra_payload: dict[str, Any] | None = None,
    ):
        """Writes a message to the log with a custom header.

        Parameters
        ----------
        message
            The message to log.
        level
            The level to use for logging: ``'debug'``, ``'info'``,
            ``'warning'``, or ``'error'``.
        header
            The header to prepend to the message. By default uses the class
            name.
        event
            If specified, emits an event of this type.
        extra_payload
            Additional payload to include in the event.

        """

        if header is None:
            header = f"({self.__class__.__name__}) "

        message = f"{header}{message}"

        level_int = logging._nameToLevel[level.upper()]
        self.gort.log.log(level_int, message)

        if event:
            # Use the private attribute so that logging does not raise when no
            # tile has been set.
            tile_id = self._tile.tile_id if self._tile else None

            payload = {
                "observer": True,
                "message": message,
                "level": level,
                "tile_id": tile_id,
            }
            payload.update(extra_payload or {})

            # Keep a reference to the task until it completes. The event loop
            # only holds weak references and the task could otherwise be
            # garbage collected before it runs.
            task = asyncio.create_task(self.gort.notify_event(event, payload=payload))
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)

    @contextmanager
    def register_overhead(self, name: str):
        """Measures and registers an overhead.

        The overhead is stored in ``overheads`` under ``name`` when the
        context exits without raising.

        """

        t0 = time()

        yield

        self.overheads[name] = {
            "dither_position": self.tile.sci_coords.dither_position,
            "t0": t0,
            "elapsed": time() - t0,
        }

    def _get_mask_positions(self, pattern: str):
        """Returns mask positions sorted by motor steps.

        Parameters
        ----------
        pattern
            Regular expression matched against the beginning of each position
            name.

        """

        mask_config = self.gort.config["telescopes"]["mask_positions"]
        all_positions = self.gort.telescopes.spec.fibsel.list_positions()

        # NOTE(review): the pattern is used as a regular expression, not a
        # glob, so the default "P1-*" matches any name starting with "P1".
        matcher = re.compile(pattern)
        positions = [pos for pos in all_positions if matcher.match(pos)]

        return sorted(positions, key=lambda p: mask_config[p])

    async def _pre_readout(self, header: dict[str, Any]):
        """Updates the exposure header with pointing and guiding information."""

        tile_id = self.tile.tile_id
        dither_pos = self.tile.sci_coords.dither_position

        # Pointing coordinates taking into account the dither position.
        position_coords = self.tile.sci_coords.get_dither_radec()

        tile_header = {
            "TILE_ID": (tile_id or -999, "The tile_id of this observation"),
            "DPOS": (dither_pos, "Dither position"),
            "REOBS": (self.tile.reobserved, "Tile is being reobserved"),
            "ANCILLRY": (self.tile.ancillary, "Ancillary tile"),
            "POSCIRA": round(position_coords[0], 6),
            "POSCIDE": round(position_coords[1], 6),
            "POSCIPA": round(self.tile.sci_coords.pa, 4),
        }

        if self.tile["skye"]:
            skye = self.tile.sky_coords["skye"]
            tile_header.update(
                {
                    "POSKYERA": round(skye.ra, 6),
                    "POSKYEDE": round(skye.dec, 6),
                    "POSKYEPA": round(skye.pa, 4),
                    "SKYENAME": skye.name,
                }
            )

        if self.tile["skyw"]:
            skyw = self.tile.sky_coords["skyw"]
            tile_header.update(
                {
                    "POSKYWRA": round(skyw.ra, 6),
                    "POSKYWDE": round(skyw.dec, 6),
                    "POSKYWPA": round(skyw.pa, 4),
                    "SKYWNAME": skyw.name,
                }
            )

        header.update(tile_header)
        header.update(self.guider_monitor.to_header())

        # At this point the shutter is closed so let's stop observing standards.
        # This also finishes updating the standards table.
        if self.has_standards and self.standards:
            await self.standards.cancel()

            # There is some overhead between when we set t0 for the first
            # standard and when the exposure actually begins. This leads to the
            # first standard having longer exposure time than open shutter.
            if self._current_exposure is not None:
                start_time = self._current_exposure.start_time.unix
                self.standards.standards[1].t0 = start_time

            header.update(self.standards.to_header())

    async def _post_readout(self, exposure: Exposure):
        """Post exposure tasks."""

        # NOTE: marking the tile-dither as bad is currently disabled.

        # tile_id = self.tile.tile_id
        # dither_pos = self.tile.sci_coords.dither_position

        # if exposure.error is True:
        #     if tile_id is not None:
        #         mark_exposure_bad(tile_id, dither_pos)
        #         self.write_to_log(
        #             "Exposure returned with errors. Tile-dither has been marked as bad.",
        #             "error",
        #         )

        return
@dataclass
class Standard:
    """A class to represent a standard star."""

    # 1-based index of the standard within the tile.
    n: int
    ra: float
    dec: float
    # -1 is used when the tile does not provide a source ID.
    source_id: int = -1
    acquired: bool = False
    observed: bool = False
    # UNIX timestamps bracketing the time spent on the standard.
    t0: float = 0.0
    t1: float = 0.0
    # Name of the fibre mask position used for this standard.
    fibre: str = ""


class Standards:
    """Iterates over standards and monitors observed standards.

    Parameters
    ----------
    observer
        The `.GortObserver` instance that owns this object. It provides the
        `.Gort` client, the mask positions, logging and overhead recording.
    tile
        The tile being observed; its ``spec_coords`` define the standards.

    """

    def __init__(
        self,
        observer: GortObserver,
        tile: Tile,
    ):
        self.observer = observer
        self.gort = observer.gort

        self.tile = tile

        self.iterate_task: asyncio.Task | None = None

        # Reference to the background guide loop started when acquiring a
        # standard. asyncio only keeps weak references to tasks, so we need
        # to hold on to it for the task not to be garbage collected.
        self._guide_task: asyncio.Task | None = None

        self.current_standard: int = 1
        self.standards = self._get_frame()

    @property
    def mask_positions(self):
        """Returns the list of mask positions from `.GortObserver`."""

        return self.observer.mask_positions

    def _get_frame(self):
        """Constructs the standard data frame.

        Returns a dictionary of `.Standard`, keyed by the 1-based standard
        number, with one entry per coordinate in ``tile.spec_coords``.

        """

        return {
            number: Standard(
                n=number,
                ra=coords.ra,
                dec=coords.dec,
                source_id=coords.source_id or -1,
            )
            for number, coords in enumerate(self.tile.spec_coords, start=1)
        }

    async def start_iterating(self, exposure_time: float) -> None:
        """Iterates over the fibre mask positions.

        Moving the ``spec`` telescope to each one of the standard star,
        acquires the new field, and adjusts the time to stay on each target.

        Parameters
        ----------
        exposure_time
            The exposure time, in seconds, to share among the standards.

        """

        # TODO: right now we are assuming that the first standard has been
        # acquired when we call this method. This is not always the case and
        # it caused a bug when observing multiple dither positions so we should
        # check.

        if not self.standards:
            return

        # Stop any previous iteration before commanding the fibre mask so
        # that a stale task cannot issue competing moves.
        await cancel_task(self.iterate_task)

        first_position = self.mask_positions[0]
        await self.gort.telescopes.spec.fibsel.move_to_position(first_position)

        # Reset the table and mark the first standard as being observed.
        self.standards = self._get_frame()
        self.current_standard = 1

        first = self.standards[1]
        first.acquired = True
        first.t0 = time()
        first.fibre = first_position

        self.iterate_task = asyncio.create_task(self._iterate(exposure_time))

    async def reacquire_first(self):
        """Re-acquires the first standard.

        This method should only be called when observing second and
        subsequent dithers in a tile.

        Raises
        ------
        GortObserverError
            If the first standard cannot be acquired.

        """

        if self.iterate_task and not self.iterate_task.done():
            await self.cancel()

        self.current_standard = 1

        # Home the fibsel just to be sure.
        await self.gort.telescopes.spec.fibsel.home()

        if not (await self.acquire_standard(0)):
            raise GortObserverError("Failed to re-acquire first standard.")

    async def acquire_standard(self, standard_idx: int):
        """Acquires a standard star and starts guiding on it.

        Parameters
        ----------
        standard_idx
            The 0-based index of the standard in ``tile.spec_coords`` (and
            of its mask position in ``mask_positions``).

        Returns
        -------
        acquired
            `True` if the guider reported that it is guiding within the
            timeout, `False` otherwise.

        """

        guider_pixels: dict[str, tuple[float, float]]
        guider_pixels = self.gort.config["guiders"]["devices"]["spec"]["named_pixels"]

        # Tolerance to start guiding
        guide_tolerance_spec = self.gort.config["observer"]["guide_tolerance"]["spec"]

        # Moving the mask to an intermediate position while we move around.
        spec_tel = self.gort.telescopes.spec
        await spec_tel.fibsel.move_relative(500)

        overhead_root = f"standards:standard-{self.current_standard}"

        # New coordinates to observe.
        new_coords = self.tile.spec_coords[standard_idx]
        new_mask_position = self.mask_positions[standard_idx]

        event_payload = {
            "observer": True,
            "tile_id": self.tile.tile_id,
            "telescope": "spec",
            "n_standard": self.current_standard,
            "fibsel_position": new_mask_position,
            "coordinates": [new_coords.ra, new_coords.dec],
        }

        # Pixel on the MF corresponding to the new fibre/mask hole on
        # which to guide. We use this tabulated list instead of
        # offset_to_master_frame_pixel() because the latter coordinates
        # are less precise as they do not include IFU rotation and more
        # precise metrology.
        new_guider_pixel = guider_pixels[new_mask_position]

        self.observer.write_to_log(
            f"Moving to standard #{self.current_standard} ({new_coords}) "
            f"on fibre {new_mask_position}.",
            "info",
        )

        await self.gort.notify_event(
            Event.OBSERVER_ACQUISITION_START,
            payload=event_payload,
        )

        # Slew to new coordinates. We actually slew to the coordinates
        # that make the new star close to the fibre that will observe it.
        slew_ra, slew_dec = fibre_slew_coordinates(
            new_coords.ra,
            new_coords.dec,
            new_mask_position,
            derotated=False,
        )

        # Finish guiding on spec telescope.
        self.observer.write_to_log("Re-slewing 'spec' telescope.")

        with self.observer.register_overhead(f"{overhead_root}-slew"):
            cotasks = [
                self.gort.guiders["spec"].stop(),
                spec_tel.goto_coordinates(ra=slew_ra, dec=slew_dec, force=True),
            ]
            await asyncio.gather(*cotasks)

        # Start to guide. Note that here we use the original coordinates
        # of the star along with the pixel on the master frame on which to
        # guide. See the note in fibre_slew_coordinates().
        self.observer.write_to_log("Starting to guide on spec telescope.")

        # Keep a reference to the task. The event loop only holds weak
        # references, so an unreferenced task may be garbage collected
        # before it finishes.
        self._guide_task = asyncio.create_task(
            self.gort.guiders.spec.guide(
                ra=new_coords.ra,
                dec=new_coords.dec,
                guide_tolerance=guide_tolerance_spec,
                pixel=new_guider_pixel,
            )
        )

        with self.observer.register_overhead(f"{overhead_root}-acquire"):
            result = await self.gort.guiders.spec.wait_until_guiding(timeout=60)

        if result[0] is False:
            self.observer.write_to_log(
                f"Timed out acquiring standard {self.current_standard}.",
                "warning",
                event=Event.OBSERVER_STANDARD_ACQUISITION_FAILED,
                extra_payload={**event_payload, "reason": "timeout"},
            )
            return False

        self.observer.write_to_log(
            f"Standard #{self.current_standard} on "
            f"{new_mask_position!r} has been acquired.",
            "info",
        )

        # Move mask to uncover fibre.
        await spec_tel.fibsel.move_to_position(new_mask_position)

        # Do not guide. This means RA/Dec drifting will happen
        # but not rotation drifting since we are guiding on a point
        # source.
        await self.gort.guiders.spec.apply_corrections(False)

        return True

    async def cancel(self):
        """Cancels iteration.

        If the current standard had been acquired but not yet registered
        as observed, it is marked as observed and its end time is set.

        """

        self.observer.write_to_log("Cancelling standards iteration.", "debug")

        await cancel_task(self.iterate_task)

        if not self.standards:
            return

        current = self.standards[self.current_standard]
        if current.acquired and not current.observed:
            current.observed = True
            current.t1 = time()

    async def _iterate(self, exposure_time: float):
        """Iterate task.

        Splits ``exposure_time`` evenly among the standards and moves to the
        next one when the time allotted to the current one has elapsed. The
        loop exits when there is not enough exposure time left to acquire
        another standard.

        """

        # Time to acquire a standard.
        ACQ_PER_STD = 30

        spec_coords = self.tile.spec_coords

        # If we have zero or one standards, do nothing. The spec telescope
        # is already pointing to the first mask position.
        if len(spec_coords) <= 1:
            return

        # Time at which the exposure began.
        t0 = time()

        # Time to spend on each mask position.
        n_stds = len(spec_coords)

        # Calculate the time to actually be on target, taking into account
        # how long we expect to take acquiring.
        time_per_position = exposure_time / n_stds - ACQ_PER_STD

        if time_per_position < 0:
            self.observer.write_to_log(
                "Exposure time is too short to observe this "
                "many standards. I will do what I can.",
                "warning",
            )
            time_per_position = exposure_time / n_stds

        # Time at which we started observing the last standard.
        t0_last_std = t0

        # Number of standards observed.
        n_observed = 1

        # Index of current standard being observed.
        current_std_idx = 0

        while True:
            await asyncio.sleep(1)

            # We consider than if there is less 2 * ACQ_PER_STD left in the
            # exposure there is no point in going to the next standard.
            t_now = time()
            if t_now - t0 > exposure_time - 2 * ACQ_PER_STD:
                self.observer.write_to_log("Exiting standard loop.")
                self.observer.write_to_log(
                    f"Standards observed: {n_observed}/{n_stds}.",
                    "info",
                )
                return

            # Not yet time to move to another standard.
            if t_now - t0_last_std <= time_per_position:
                continue

            # Check that we haven't run out standards. If so,
            # keep observing this one.
            if n_stds == current_std_idx + 1:
                continue

            # Register the previous standard.
            previous = self.standards[self.current_standard]
            if previous.acquired:
                previous.t1 = time()
                previous.observed = True

            # Increase current index and acquire the next standard. The
            # counters are advanced even if the acquisition fails so that
            # the next iteration moves on to the following standard.
            current_std_idx += 1
            self.current_standard += 1

            if not (await self.acquire_standard(current_std_idx)):
                continue

            n_observed += 1
            t0_last_std = time()

            new_standard = self.standards[self.current_standard]
            new_standard.acquired = True
            new_standard.t0 = time()
            new_standard.fibre = self.mask_positions[current_std_idx]

    def to_header(self):
        """Returns observed standards as a header-ready dictionary.

        For each standard the ``STD{n}ID``, ``STD{n}RA``, ``STD{n}DE`` and
        ``STD{n}ACQ`` keys are always included. The ``T0``, ``T1``, ``EXP``
        and ``FIB`` keys are only added for standards that were observed.

        """

        header_data = {}

        for nstd, data in self.standards.items():
            prefix = f"STD{nstd}"

            header_data[f"{prefix}ID"] = data.source_id if data.source_id > 0 else None
            header_data[f"{prefix}RA"] = data.ra
            header_data[f"{prefix}DE"] = data.dec
            # NOTE(review): the ACQ keyword reports ``observed``, not
            # ``acquired``. Kept as is since consumers may rely on it.
            header_data[f"{prefix}ACQ"] = data.observed

            if data.observed:
                header_data[f"{prefix}T0"] = Time(data.t0, format="unix").isot
                header_data[f"{prefix}T1"] = Time(data.t1, format="unix").isot
                header_data[f"{prefix}EXP"] = round(data.t1 - data.t0, 1)
                header_data[f"{prefix}FIB"] = data.fibre

        return header_data