Source code for gort.devices.guider

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2023-06-18
# @Filename: nps.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
from datetime import datetime, timedelta, timezone
from functools import partial

from typing import TYPE_CHECKING

import polars
from packaging.version import Version

from gort import config
from gort.devices.core import GortDevice, GortDeviceSet
from gort.enums import GuiderStatus
from gort.exceptions import ErrorCode, GortError, GortGuiderError
from gort.tools import GuiderMonitor, cancel_task


if TYPE_CHECKING:
    from clu import AMQPReply

    from gort.gort import Gort
    from gort.remote import ActorReply


__all__ = ["Guider", "GuiderSet"]


[docs] class Guider(GortDevice): """Class representing a guider.""" def __init__(self, gort: Gort, name: str, actor: str, **kwargs): super().__init__(gort, name, actor) self.separation: float | None = None self.status: GuiderStatus = GuiderStatus.IDLE self.guider_monitor = GuiderMonitor(self.gort, self.name) self.gort.add_reply_callback(self._status_cb) @property def ag(self): """Gets the :obj:`.AG` device associated with this guider.""" return self.gort.ags[self.name] @property def telescope(self): """Gets the :obj:`.Telescope` device associated with this guider.""" return self.gort.telescopes[self.name]
[docs] async def update_status(self): """Returns the guider status.""" status_reply = await self.actor.commands.status() return status_reply.flatten()
async def _status_cb(self, reply: AMQPReply): """Listens to guider keywords and updates the internal state.""" if reply.sender == self.actor.name: if "status" in reply.body: self.status = GuiderStatus(int(reply.body["status"], 16)) if "measured_pointing" in reply.body: self.separation = reply.body["measured_pointing"]["separation"]
[docs] async def wait_until_guiding( self, guide_tolerance: float | None = None, timeout: float | None = None, ) -> tuple[bool, GuiderStatus, float | None, bool]: """Waits until the guider has converged. Parameters ---------- guide_tolerance The minimum separation, in arcsec, between the measured and desired positions that needs to be reached before returning. If :obj:`None`, waits until guiding (as opposed to acquisition) begins. timeout Maximum time, in seconds, to wait before returning. If :obj:`None`, waits indefinitely. If the timeout is reached it does not raise an exception. Returns ------- reached Whether the desired minimum separation was reached. status The current :obj:`.GS`. separation The current separation. timedout :obj:`True` if the acquisition timed out. """ # Initial delay to allow time for the guider to switch to DRIFTING status. await asyncio.sleep(1) elapsed = 1 while True: has_acquired = ( self.status is not None and self.separation is not None and self.status & GuiderStatus.GUIDING and not self.status & GuiderStatus.DRIFTING and (guide_tolerance is None or self.separation < guide_tolerance) ) if has_acquired: return (True, self.status, self.separation, False) elapsed += 1 if timeout is not None and elapsed > timeout: return (False, self.status, self.separation, True) await asyncio.sleep(1)
[docs] async def expose(self, *args, continuous: bool = False, **kwargs): """Exposes this telescope cameras. Parameters ---------- args,kwargs Arguments to be passed to the guider expose command. continuous Whether to expose the camera continuously. If :obj:`False` it takes a single exposure. """ while True: await self.actor.commands.expose(*args, **kwargs) if not continuous: return
[docs] async def focus( self, inplace=False, sweep: bool = True, guess: float | None = None, step_size: float = config["guiders.focus.step_size"], steps: int = config["guiders.focus.steps"], exposure_time: float = config["guiders.focus.exposure_time"], ): """Focus the telescope. Parameters ---------- inplace If :obj:`True`, focuses the telescope where it is pointing at. Otherwise points to zenith. sweep Performs a focus sweep around the initial guess position to find the best focus. If :obj:`False`, the focus position is determined based on the current bench temperature. guess The initial guess for the focuser position. If :obj:`None`, the initial guess is determined based on the current bench temperature. step_size The size, in focuser units, of each step. steps The total number of step points. Must be an odd number. exposure_time The exposure time for each step. """ reply_callback = partial(self.log_replies, skip_debug=False) if sweep is False: self.write_to_log("Adjusting focus position.", "info") await self.actor.commands.adjust_focus(reply_callback=reply_callback) return await self.update_status() if self.status & GuiderStatus.NON_IDLE: self.write_to_log( "Guider is not idle. Stopping it before focusing.", level="warning", ) await self.stop() # Send telescopes to zenith. if not inplace: self.write_to_log("Moving telescope to zenith.") await self.gort.telescopes[self.name].goto_named_position( "zenith", altaz_tracking=True, ) try: self.write_to_log(f"Focusing telescope {self.name}.", "info") replies = await self.actor.commands.focus( reply_callback=reply_callback, guess=guess, step_size=step_size, steps=steps, exposure_time=exposure_time, ) best_focus = replies.get("best_focus") if best_focus is None: raise GortError("best_focus keyword was not emitted.") elif best_focus["focus"] < 0.5 or best_focus["r2"] < 0.5: raise GortError( "Estimated focus does not seem to be correct. " "Please repeat the focus sweep." ) focus = best_focus["focus"] fwhm = best_focus["fwhm"] self.write_to_log( f"Best focus: {fwhm} arcsec at {focus} DT", "info", ) return focus, fwhm except GortError as err: self.write_to_log(f"Failed focusing with error: {err}", level="error") finally: self.separation = None return -999, -999
[docs] async def adjust_focus(self): """Adjusts the focus position based on the current bench temperature.""" await self.focus(sweep=False)
[docs] async def guide( self, ra: float | None = None, dec: float | None = None, exposure_time: float = 5.0, pixel: tuple[float, float] | str | None = None, monitor: bool = True, output_monitor_data: bool = True, **guide_kwargs, ): """Starts the guide loop. This command blocks until `.stop` is called. Parameters ---------- ra,dec The coordinates to acquire. If :obj:`None`, the current telescope coordinates are used. exposure_time The exposure time of the AG integrations. pixel The pixel on the master frame on which to guide. Defaults to the central pixel. This can also be the name of a known pixel position for this telescope, e.g., ``'P1-1'`` for ``spec``. monitor Whether to monitor the guide loop and output the average and last guide metrics every 30 seconds. output_monitor_data Whether to output the monitor data to the log. guide_kwargs Other keyword arguments to pass to ``lvmguider guide``. The includes the ``pa`` argument that if not provided is assumed to be zero. """ monitor_task: asyncio.Task | None = None # The PA argument in lvmguider was added in 0.4.0a0. if self.version == Version("0.99.0") or self.version < Version("0.4.0a0"): if "pa" in guide_kwargs: guide_kwargs.pop("pa") self.separation = None await self.update_status() if self.status & GuiderStatus.NON_IDLE: raise GortGuiderError( "Guider is not IDLE", error_code=ErrorCode.COMMAND_FAILED, ) if ra is None or dec is None: status = await self.telescope.status() ra_status = status["ra_j2000_hours"] * 15 dec_status = status["dec_j2000_degs"] ra = ra if ra is not None else ra_status dec = dec if dec is not None else dec_status config = self.gort.config if isinstance(pixel, str): if pixel not in config["guiders"]["devices"][self.name]["named_pixels"]: raise GortGuiderError( f"Invalid pixel name {pixel!r}.", error_code=ErrorCode.INVALID_PIXEL_NAME, ) pixel = config["guiders"]["devices"][self.name]["named_pixels"][pixel] log_msg = f"Guiding at RA={ra:.6f}, Dec={dec:.6f}" if pixel is not None: log_msg += f", pixel=({pixel[0]:.1f}, {pixel[1]:.1f})." self.write_to_log(log_msg, level="info") try: if monitor: self.guider_monitor.start_monitoring() if output_monitor_data: monitor_task = asyncio.create_task(self._monitor_task()) await self.actor.commands.guide( reply_callback=partial(self.log_replies, skip_debug=False), ra=ra, dec=dec, exposure_time=exposure_time, reference_pixel=pixel, **guide_kwargs, ) except Exception as err: # Deal with the guide command being cancelled when we stop it. if "This command has been cancelled" not in str(err): raise finally: await cancel_task(monitor_task)
async def _monitor_task(self, timeout: float = 30): """Monitors guiding and reports average and last guide metrics.""" while True: try: await asyncio.sleep(timeout) # Get updated data df = self.guider_monitor.get_dataframe() if df is None: continue df = df.filter(polars.col.telescope == self.name) # Select columns. df = df.select( [ "frameno", "time", "n_sources", "focus_position", "fwhm", "ra", "dec", "ra_offset", "dec_offset", "separation", "mode", ] ) # Remove NaN rows. df = df.drop_nulls() now = datetime.now(timezone.utc) time_range = now - timedelta(seconds=timeout) time_data = df.filter(polars.col.time > time_range).sort("time") if ( len(time_data) == 0 or "fwhm" not in time_data or "separation" not in time_data ): continue # Calculate and report last. last = time_data.tail(1) sep_last = round(last["separation"][0], 3) fwhm_last = round(last["fwhm"][0], 2) mode_last = last["mode"][0] self.write_to_log( f"Last: sep={sep_last} arcsec; fwhm={fwhm_last} arcsec; " f"mode={mode_last!r}", "info", ) # Calculate and report averages. sep_avg = round(time_data["separation"].mean(), 3) # type: ignore fwhm_avg = round(time_data["fwhm"].mean(), 2) # type: ignore self.write_to_log( f"Average ({timeout} s): sep={sep_avg} arcsec; " f"fwhm={fwhm_avg} arcsec", "info", ) except asyncio.CancelledError: return except Exception as err: self.write_to_log(f"Error in guider monitor: {err}", "warning")
[docs] async def stop(self) -> None: """Stops the guide loop. Parameters ---------- wait_until_stopped Blocks until the guider is idle. """ self.write_to_log("Stopping guider.", "info") await self.actor.commands.stop() self.status = GuiderStatus.IDLE self.guider_monitor.stop_monitoring()
[docs] async def set_pixel(self, pixel: tuple[float, float] | str | None = None): """Sets the master frame pixel on which to guide. Parameters ---------- pixel The pixel on the master frame on which to guide. Defaults to the central pixel. This can also be the name of a known pixel position for this telescope, e.g., ``'P1-1'`` for ``spec``. """ config = self.gort.config if isinstance(pixel, str): if pixel not in config["guiders"]["devices"][self.name]["named_pixels"]: raise GortGuiderError( f"Invalid pixel name {pixel!r}.", error_code=ErrorCode.INVALID_PIXEL_NAME, ) pixel = config["guiders"]["devices"][self.name]["named_pixels"][pixel] if pixel is None: await self.actor.commands.reset_pixel() else: await self.actor.commands.set_pixel(*pixel)
[docs] async def apply_corrections(self, enable: bool = True): """Enable/disable corrections being applied to the axes.""" await self.actor.commands.corrections(mode="enable" if enable else "disable")
[docs] async def monitor( self, ra: float | None = None, dec: float | None = None, exposure_time: float = 5.0, sleep: float = 60, monitor: bool = True, ): """Guides at a given position, sleeping between exposures. This is a convenience function mainly to monitor transparency during bad weather conditions. The telescope will be slewed to a given position (default to zenith) and guide with a low cadence. This results in the guider keywords, including transparency and FWHM, being output and the plots in Grafana being updated. After cancelling the monitoring make sure to stop the guiders with the :obj:`.Guider.stop` method. Parameter --------- ra,dec The coordinates to acquire. If :obj:`None`, the current zenith coordinates are used. exposure_time The exposure time of the AG integrations. sleep The time to sleep between exposures (seconds). monitor Start the guider monitor task. Data collected during the monitoring can be access as a :obj:`polars.DataFrame` from :obj:`Guider.guider_monitor.get_dataframe() <.GuiderMonitor.get_dataframe>`. """ if ra is None and dec is None: await self.telescope.goto_named_position("zenith", altaz_tracking=True) # Get approximate RA/Dec. It doesn't really matter, we just want to guide # on a field that's close to zenith. tel_status = await self.telescope.status() ra = tel_status.get("ra_j2000_hours") dec = tel_status.get("dec_j2000_degs") if ra is None or dec is None: raise GortGuiderError("Cannot determine telescope RA/Dec.") ra *= 15.0 elif (ra is None and dec is not None) or (ra is not None and dec is None): raise ValueError("Both RA and Dec need to be provided.") # Even if we already went to zenith in alt/az we need to go to these # coordinates again to make sure the kmirror is set. await self.telescope.goto_coordinates(ra, dec) await self.guide( ra=ra, dec=dec, exposure_time=exposure_time, sleep=sleep, monitor=monitor, output_monitor_data=False, )
[docs] async def get_focus_info(self): """Returns the guider focus information.""" info_reply: ActorReply = await self.actor.commands.focus_info() info = info_reply.flatten() for key in ["reference_focus", "current_focus"]: focus_ts = info[key]["timestamp"] if focus_ts is None: continue time = focus_ts.split("T")[1] if not focus_ts.endswith("Z") and "+" not in time and "-" not in time: focus_ts += "Z" info[key]["timestamp"] = datetime.fromisoformat(focus_ts) now = info["current_focus"]["timestamp"] ref_ts = info["reference_focus"]["timestamp"] if now is not None and ref_ts is not None: age = (now - ref_ts).total_seconds() else: age = None info["reference_focus"]["age"] = age return info
[docs] class GuiderSet(GortDeviceSet[Guider]): """A set of telescope guiders.""" __DEVICE_CLASS__ = Guider __DEPLOYMENTS__ = ["lvmguider"]
[docs] async def expose(self, *args, continuous: bool = False, **kwargs): """Exposes all the cameras using the guider. Parameters ---------- args,kwargs Arguments to be passed to :obj:`.Guider.expose`. continuous Whether to expose the camera continuously. If :obj:`False` it takes a single exposure. """ await self.call_device_method( Guider.expose, *args, continuous=continuous, **kwargs, )
[docs] async def take_darks(self): """Takes AG darks.""" # Move telescopes to park to prevent light, since we don't have shutters. # We use goto_named_position to prevent disabling the telescope and having # to rehome. self.write_to_log("Moving telescopes to park position.", level="info") await self.gort.telescopes.goto_named_position("park") # Take darks. self.write_to_log("Taking darks.", level="info") cmds = [] for guider in self.values(): cmds.append( guider.actor.commands.expose( flavour="dark", reply_callback=partial(guider.log_replies, skip_debug=False), ) ) if len(cmds) > 0: await asyncio.gather(*cmds)
[docs] async def focus( self, inplace=False, guess: float | dict[str, float] | None = None, step_size: float = 0.5, steps: int = 7, exposure_time: float = 5.0, ): """Focus all the telescopes. Parameters ---------- inplace If :obj:`True`, focuses the telescopes where they are pointing at. Otherwise points to zenith. guess The initial guesses for focuser position. If :obj:`None`, an estimate based on the current bench temperatures is used. It can also be a float value, which will be used for all telescopes, or a mapping of telescope name to guess value. Missing values will default to the configuration value. step_size The size, in focuser units, of each step. steps The total number of step points. Must be an odd number. exposure_time The exposure time for each step. """ self.write_to_log("Running focus sequence.", "info") if guess is None: guess_dict = {} elif isinstance(guess, dict): guess_dict = guess else: guess_dict = {guider_name: guess for guider_name in self} jobs = [ self[guider_name].focus( inplace=inplace, sweep=True, guess=guess_dict.get(guider_name, None), step_size=step_size, steps=steps, exposure_time=exposure_time, ) for guider_name in self ] results = await asyncio.gather(*jobs) best_focus: list[str] = [] error: bool = False for itel, name in enumerate(self): result = results[itel] if result is None: continue best_focus.append(f"{name}: {result[1]}") if any(result) < 0: error = True self.write_to_log("Best focus: " + ", ".join(best_focus), "info") if error: self.write_to_log("One or more focus values are invalid.", "error") return False return True
[docs] async def adjust_focus(self): """Adjusts the focus position based on the current bench temperature.""" await asyncio.gather(*[self[gname].adjust_focus() for gname in self])
[docs] async def guide(self, *args, **kwargs): """Guide on all telescopes. Parameters ---------- args,kwargs Arguments to be passed to :obj:`.Guider.guide`. """ await self.call_device_method(Guider.guide, *args, **kwargs)
[docs] async def stop(self): """Stops the guide loop on all telescopes.""" await self.call_device_method(Guider.stop)
[docs] async def apply_corrections(self, enable: bool = True): """Enable/disable corrections being applied to the axes.""" await self.call_device_method(Guider.apply_corrections, enable=enable)
[docs] async def monitor(self, *args, **kwargs): """Guides at a given position, sleeping between exposures. See :obj:`.Guider.monitor` for details. """ await self.call_device_method(Guider.monitor, *args, **kwargs)
[docs] async def wait_until_guiding( self, names: list[str] | None = None, guide_tolerance: float | None = None, timeout: float | None = None, ): """Waits until the guiders have converged. Parameters ---------- names List of telescopes to wait for convergence. guide_tolerance The minimum separation, in arcsec, between the measured and desired positions that needs to be reached before returning. If :obj:`None`, waits until guiding (as opposed to acquisition) begins. timeout Maximum time, in seconds, to wait before returning. If :obj:`None`, waits indefinitely. If the timeout is reached it does not raise an exception. Returns ------- status A dictionary with the telescope names and a tuple indicating whether the desired minimum separation was reached. The current ``GuiderStatus``, and the current separation for that telescope. """ names = names or list(self) results = await asyncio.gather( *[ self[name].wait_until_guiding( guide_tolerance=guide_tolerance, timeout=timeout, ) for name in names ] ) return dict(zip(names, results))