#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2023-06-25
# @Filename: ag.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
import asyncio
from gort.devices.core import GortDevice, GortDeviceSet
from gort.exceptions import GortDeviceError
from gort.gort import Gort
from gort.tools import ping_host, run_lvmapi_task
# Public names exported by this module.
__all__ = ["AG", "AGSet"]
class AG(GortDevice):
    """Class representing an AG camera.

    Parameters
    ----------
    gort
        The ``Gort`` instance, passed on to the parent device class.
    name
        The name of the device. It is also stored as the ``telescope``
        attribute and used as the prefix of the camera identifiers.
    actor
        The name of the actor, passed on to the parent device class.
    device_data
        Additional device configuration. If it contains an ``ips`` mapping,
        its ``east`` and ``west`` entries are stored in the ``ips`` attribute.

    """

    def __init__(self, gort: Gort, name: str, actor: str, **device_data):
        super().__init__(gort, name, actor)

        self.telescope = name

        # A side without a configured IP is stored as None.
        ips = device_data.get("ips") or {}
        self.ips = {side: ips.get(side) for side in ("east", "west")}

        # Keep only the offline cameras whose name starts with this device name.
        offline_cameras = self.gort.config.get("ags", {}).get("offline_cameras", [])
        self.offline_cameras = [cam for cam in offline_cameras if cam.startswith(name)]

    @property
    def n_cameras(self) -> int:
        """The number of AG cameras for this telescope.

        Computed as the number of sides with a configured IP minus the
        number of cameras listed as offline for this device.

        """

        configured = sum(ip is not None for ip in self.ips.values())
        return configured - len(self.offline_cameras)

    async def status(self):
        """Returns the status of the AG."""

        return await self.actor.commands.status()

    async def is_idle(self) -> bool:
        """Returns :obj:`True` if all the cameras are idle."""

        status = await self.status()

        for reply in status.replies:
            if reply["status"]["camera_state"] != "idle":
                return False

        return True

    async def reconnect(self):
        """Reconnect the AG cameras.

        Raises
        ------
        GortDeviceError
            If a reply contains an error that includes
            ``arv-device-error-quark``. Any other error is only logged.

        """

        self.write_to_log("Reconnecting AG cameras.")

        actor_reply = await self.actor.commands.reconnect()

        for reply in actor_reply.replies:
            if "error" not in reply:
                continue

            error = reply["error"]
            if "arv-device-error-quark" in error:
                raise GortDeviceError(
                    f"One or more {self.name} cameras failed to reconnect. "
                    f"The cameras may be in locked mode. Error: {error}"
                )

            self.write_to_log(
                f"Error reconnecting {self.name} cameras: {error}",
                level="error",
            )

    async def expose(
        self,
        exposure_time: float = 5.0,
        flavour: str = "object",
        **kwargs,
    ):
        """Exposes the cameras.

        Parameters
        ----------
        exposure_time
            The amount of time, in seconds, to expose the cameras.
        flavour
            The type of image to take, one of ``'bias'``, ``'dark'``, ``'flat'``,
            or ``'object'``. Any other value is treated as ``'object'``.
        kwargs
            Any parameters to send to the ``lvmcam expose`` command.

        """

        # The flavour is sent to the actor as a boolean flag of the same name.
        flag = flavour if flavour in ("bias", "flat", "dark") else "object"
        kwargs[flag] = True

        return await self.actor.commands.expose(
            exptime=exposure_time,
            **kwargs,
        )

    async def check_camera(self, ping: bool = True, status: bool = True):
        """Checks that the AG cameras are responding.

        Parameters
        ----------
        ping
            Whether to ping the cameras.
        status
            Whether to check the camera status.

        Returns
        -------
        result
            :obj:`True` if all the requested checks passed.

        Raises
        ------
        ValueError
            If both ``ping`` and ``status`` are :obj:`False`.
        GortDeviceError
            If one or more cameras are not responding.

        """

        if not ping and not status:
            raise ValueError("At least one of ping or status must be True.")

        failed: set[str] = set()

        if ping:
            for side, ip in self.ips.items():
                if ip is None:
                    continue

                name = f"{self.name}-{side}"
                if await ping_host(ip):
                    self.write_to_log(f"AG {name} pinged back.", "debug")
                else:
                    self.write_to_log(f"AG {name} did not ping.", "warning")
                    failed.add(name)

        if status:
            try:
                await self.status()
            except Exception as err:
                # The status command is issued once for the whole device, so a
                # failure is attributed to every camera. Use the same
                # "<name>-<side>" identifiers as the ping check so the error
                # message is consistent and entries are not duplicated.
                self.write_to_log(
                    f"AG {self.name} status command failed: {err}",
                    "warning",
                )
                failed.update(f"{self.name}-{side}" for side in self.ips)

        if not failed:
            return True

        # Sort for a deterministic message; set iteration order is arbitrary.
        raise GortDeviceError(
            f"The following AG cameras are not responding: "
            f"{', '.join(sorted(failed))}."
        )
class AGSet(GortDeviceSet[AG]):
    """A set of auto-guiders."""

    __DEVICE_CLASS__ = AG
    __DEPLOYMENTS__ = ["lvmagcam"]

    @property
    def n_cameras(self) -> int:
        """The number of AG cameras in the set."""

        return sum(ag.n_cameras for ag in self.values())

    async def reconnect(self):
        """Reconnects all the AG cameras.

        Returns
        -------
        cameras
            A list of cameras available after reconnecting.

        Raises
        ------
        BaseException
            The first exception raised by any of the individual reconnects.

        """

        # Let every reconnect run to completion before surfacing a failure.
        replies = await asyncio.gather(
            *[ag.reconnect() for ag in self.values()],
            return_exceptions=True,
        )

        for reply in replies:
            if isinstance(reply, BaseException):
                raise reply

        return await self.list_alive_cameras()

    async def are_idle(self) -> bool:
        """Returns :obj:`True` if all the cameras are idle."""

        return all(await asyncio.gather(*[ag.is_idle() for ag in self.values()]))

    async def check_cameras(self, allow_power_cycle: bool = True):
        """Checks that all cameras are responding.

        Parameters
        ----------
        allow_power_cycle
            Whether to allow power cycling the cameras if they are not responding.

        Returns
        -------
        result
            :obj:`True` if all the cameras are responding, possibly after
            a power cycle.

        Raises
        ------
        GortDeviceError
            If one or more cameras are not responding and power cycling is
            not allowed (or has already been attempted).

        """

        try:
            await asyncio.gather(*[ag.check_camera() for ag in self.values()])
        except GortDeviceError as err:
            if not allow_power_cycle:
                # Chain explicitly so the per-camera details are preserved.
                raise GortDeviceError(
                    "One or more AG cameras are not responding."
                ) from err

            self.write_to_log(
                "One or more AG cameras are not responding. Power cycling...",
                "warning",
            )

            await self.power_cycle()

            # Check again, this time without allowing another power cycle.
            await self.check_cameras(allow_power_cycle=False)

        return True

    async def list_alive_cameras(self) -> list[str]:
        """Returns a list of cameras found alive and well.

        Currently when a camera has disconnected, the status command still reports
        it as "online", but it doesn't report its temperature or other parameters.

        Returns
        -------
        cameras
            A list of camera identifiers with the format ``{telescope}-{camera}``,
            where the telescope is the second dot-separated element of
            the actor name.

        """

        all_status = await asyncio.gather(
            *[ag.status() for ag in self.values()],
            return_exceptions=True,
        )

        cameras: list[str] = []

        for status in all_status:
            # A failed status command means no alive cameras for that device.
            if isinstance(status, BaseException):
                continue

            for reply in status.replies:
                try:
                    data = {"actor": status.actor.name, **reply["status"]}
                except Exception:
                    # Deliberately best-effort: replies that do not carry a
                    # usable "status" keyword are ignored.
                    continue

                telescope = data["actor"].split(".")[1]
                camera = data.get("camera", None)
                state = data.get("camera_state", None)

                if camera and state:
                    cameras.append(f"{telescope}-{camera}")

        return cameras

    async def expose(
        self,
        exposure_time: float = 5.0,
        flavour: str = "object",
        **kwargs,
    ):
        """Exposes the cameras.

        Parameters
        ----------
        exposure_time
            The amount of time, in seconds, to expose the cameras.
        flavour
            The type of image to take, one of ``'bias'``, ``'dark'``, ``'flat'``,
            or ``'object'``.
        kwargs
            Any parameters to send to the ``lvmcam expose`` command.

        Returns
        -------
        replies
            A list with the result of the expose command for each device.

        """

        return await asyncio.gather(
            *[
                ag.expose(
                    exposure_time=exposure_time,
                    flavour=flavour,
                    **kwargs,
                )
                for ag in self.values()
            ]
        )

    async def power_cycle(self):
        """Power cycles all the cameras.

        Returns
        -------
        result
            :obj:`True` once the number of alive cameras matches ``n_cameras``.

        Raises
        ------
        RuntimeError
            If not all the cameras are responding after one retry.

        """

        await run_lvmapi_task(
            "/macros/power_cycle_ag_cameras",
            params={"reconnect": False},
        )

        # Restart the actors
        await self.restart()
        await asyncio.sleep(15)

        for retry in range(2):
            try:
                alive_cameras = await self.list_alive_cameras()
            except Exception:
                alive_cameras = []

            if len(alive_cameras) == self.n_cameras:
                # All cameras are back; no need to check a second time.
                break

            if retry == 1:
                raise RuntimeError("Not all cameras are responding.")

            self.write_to_log(
                "Not all cameras are responding. Waiting 30 seconds and retrying.",
                "warning",
            )
            await asyncio.sleep(30)
            await self.reconnect()

        return True