Source code for gort.devices.telescope

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2023-03-10
# @Filename: telescope.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
from collections import defaultdict
from time import time

from typing import TYPE_CHECKING, ClassVar

import numpy

from lvmopstools.retrier import Retrier

from gort.devices.core import GortDevice, GortDeviceSet
from gort.enums import Event
from gort.exceptions import ErrorCode, GortTelescopeError
from gort.tools import angular_separation, kubernetes_restart_deployment


if TYPE_CHECKING:
    from gort.gort import Gort
    from gort.remote import ActorReply


__all__ = ["Telescope", "TelescopeSet", "KMirror", "FibSel", "Focuser", "MoTanDevice"]


class MoTanDevice(GortDevice):
    """A TwiceAsNice device.

    Common base class for the motor devices defined in this module
    (:obj:`.KMirror`, :obj:`.Focuser` and :obj:`.FibSel`).

    Parameters
    ----------
    gort
        The :obj:`.Gort` instance, forwarded to :obj:`.GortDevice`.
    name
        The name of the device. The text before the first dot is stored as
        the telescope name, and the name must contain ``fibsel``, ``foc`` or
        ``km`` so that the kind of device can be determined.
    actor
        The actor associated with the device, forwarded to :obj:`.GortDevice`.

    Raises
    ------
    GortTelescopeError
        If the kind of device cannot be determined from ``name``.

    """

    #: Artificial delay introduced to prevent all motors to slew at the same time.
    SLEW_DELAY: ClassVar[float | dict[str, float]] = 0

    #: Kind of device. Set by each subclass.
    device_type: ClassVar[str]

    def __init__(self, gort: Gort, name: str, actor: str):
        super().__init__(gort, name, actor)

        # Timeouts are configured per device class, keyed by the lower-case
        # class name. The entry may be missing, in which case ``.get()``
        # returns None; fall back to an empty mapping so that every command
        # simply gets ``timeout=None`` instead of failing at construction.
        class_name = self.__class__.__name__
        timeouts = self.gort.config["telescopes"]["timeouts"].get(class_name.lower())

        self.timeouts: defaultdict[str, float | None]
        self.timeouts = defaultdict(lambda: None, timeouts or {})

        self.telescope = self.name.split(".")[0]

        self.device: str
        if "fibsel" in self.name:
            self.device = "fibsel"
        elif "foc" in self.name:
            self.device = "focuser"
        elif "km" in self.name:
            self.device = "kmirror"
        else:
            raise GortTelescopeError(f"Invalid device type {self.name!r}.")

    async def status(self):
        """Returns the status of the device."""

        return await self.run_command("status")

    async def is_reachable(self):
        """Is the device reachable?"""

        is_reachable = await self.run_command("isReachable")

        return bool(is_reachable.get("Reachable"))

    async def check_reachable(self):
        """Checks if the device is reachable or issues an error.

        Raises
        ------
        GortTelescopeError
            With code ``MOTAN_DEVICE_NOT_REACHABLE`` if the device does not
            report itself as reachable.

        """

        if not (await self.is_reachable()):
            raise GortTelescopeError(
                f"MoTAN device {self.name!r} is not reachable.",
                error_code=ErrorCode.MOTAN_DEVICE_NOT_REACHABLE,
                payload={"telescope": self.telescope, "device": self.device},
            )

    async def is_moving(self):
        """Is the device moving."""

        is_moving = await self.run_command("isMoving")

        return bool(is_moving.get("Moving"))

    async def slew_delay(self):
        """Sleeps the :obj:`.SLEW_DELAY` amount.

        If :obj:`.SLEW_DELAY` is a number it is used directly. Otherwise it
        is treated as a mapping and indexed with the telescope name.

        """

        if isinstance(self.SLEW_DELAY, (float, int)):
            await asyncio.sleep(self.SLEW_DELAY)
        else:
            await asyncio.sleep(self.SLEW_DELAY[self.telescope])

    @Retrier(max_attempts=3, delay=2)
    async def stop(self, force: bool = False):
        """Stops the device movement.

        Parameters
        ----------
        force
            Send the stop commands even if the device does not report that
            it is moving.

        Raises
        ------
        GortTelescopeError
            With code ``MOTAN_DEVICE_CANNOT_STOP`` if the device still
            reports that it is moving after the stop commands.

        """

        if await self.is_moving() or force:
            self.write_to_log("Stopping slew.")

            # K-mirrors additionally get ``slewStop`` before the generic stop.
            if self.device_type == "kmirror":
                await self.run_command("slewStop", timeout=self.timeouts["slewStop"])

            # The ``slewStop`` timeout is reused for the plain ``stop`` command.
            await self.run_command("stop", timeout=self.timeouts["slewStop"])

            # Give the motor a moment before checking that it really stopped.
            await asyncio.sleep(1)

            if await self.is_moving():
                raise GortTelescopeError(
                    f"Failed to stop the {self.name}.",
                    error_code=ErrorCode.MOTAN_DEVICE_CANNOT_STOP,
                    payload={"telescope": self.telescope, "device": self.device},
                )

    async def run_command(
        self,
        command: str,
        *args,
        n_retries: int = 3,
        delay: float = 1,
        **kwargs,
    ) -> ActorReply:
        """Runs a MoTan command with retries.

        Parameters
        ----------
        command
            The name of the actor command to run.
        args
            Positional arguments forwarded to the actor command.
        n_retries
            Forwarded to the actor command as ``n_retries``.
        delay
            Forwarded to the actor command as ``delay``.
        kwargs
            Other keyword arguments forwarded to the actor command.

        Returns
        -------
        reply
            The reply from the actor command.

        """

        return await self.actor.commands[command](
            *args,
            n_retries=n_retries,
            delay=delay,
            **kwargs,
        )
class KMirror(MoTanDevice):
    """A device representing a K-mirror."""

    SLEW_DELAY = 0
    device_type = "kmirror"

    async def home(self):
        """Homes the k-mirror.

        The device is checked for reachability and forcibly stopped before
        the ``moveToHome`` command is sent.

        """

        await self.check_reachable()
        await self.slew_delay()
        await self.stop(force=True)

        self.write_to_log("Homing k-mirror.", level="info")
        home_timeout = self.timeouts["moveToHome"]
        await self.run_command("moveToHome", timeout=home_timeout)
        self.write_to_log("k-mirror homing complete.")

    async def park(self):
        """Park the k-mirror at 90 degrees."""

        park_angle = 90
        await self.move(park_angle)

    @Retrier(max_attempts=3, delay=2)
    async def move(self, degs: float):
        """Move the k-mirror to a position in degrees.

        Does NOT track after the move.

        Parameters
        ----------
        degs
            The position to which to move the k-mirror, in degrees.

        """

        # Same preparation sequence as the other motions: reachable check,
        # artificial delay, and stop if currently moving.
        for prepare in (self.check_reachable, self.slew_delay, self.stop):
            await prepare()

        self.write_to_log(f"Moving k-mirror to {degs:.3f} degrees.", level="info")

        move_timeout = self.timeouts["moveAbsolute"]
        await self.run_command("moveAbsolute", degs, "deg", timeout=move_timeout)

    async def slew(
        self,
        ra: float,
        dec: float,
        offset_angle: float = 0.0,
        stop_degs_before: float = 0.0,
    ):
        """Moves the mirror to the position for ``ra, dec`` and starts slewing.

        Parameters
        ----------
        ra
            Right ascension of the field to track, in degrees.
        dec
            Declination of the field to track, in degrees.
        offset_angle
            Derotation offset in degrees. This value is converted to the
            -180 to 180 deg range before sending it to the k-mirror.
        stop_degs_before
            Number of degrees to stop before reaching the desired position
            angle. This has the effect of actually slewing to
            ``offset_angle-stop_degs_before``. Useful if we want to be sure
            that positive offsets will be applied without backlash.

        """

        for prepare in (self.check_reachable, self.slew_delay, self.stop):
            await prepare()

        # Wrap the derotation offset: first into [0, 360), then shift the
        # upper half down so that the result is never above 180.
        wrapped_angle = offset_angle % 360
        if wrapped_angle > 180:
            wrapped_angle -= 360

        # The position angle is only mentioned in the log when it is not zero.
        target = f"ra={ra:.6f} dec={dec:.6f}"
        if wrapped_angle != 0:
            target = f"{target} pa={wrapped_angle:.3f}"
        self.write_to_log(f"Slewing k-mirror to {target} and tracking.", level="info")

        # Only the magnitude matters; it is always subtracted from the angle.
        backlash_margin = abs(stop_degs_before)
        if backlash_margin > 0:
            self.write_to_log(f"Using stop_degs_before={backlash_margin}.")

        km_config = self.gort.config["telescopes"]["kmirror"]

        # The right ascension is sent divided by 15 (degrees to hours).
        await self.run_command(
            "slewStart",
            ra / 15.0,
            dec,
            seg_time=km_config["seg_time"],
            seg_min_num=km_config["seg_min_num"],
            offset_angle=wrapped_angle - backlash_margin,
            timeout=self.timeouts["slewStart"],
        )
class Focuser(MoTanDevice):
    """A device representing a focuser."""

    SLEW_DELAY = 0
    device_type = "focuser"

    async def home(self, restore_position: bool = True):
        """Homes the focuser.

        Parameters
        ----------
        restore_position
            Whether to restore the previous focuser position after homing.
            Defaults to :obj:`True`. The position is only restored when the
            device status reported a ``Position`` that is not NaN.

        """

        await self.check_reachable()
        await self.slew_delay()
        await self.stop(force=True)

        # Store current position to restore it later. The status is only
        # queried when the position is actually going to be restored.
        current_position = None
        if restore_position:
            status = await self.status()
            current_position = status.get("Position")

        self.write_to_log("Homing focuser.", level="info")
        await self.run_command("moveToHome", timeout=self.timeouts["moveToHome"])
        self.write_to_log("Focuser homing complete.")

        if current_position is not None and not numpy.isnan(current_position):
            self.write_to_log(f"Restoring position {current_position} DT.")
            await self.move(current_position)

    async def move(self, dts: float):
        """Move the focuser to a position in DT.

        Parameters
        ----------
        dts
            The absolute position to which to move the focuser, in DT.

        """

        await self.check_reachable()
        await self.slew_delay()
        await self.stop()

        self.write_to_log(f"Moving focuser to {dts:.3f} DT.", level="info")
        await self.run_command(
            "moveAbsolute",
            dts,
            "DT",
            timeout=self.timeouts["moveAbsolute"],
        )
class FibSel(MoTanDevice):
    """A device representing the fibre mask in the spectrophotometric telescope."""

    # We really don't want a delay here because it would slow down the acquisition
    # of new standards, and anyway the fibre selector usually moves by itself.
    SLEW_DELAY: float = 0

    # Rehome after this many seconds.
    HOME_AFTER: float | None = None

    device_type = "fibsel"

    def __init__(self, gort: Gort, name: str, actor: str):
        super().__init__(gort, name, actor)

        # Timestamp (``time.time()``) of the last homing done by this object.
        self.__last_homing: float = 0

    async def home(self):
        """Homes the fibre selector and records when the homing finished."""

        await self.check_reachable()
        await self.slew_delay()
        await self.stop(force=True)

        self.write_to_log("Homing fibsel.", level="info")
        home_timeout = self.timeouts["moveToHome"]
        await self.run_command("moveToHome", timeout=home_timeout)
        self.write_to_log("Fibsel homing complete.")

        self.__last_homing = time()

    def list_positions(self) -> list[str]:
        """Returns a list of valid positions."""

        mask_positions = self.gort.config["telescopes"]["mask_positions"]

        return list(mask_positions)

    async def _check_home(self):
        """Checks if a homing is required before moving the mask.

        Does nothing unless :obj:`.HOME_AFTER` is set. Otherwise the mask is
        rehomed when more than that many seconds have passed since the last
        homing.

        """

        max_age = self.HOME_AFTER
        if max_age is None:
            return

        seconds_since_homing = time() - self.__last_homing
        if seconds_since_homing > max_age:
            self.write_to_log("Rehoming fibsel before moving.", "warning")
            await self.home()

    async def move_to_position(self, position: str | int, rehome: bool = False):
        """Moves the spectrophotometric mask to the desired position.

        Parameters
        ----------
        position
            A position in the form ``PN-M`` where ``N=1,2`` and ``M=1-12``,
            in which case the mask will rotate to expose the fibre with that
            name. If ``position`` is a number, moves the mask to that value.
        rehome
            Home the fibre selector before moving to the new position.

        Raises
        ------
        GortTelescopeError
            With code ``FIBSEL_INVALID_POSITION`` if ``position`` is a string
            that is not one of the configured mask positions.

        """

        await self.check_reachable()

        if rehome:
            await self.home()

        if not isinstance(position, str):
            # Numeric input is used as the motor position directly.
            steps = position
            self.write_to_log(f"Moving mask to {steps} DT.", level="info")
        else:
            known_positions = self.gort.config["telescopes"]["mask_positions"]
            if position not in known_positions:
                raise GortTelescopeError(
                    f"Cannot find position {position!r}.",
                    error_code=ErrorCode.FIBSEL_INVALID_POSITION,
                )
            steps = known_positions[position]
            self.write_to_log(f"Moving mask to {position}: {steps} DT.", level="info")

        await self._move("moveAbsolute", steps)

    async def move_relative(self, steps: float):
        """Move the mask a number of motor steps relative to the current position."""

        await self.check_reachable()

        self.write_to_log(f"Moving fibre mask {steps} steps.")
        await self._move("moveRelative", steps)

    async def _move(self, command: str, steps: float, allow_rehoming: bool = True):
        """Moves the fibre selector. If the move fails, tries rehoming.

        Parameters
        ----------
        command
            The move command to send to the device.
        steps
            The argument for the move command.
        allow_rehoming
            If the move fails, rehome and try once more. When
            :obj:`False` the error is re-raised instead.

        """

        try:
            await self._check_home()
            await self.slew_delay()
            await self.stop()
            await self.run_command(command, steps, timeout=self.timeouts[command])
        except Exception as err:
            # The second attempt is made with rehoming disabled, so a
            # repeated failure ends up here and propagates.
            if not allow_rehoming:
                raise

            self.write_to_log(
                f"Failed to move fibsel with error: {err} - Rehoming and retrying.",
                "warning",
            )

            # Notify this event so that it can be taken into account, for
            # example to add a flag if this happens during the standards
            # iteration loop.
            await self.gort.notify_event(
                Event.UNEXPECTED_FIBSEL_REHOME,
                payload={"time": time()},
            )

            await self.home()
            await self._move(command, steps, allow_rehoming=False)
class Telescope(GortDevice):
    """Class representing an LVM telescope functionality.

    Parameters
    ----------
    gort
        The :obj:`.Gort` instance, forwarded to :obj:`.GortDevice`.
    name
        The name of the telescope. A fibre selector is only created for the
        telescope named ``spec``.
    actor
        The mount actor, forwarded to :obj:`.GortDevice`.
    kwargs
        Must include ``focuser``, the actor for the focuser. May include
        ``kmirror``, the actor for the K-mirror; if it is missing or empty
        the telescope is created without a K-mirror.

    """

    def __init__(self, gort: Gort, name: str, actor: str, **kwargs):
        super().__init__(gort, name, actor)

        self.is_homed: bool = False

        self.config = self.gort.config["telescopes"]
        self.pwi = self.actor

        kmirror_actor = kwargs.get("kmirror", None)
        if kmirror_actor:
            self.has_kmirror = True
            self.km = KMirror(self.gort, f"{self.name}.km", kmirror_actor)
        else:
            self.has_kmirror = False
            self.km = None

        self.focuser = Focuser(self.gort, f"{self.name}.focuser", kwargs["focuser"])

        self.fibsel = (
            FibSel(self.gort, f"{self.name}.fibsel", "lvm.spec.fibsel")
            if self.name == "spec"
            else None
        )

        if self.name in self.gort.guiders:
            self.guider = self.gort.guiders[name]
        else:
            self.guider = None

        self.timeouts = self.gort.config["telescopes"]["timeouts"]["pwi"]

    async def init(self):
        """Determines the initial state of the telescope."""

        # If the axes are enabled, we assume the telescope is homed.
        if not self.is_homed and (await self.is_ready()):
            self.is_homed = True

    async def status(self):
        """Retrieves the status of the telescope.

        Returns
        -------
        status
            The flattened reply of the mount ``status`` command.

        """

        reply: ActorReply = await self.pwi.commands.status()
        return reply.flatten()

    async def is_parked(self):
        """Is the telescope parked?

        The telescope is considered parked when it is not enabled, tracking
        or slewing, and both its altitude and azimuth are within 5 degrees
        of the ``park`` named position.

        """

        park_position = self.gort.config["telescopes"]["named_positions"]["park"]["all"]

        status = await self.status()

        if status["is_enabled"] or status["is_tracking"] or status["is_slewing"]:
            return False

        alt_diff = numpy.abs(status["altitude_degs"] - park_position["alt"])
        az_diff = numpy.abs(status["azimuth_degs"] - park_position["az"])

        if alt_diff > 5 or az_diff > 5:
            return False

        return True

    async def is_ready(self):
        """Checks if the telescope is ready to be moved.

        The telescope is ready when the status reports it as both connected
        and enabled.

        """

        status = await self.status()

        is_connected = status.get("is_connected", False)
        is_enabled = status.get("is_enabled", False)

        return is_connected and is_enabled

    async def initialise(self, home: bool | None = None):
        """Connects to the telescope and initialises the axes.

        Parameters
        ----------
        home
            If :obj:`True`, runs the homing routine after initialising.

        """

        if not (await self.is_ready()):
            self.write_to_log("Initialising telescope.")
            await self.pwi.commands.setConnected(True)
            await self.pwi.commands.setEnabled(True)

        if home is True:
            await self.home()

    async def home(
        self,
        home_telescope: bool = True,
        home_km: bool = True,
        home_focuser: bool = False,
        home_fibsel: bool = False,
    ):
        """Initialises and homes the telescope.

        Parameters
        ----------
        home_telescope
            Homes the telescope. Defaults to :obj:`True`.
        home_km
            Homes the K-mirror, if present. Defaults to :obj:`True`.
        home_focuser
            Homes the focuser. Defaults to :obj:`False`.
        home_fibsel
            Homes the fibre selector, if present. Defaults to :obj:`False`.

        Raises
        ------
        GortTelescopeError
            With code ``CANNOT_MOVE_LOCAL_MODE`` if ``home_telescope`` is set
            and the enclosure is in local mode.

        """

        # The subdevices are homed concurrently with the mount.
        subdev_tasks = []

        if self.km is not None and home_km:
            subdev_tasks.append(self.km.home())
        if self.fibsel is not None and home_fibsel:
            subdev_tasks.append(self.fibsel.home())
        if self.focuser is not None and home_focuser:
            subdev_tasks.append(self.focuser.home())

        home_subdevices_task: asyncio.Future = asyncio.gather(*subdev_tasks)

        if home_telescope:
            if await self.gort.enclosure.is_local():
                raise GortTelescopeError(
                    "Cannot home in local mode.",
                    error_code=ErrorCode.CANNOT_MOVE_LOCAL_MODE,
                )

            self.write_to_log("Homing telescope.", level="info")

            if not (await self.is_ready()):
                await self.pwi.commands.setConnected(True)
                await self.pwi.commands.setEnabled(True)

            await self.pwi.commands.findHome()

            # findHome does not block, so wait a reasonable amount of time.
            await asyncio.sleep(15)

            self.is_homed = True

        # Always await the subdevices, even if they have already finished.
        # Skipping the await for a completed future would silently discard
        # any error raised while homing a subdevice.
        await home_subdevices_task

    async def park(
        self,
        disable=True,
        use_pw_park=False,
        alt_az: tuple[float, float] | None = None,
        kmirror: bool = True,
        force: bool = False,
    ):
        """Parks the telescope.

        Parameters
        ----------
        disable
            Disables the axes after reaching the park position.
        use_pw_park
            Uses the internal park position stored in the PlaneWave mount.
        alt_az
            A tuple with the alt and az position at which to park the
            telescope. If not provided, defaults to the ``park`` named
            position.
        kmirror
            Whether to park the mirror at 90 degrees.
        force
            Moves the telescope even if the mode is local.

        Raises
        ------
        GortTelescopeError
            With code ``CANNOT_MOVE_LOCAL_MODE`` if the enclosure is in local
            mode and ``force`` is not set.

        """

        # Honour ``force`` here as well; it is also forwarded to
        # goto_coordinates() below.
        if not force and (await self.gort.enclosure.is_local()):
            raise GortTelescopeError(
                "Cannot park in local mode.",
                error_code=ErrorCode.CANNOT_MOVE_LOCAL_MODE,
            )

        await self.initialise()

        if use_pw_park:
            self.write_to_log("Parking telescope to PW default position.", level="info")
            await self.pwi.commands.park()
        elif alt_az is not None:
            self.write_to_log(f"Parking telescope to alt={alt_az[0]}, az={alt_az[1]}.")
            await self.goto_coordinates(
                alt=alt_az[0],
                az=alt_az[1],
                kmirror=False,
                altaz_tracking=False,
                force=force,
            )
        else:
            coords = self.gort.config["telescopes"]["named_positions"]["park"]["all"]
            alt = coords["alt"]
            az = coords["az"]
            self.write_to_log(f"Parking telescope to alt={alt}, az={az}.", level="info")
            await self.goto_coordinates(
                alt=alt,
                az=az,
                kmirror=False,
                altaz_tracking=False,
                force=force,
            )

        if disable:
            self.write_to_log("Disabling telescope.")
            await self.pwi.commands.setEnabled(False)

        if kmirror and self.km:
            self.write_to_log("Parking k-mirror.", level="info")
            await self.km.park()

        # After parking the telescope is flagged as not homed, so the next
        # goto_coordinates() call homes it again.
        self.is_homed = False

    async def stop(self):
        """Stops the mount."""

        self.write_to_log("Stopping the mount.", "warning")
        await self.actor.commands.stop()

    async def goto_coordinates(
        self,
        ra: float | None = None,
        dec: float | None = None,
        pa: float = 0.0,
        alt: float | None = None,
        az: float | None = None,
        kmirror: bool = True,
        kmirror_kwargs: dict | None = None,
        altaz_tracking: bool = False,
        force: bool = False,
        retry: bool = True,
    ):
        """Moves the telescope to a given RA/Dec or Alt/Az.

        Parameters
        ----------
        ra
            Right ascension coordinates to move to, in degrees.
        dec
            Declination coordinates to move to, in degrees.
        pa
            Position angle of the IFU. Defaults to PA=0.
        alt
            Altitude coordinates to move to, in degrees.
        az
            Azimuth coordinates to move to, in degrees.
        kmirror
            Whether to move the k-mirror into position. Only when
            the coordinates provided are RA/Dec.
        kmirror_kwargs
            Dictionary of keyword arguments to pass to :obj:`.KMirror.slew`.
            Defaults to no extra arguments.
        altaz_tracking
            If :obj:`True`, starts tracking after moving to alt/az coordinates.
            By default the PWI won't track with those coordinates.
        force
            Move the telescopes even if mode is local.
        retry
            Retry once if the coordinates are not reached.

        Raises
        ------
        GortTelescopeError
            If the enclosure is in local mode and ``force`` is not set, if
            the combination of coordinates is invalid, or if the telescope
            does not reach the commanded position (after one retry when
            ``retry`` is set).

        """

        if not force and (await self.gort.enclosure.is_local()):
            self.write_to_log("Checking if enclosure is in local mode.")
            raise GortTelescopeError(
                "Cannot move telescope in local mode.",
                error_code=ErrorCode.CANNOT_MOVE_LOCAL_MODE,
            )

        # A None default avoids sharing one mutable dict between calls.
        if kmirror_kwargs is None:
            kmirror_kwargs = {}

        # The k-mirror slews concurrently with the mount.
        kmirror_task: asyncio.Task | None = None
        if kmirror and self.km and ra is not None and dec is not None:
            kmirror_task = asyncio.create_task(
                self.km.slew(
                    ra,
                    dec,
                    offset_angle=pa,
                    **kmirror_kwargs,
                )
            )

        # Commanded and reported coordinates. To be used to check if we reached
        # the correct position.
        commanded: tuple[float, float]
        reported: tuple[float, float]

        await self.initialise()

        if not self.is_homed:
            self.write_to_log("Telescope is not homed. Homing now.", "warning")
            await self.home()

        if ra is not None and dec is not None:
            # Explicit check instead of ``assert``, which is removed when
            # Python runs with -O.
            if alt or az:
                raise GortTelescopeError("Invalid input parameters")

            self.write_to_log(f"Moving to ra={ra:.6f} dec={dec:.6f}.", level="info")

            ra = float(numpy.clip(ra, 0, 360))
            dec = float(numpy.clip(dec, -90, 90))

            # The mount takes the right ascension in hours.
            await self.pwi.commands.gotoRaDecJ2000(
                ra / 15.0,
                dec,
                timeout=self.timeouts["slew"],
            )

            # Check the position the PWI reports.
            status = await self.status()
            ra_status = status["ra_j2000_hours"] * 15
            dec_status = status["dec_j2000_degs"]

            commanded = (ra, dec)
            reported = (ra_status, dec_status)

        elif alt is not None and az is not None:
            if ra or dec:
                raise GortTelescopeError("Invalid input parameters")

            self.write_to_log(f"Moving to alt={alt:.6f} az={az:.6f}.", level="info")
            await self.pwi.commands.gotoAltAzJ2000(
                alt,
                az,
                timeout=self.timeouts["slew"],
            )

            # Check the position the PWI reports.
            status = await self.status()
            az_status = status["azimuth_degs"]
            alt_status = status["altitude_degs"]

            commanded = (az, alt)
            reported = (az_status, alt_status)

        else:
            raise GortTelescopeError("Invalid coordinates.")

        # Check if we reached the position. If not retry once or fail.
        separation = angular_separation(*commanded, *reported)
        if separation > 0.5:
            if retry:
                self.write_to_log(
                    "Telescope failed to reach the desired position. Retrying",
                    "warning",
                )

                # Need to make sure the k-mirror is not moving before re-trying.
                if kmirror_task is not None and not kmirror_task.done():
                    await kmirror_task

                await asyncio.sleep(3)

                return await self.goto_coordinates(
                    ra=ra,
                    dec=dec,
                    alt=alt,
                    az=az,
                    pa=pa,
                    kmirror=kmirror,
                    kmirror_kwargs=kmirror_kwargs,
                    altaz_tracking=altaz_tracking,
                    force=force,
                    retry=False,
                )
            else:
                await self.actor.commands.setEnabled(False)
                raise GortTelescopeError(
                    "Telescope failed to reach desired position. "
                    "The axes have been disabled for safety. "
                    "Try re-homing the telescope.",
                    error_code=ErrorCode.FAILED_REACHING_COMMANDED_POSITION,
                )

        if alt is not None and az is not None and altaz_tracking:
            await self.pwi.commands.setTracking(enable=True)

        if kmirror_task is not None:
            await kmirror_task

    async def goto_named_position(
        self,
        name: str,
        altaz_tracking: bool = False,
        force: bool = False,
    ):
        """Sends the telescope to a named position.

        Parameters
        ----------
        name
            The name of the position, e.g., ``'zenith'``.
        altaz_tracking
            Whether to start tracking after reaching the position, if the
            coordinates are alt/az.
        force
            Move the telescope even in local enclosure mode.

        Raises
        ------
        GortTelescopeError
            If the enclosure is in local mode and ``force`` is not set, or
            if the named position does not exist or has no usable
            coordinates for this telescope.

        """

        if not force and (await self.gort.enclosure.is_local()):
            raise GortTelescopeError(
                "Cannot move telescope in local mode.",
                error_code=ErrorCode.CANNOT_MOVE_LOCAL_MODE,
            )

        if name not in self.config["named_positions"]:
            raise GortTelescopeError(
                f"Invalid named position {name!r}.",
                error_code=ErrorCode.INVALID_TELESCOPE_POSITION,
            )

        # An entry for this telescope takes precedence over the ``all`` entry.
        position_data = self.config["named_positions"][name]

        if self.name in position_data:
            coords = position_data[self.name]
        elif "all" in position_data:
            coords = position_data["all"]
        else:
            raise GortTelescopeError(
                "Cannot find position data.",
                error_code=ErrorCode.INVALID_TELESCOPE_POSITION,
            )

        if "alt" in coords and "az" in coords:
            coro = self.goto_coordinates(
                alt=coords["alt"],
                az=coords["az"],
                altaz_tracking=altaz_tracking,
                force=force,
            )
        elif "ra" in coords and "dec" in coords:
            coro = self.goto_coordinates(
                ra=coords["ra"],
                dec=coords["dec"],
                force=force,
            )
        else:
            raise GortTelescopeError(
                "No ra/dec or alt/az coordinates found.",
                error_code=ErrorCode.INVALID_TELESCOPE_POSITION,
            )

        await coro

    async def offset(
        self,
        ra: float | None = None,
        dec: float | None = None,
        axis0: float | None = None,
        axis1: float | None = None,
    ):
        """Apply an offset to the telescope axes.

        Offsets can be given in RA/Dec or in axis0/axis1, but not both. Within
        a pair, a value that is not provided is not sent to the mount.

        Parameters
        ----------
        ra
            Offset in RA, in arcsec.
        dec
            Offset in Dec, in arcsec.
        axis0
            Offset in axis 0, in arcsec.
        axis1
            Offset in axis 1, in arcsec.

        Returns
        -------
        result
            :obj:`True` once the offset command has completed.

        Raises
        ------
        GortTelescopeError
            If both kinds of offsets are mixed or if no offset is provided.

        """

        # Test against None so that an explicit offset of zero counts as
        # provided.
        has_radec = ra is not None or dec is not None
        has_axes = axis0 is not None or axis1 is not None

        if has_radec and has_axes:
            raise GortTelescopeError(
                "Cannot offset in ra/dec and axis0/axis1 at the same time."
            )

        kwargs = {}
        if has_radec:
            if ra is not None:
                kwargs["ra_add_arcsec"] = ra
            if dec is not None:
                kwargs["dec_add_arcsec"] = dec

            # A missing component is logged as zero; formatting None with
            # ``:.3f`` would raise a TypeError.
            self.write_to_log(f"Offsetting RA={ra or 0.0:.3f}, Dec={dec or 0.0:.3f}.")

        elif has_axes:
            if axis0 is not None:
                kwargs["axis0_add_arcsec"] = axis0
            if axis1 is not None:
                kwargs["axis1_add_arcsec"] = axis1

            self.write_to_log(
                f"Offsetting axis0={axis0 or 0.0:.3f}, axis1={axis1 or 0.0:.3f}."
            )

        else:
            raise GortTelescopeError("No offsets provided.")

        await self.actor.commands.offset(**kwargs)
        self.write_to_log("Offset complete.")

        return True
class TelescopeSet(GortDeviceSet[Telescope]):
    """A representation of a set of telescopes."""

    __DEVICE_CLASS__ = Telescope
    __DEPLOYMENTS__ = [
        "lvmpwi-sci",
        "lvmpwi-spec",
        "lvmpwi-skye",
        "lvmpwi-skyw",
        "lvmtan",
    ]

    def __init__(self, gort: Gort, data: dict[str, dict]):
        super().__init__(gort, data)

        self.guiders = self.gort.guiders

    async def initialise(self):
        """Initialise all telescopes."""

        await asyncio.gather(*[tel.initialise() for tel in self.values()])

    async def restart(self):
        """Restarts the ``lvmpwi`` and ``lvmtan`` deployments and re-homes.

        If the restart reports a failure the devices are not homed.

        """

        result = await super().restart()
        if result is False:
            self.write_to_log(
                "Some deployments failed to restart. Not homing devices.",
                "error",
            )
            return

        self.write_to_log("Waiting 10 seconds for devices to reconnect.", "info")
        await asyncio.sleep(10)

        self.write_to_log("Homing telescopes after a restart.")
        await self.home(home_kms=True, home_focusers=True, home_fibsel=True)

    async def restart_lvmtan(self):
        """Restarts and rehomes Twice-As-Nice controller.

        After the actor has been restarted the K-mirrors, focuser, and
        fibre selector are rehomed. The focuser positions are preserved.

        """

        self.write_to_log("Restarting deployment lvmtan and waiting 25 s.", "info")
        await kubernetes_restart_deployment("lvmtan")

        await asyncio.sleep(25)

        # The mounts are not affected by this restart, so only the
        # subdevices are homed.
        await self.home(
            home_telescopes=False,
            home_kms=True,
            home_focusers=True,
            home_fibsel=True,
        )

    async def home(
        self,
        home_telescopes: bool = True,
        home_kms: bool = True,
        home_focusers: bool = False,
        home_fibsel: bool = False,
    ):
        """Initialises and homes the telescope.

        Parameters
        ----------
        home_telescopes
            Homes the telescopes. Defaults to :obj:`True`.
        home_kms
            Homes the K-mirrors, if present. Defaults to :obj:`True`.
        home_focusers
            Homes the focusers. Defaults to :obj:`False`.
        home_fibsel
            Homes the fibre selector, if present. Defaults to :obj:`False`.

        """

        self.write_to_log("Rehoming all telescopes.", "info")

        # Telescope.home() uses singular names for the same options.
        await self.call_device_method(
            Telescope.home,
            home_telescope=home_telescopes,
            home_km=home_kms,
            home_focuser=home_focusers,
            home_fibsel=home_fibsel,
        )

    async def park(
        self,
        disable=True,
        use_pw_park=False,
        alt_az: tuple[float, float] | None = None,
        kmirror=True,
        force=False,
    ):
        """Parks the telescopes.

        Parameters
        ----------
        disable
            Disables the axes after reaching the park position.
        use_pw_park
            Uses the internal park position stored in the PlaneWave mounts.
        alt_az
            A tuple with the alt and az position at which to park the
            telescopes. If not provided, defaults to the ``park`` named
            position.
        kmirror
            Whether to park the mirrors at 90 degrees.
        force
            Moves the telescopes even if the mode is local.

        """

        await self.call_device_method(
            Telescope.park,
            disable=disable,
            use_pw_park=use_pw_park,
            alt_az=alt_az,
            kmirror=kmirror,
            force=force,
        )

    async def goto_coordinates_all(
        self,
        ra: float | None = None,
        dec: float | None = None,
        alt: float | None = None,
        az: float | None = None,
        kmirror: bool = True,
        altaz_tracking: bool = False,
        force: bool = False,
    ):
        """Moves all the telescopes to a given RA/Dec or Alt/Az.

        Parameters
        ----------
        ra
            Right ascension coordinates to move to.
        dec
            Declination coordinates to move to.
        alt
            Altitude coordinates to move to.
        az
            Azimuth coordinates to move to.
        kmirror
            Whether to move the k-mirror into position. Only when
            the coordinates provided are RA/Dec.
        altaz_tracking
            If :obj:`True`, starts tracking after moving to alt/az coordinates.
            By default the PWI won't track with those coordinates.
        force
            Move the telescopes even if mode is local.

        """

        await self.call_device_method(
            Telescope.goto_coordinates,
            ra=ra,
            dec=dec,
            alt=alt,
            az=az,
            kmirror=kmirror,
            altaz_tracking=altaz_tracking,
            force=force,
        )

    async def goto_named_position(
        self,
        name: str,
        altaz_tracking: bool = False,
        force: bool = False,
    ):
        """Sends the telescopes to a named position.

        Parameters
        ----------
        name
            The name of the position, e.g., ``'zenith'``.
        altaz_tracking
            Whether to start tracking after reaching the position, if the
            coordinates are alt/az.
        force
            Move the telescopes even in local enclosure mode.

        Raises
        ------
        GortTelescopeError
            If the enclosure is in local mode and ``force`` is not set, or
            if the named position is not defined in the configuration.

        """

        await self._check_local(force)

        if name not in self.gort.config["telescopes"]["named_positions"]:
            raise GortTelescopeError(
                f"Invalid named position {name!r}.",
                error_code=ErrorCode.INVALID_TELESCOPE_POSITION,
            )

        await self.call_device_method(
            Telescope.goto_named_position,
            name=name,
            altaz_tracking=altaz_tracking,
            force=force,
        )

    async def stop(self):
        """Stops all the mounts."""

        await self.call_device_method(Telescope.stop)

    async def goto(
        self,
        sci: tuple[float, float] | tuple[float, float, float] | None = None,
        spec: tuple[float, float] | None = None,
        skye: tuple[float, float] | None = None,
        skyw: tuple[float, float] | None = None,
        sci_km_stop_degs_before: float = 0.0,
        force: bool = False,
    ):
        """Sends each telescope to a different position.

        Telescopes for which no coordinates are given are not moved. All the
        requested moves run concurrently.

        Parameters
        ----------
        sci
            The RA and Dec where to slew the science telescope. A third
            value can be provided for the PA. Note that in this case a -1
            factor is applied before sending it to the K-mirror.
        spec
            The RA and Dec where to slew the spectrophotometric telescope.
        skye
            The RA and Dec where to slew the skyE telescope.
        skyw
            The RA and Dec where to slew the skyW telescope.
        sci_km_stop_degs_before
            The number of degrees before the desired position where to send
            the science k-mirror. Useful if we want to be sure that positive
            offsets will be applied without backlash.
        force
            Move the telescopes even if the enclosure is in local mode.

        """

        await self._check_local(force)

        jobs = []

        if sci is not None:
            # The position angle is only sent when a third value is given.
            pa_kwargs = {} if len(sci) == 2 else {"pa": -sci[2]}  # Note the -1 here
            jobs.append(
                self["sci"].goto_coordinates(
                    ra=sci[0],
                    dec=sci[1],
                    kmirror_kwargs={"stop_degs_before": sci_km_stop_degs_before},
                    force=force,
                    **pa_kwargs,
                )
            )

        # ``force`` has to be forwarded because each telescope checks the
        # enclosure mode again in goto_coordinates().
        for tel_name, coords in (("spec", spec), ("skye", skye), ("skyw", skyw)):
            if coords is not None:
                jobs.append(
                    self[tel_name].goto_coordinates(
                        ra=coords[0],
                        dec=coords[1],
                        force=force,
                    )
                )

        if not jobs:
            return

        await asyncio.gather(*jobs)

    async def _check_local(self, force: bool = False):
        """Checks if the telescope is in local mode and raises an error.

        Parameters
        ----------
        force
            If :obj:`True`, no error is raised even in local mode.

        Raises
        ------
        GortTelescopeError
            With code ``CANNOT_MOVE_LOCAL_MODE`` if the enclosure is in local
            mode and ``force`` is not set.

        """

        is_local = await self.gort.enclosure.is_local()
        if is_local and not force:
            raise GortTelescopeError(
                "Cannot move telescopes in local mode.",
                error_code=ErrorCode.CANNOT_MOVE_LOCAL_MODE,
            )