""" Functions for calibration, including creation of pointingtables, application of pointingtables, and
merging pointingtables.
"""
__all__ = [
"create_pointingtable_from_rows",
"create_pointingtable_from_visibility",
"copy_pointingtable",
]
import copy
import logging
from typing import Union
import numpy.linalg
from rascil.data_models.memory_data_models import (
PointingTable,
Visibility,
QualityAssessment,
)
from rascil.data_models.polarisation import ReceptorFrame
from rascil.processing_components.util.coordinate_support import hadec_to_azel
from rascil.processing_components.visibility.visibility_geometry import (
calculate_visibility_hourangles,
)
log = logging.getLogger("rascil-logger")
def pointingtable_summary(pt: PointingTable):
"""Return string summarizing the Gaintable"""
return "%s rows, %.3f GB" % (pt["pixels"].data.shape, pt.size())
[docs]def create_pointingtable_from_visibility(
vis: Visibility,
pointing_frame="azel",
timeslice=None,
frequencyslice: float = None,
**kwargs
) -> PointingTable:
"""Create pointing table from visibility.
This makes an empty pointing table consistent with the Visibility.
:param vis: BlockVisibilty
:param timeslice: Time interval between solutions (s)
:param frequency_width: Frequency solution width (Hz)
:return: PointingTable
"""
nants = vis.visibility_acc.nants
if timeslice is None or timeslice == "auto":
utimes = numpy.unique(vis.time)
pointing_interval = vis.integration_time
else:
utimes = vis.time.data[0] + timeslice * numpy.unique(
numpy.round((vis.time.data - vis.time.data[0]) / timeslice)
)
pointing_interval = timeslice * numpy.ones_like(utimes)
# log.debug('create_pointingtable_from_visibility: times are %s' % str(utimes))
# log.debug('create_pointingtable_from_visibility: intervals are %s' % str(pointing_interval))
ntimes = len(utimes)
ufrequency = numpy.unique(vis.frequency)
nfrequency = len(ufrequency)
receptor_frame = ReceptorFrame(vis.visibility_acc.polarisation_frame.type)
nrec = receptor_frame.nrec
pointingshape = [ntimes, nants, nfrequency, nrec, 2]
pointing = numpy.zeros(pointingshape)
if nrec > 1:
pointing[..., 0, 0] = 0.0
pointing[..., 1, 0] = 0.0
pointing[..., 0, 1] = 0.0
pointing[..., 1, 1] = 0.0
ha = calculate_visibility_hourangles(vis).to("rad").value
dec = vis.phasecentre.dec.rad
latitude = vis.configuration.location.lat.rad
az, el = hadec_to_azel(ha, dec, latitude)
pointing_nominal = numpy.zeros([ntimes, nants, nfrequency, nrec, 2])
pointing_nominal[..., 0] = az[:, numpy.newaxis, numpy.newaxis, numpy.newaxis]
pointing_nominal[..., 1] = el[:, numpy.newaxis, numpy.newaxis, numpy.newaxis]
pointing_weight = numpy.ones(pointingshape)
pointing_time = utimes
pointing_frequency = ufrequency
pointing_residual = numpy.zeros([ntimes, nfrequency, nrec, 2])
pointing_frame = pointing_frame
pt = PointingTable(
pointing=pointing,
nominal=pointing_nominal,
time=pointing_time,
interval=pointing_interval,
weight=pointing_weight,
residual=pointing_residual,
frequency=pointing_frequency,
receptor_frame=receptor_frame,
pointing_frame=pointing_frame,
pointingcentre=vis.phasecentre,
configuration=vis.configuration,
)
return pt
[docs]def copy_pointingtable(pt: PointingTable, zero=False):
"""Copy a PointingTable
Performs a deepcopy of the data array
"""
if pt is None:
return pt
##assert isinstance(pt, PointingTable), pt
newpt = copy.copy(pt)
newpt.data = copy.deepcopy(pt.data)
if zero:
newpt.data["pt"][...] = 0.0
return newpt
[docs]def create_pointingtable_from_rows(
pt: PointingTable, rows: numpy.ndarray, makecopy=True
) -> Union[PointingTable, None]:
"""Create a PointingTable from selected rows
:param pt: PointingTable
:param rows: Boolean array of row selection
:param makecopy: Make a deep copy (True)
:return: PointingTable
"""
if rows is None or numpy.sum(rows) == 0:
return None
assert (
len(rows) == pt.ntimes
), "Lenpth of rows does not agree with lenpth of PointingTable"
##assert isinstance(pt, PointingTable), pt
if makecopy:
newpt = copy_pointingtable(pt)
newpt.data = copy.deepcopy(pt.data[rows])
return newpt
else:
pt.data = copy.deepcopy(pt.data[rows])
return pt
def qa_pointingtable(pt: PointingTable, context=None) -> QualityAssessment:
"""Assess the quality of a pointingtable
:param pt:
:return: AQ
"""
apt = numpy.abs(pt.pointing[pt.weight > 0.0])
ppt = numpy.angle(pt.pointing[pt.weight > 0.0])
data = {
"shape": pt.pointing.shape,
"maxabs-amp": numpy.max(apt),
"minabs-amp": numpy.min(apt),
"rms-amp": numpy.std(apt),
"medianabs-amp": numpy.median(apt),
"maxabs-phase": numpy.max(ppt),
"minabs-phase": numpy.min(ppt),
"rms-phase": numpy.std(ppt),
"medianabs-phase": numpy.median(ppt),
"residual": numpy.max(pt.residual),
}
return QualityAssessment(origin="qa_pointingtable", data=data, context=context)