Source code for rascil.processing_components.calibration.pointing

""" Functions for calibration, including creation of pointingtables, application of pointingtables, and
merging pointingtables.

"""

__all__ = [
    "create_pointingtable_from_rows",
    "create_pointingtable_from_visibility",
    "copy_pointingtable",
]

import copy
import logging

from typing import Union

import numpy.linalg

from rascil.data_models.memory_data_models import (
    PointingTable,
    Visibility,
    QualityAssessment,
)
from rascil.data_models.polarisation import ReceptorFrame

from rascil.processing_components.util.coordinate_support import hadec_to_azel
from rascil.processing_components.visibility.visibility_geometry import (
    calculate_visibility_hourangles,
)

log = logging.getLogger("rascil-logger")


def pointingtable_summary(pt: PointingTable):
    """Return string summarizing the Gaintable"""
    return "%s rows, %.3f GB" % (pt["pixels"].data.shape, pt.size())


[docs]def create_pointingtable_from_visibility( vis: Visibility, pointing_frame="azel", timeslice=None, frequencyslice: float = None, **kwargs ) -> PointingTable: """Create pointing table from visibility. This makes an empty pointing table consistent with the Visibility. :param vis: BlockVisibilty :param timeslice: Time interval between solutions (s) :param frequency_width: Frequency solution width (Hz) :return: PointingTable """ nants = vis.visibility_acc.nants if timeslice is None or timeslice == "auto": utimes = numpy.unique(vis.time) pointing_interval = vis.integration_time else: utimes = vis.time.data[0] + timeslice * numpy.unique( numpy.round((vis.time.data - vis.time.data[0]) / timeslice) ) pointing_interval = timeslice * numpy.ones_like(utimes) # log.debug('create_pointingtable_from_visibility: times are %s' % str(utimes)) # log.debug('create_pointingtable_from_visibility: intervals are %s' % str(pointing_interval)) ntimes = len(utimes) ufrequency = numpy.unique(vis.frequency) nfrequency = len(ufrequency) receptor_frame = ReceptorFrame(vis.visibility_acc.polarisation_frame.type) nrec = receptor_frame.nrec pointingshape = [ntimes, nants, nfrequency, nrec, 2] pointing = numpy.zeros(pointingshape) if nrec > 1: pointing[..., 0, 0] = 0.0 pointing[..., 1, 0] = 0.0 pointing[..., 0, 1] = 0.0 pointing[..., 1, 1] = 0.0 ha = calculate_visibility_hourangles(vis).to("rad").value dec = vis.phasecentre.dec.rad latitude = vis.configuration.location.lat.rad az, el = hadec_to_azel(ha, dec, latitude) pointing_nominal = numpy.zeros([ntimes, nants, nfrequency, nrec, 2]) pointing_nominal[..., 0] = az[:, numpy.newaxis, numpy.newaxis, numpy.newaxis] pointing_nominal[..., 1] = el[:, numpy.newaxis, numpy.newaxis, numpy.newaxis] pointing_weight = numpy.ones(pointingshape) pointing_time = utimes pointing_frequency = ufrequency pointing_residual = numpy.zeros([ntimes, nfrequency, nrec, 2]) pointing_frame = pointing_frame pt = PointingTable( pointing=pointing, nominal=pointing_nominal, time=pointing_time, interval=pointing_interval, weight=pointing_weight, residual=pointing_residual, frequency=pointing_frequency, receptor_frame=receptor_frame, pointing_frame=pointing_frame, pointingcentre=vis.phasecentre, configuration=vis.configuration, ) return pt
[docs]def copy_pointingtable(pt: PointingTable, zero=False): """Copy a PointingTable Performs a deepcopy of the data array """ if pt is None: return pt ##assert isinstance(pt, PointingTable), pt newpt = copy.copy(pt) newpt.data = copy.deepcopy(pt.data) if zero: newpt.data["pt"][...] = 0.0 return newpt
[docs]def create_pointingtable_from_rows( pt: PointingTable, rows: numpy.ndarray, makecopy=True ) -> Union[PointingTable, None]: """Create a PointingTable from selected rows :param pt: PointingTable :param rows: Boolean array of row selection :param makecopy: Make a deep copy (True) :return: PointingTable """ if rows is None or numpy.sum(rows) == 0: return None assert ( len(rows) == pt.ntimes ), "Lenpth of rows does not agree with lenpth of PointingTable" ##assert isinstance(pt, PointingTable), pt if makecopy: newpt = copy_pointingtable(pt) newpt.data = copy.deepcopy(pt.data[rows]) return newpt else: pt.data = copy.deepcopy(pt.data[rows]) return pt
def qa_pointingtable(pt: PointingTable, context=None) -> QualityAssessment: """Assess the quality of a pointingtable :param pt: :return: AQ """ apt = numpy.abs(pt.pointing[pt.weight > 0.0]) ppt = numpy.angle(pt.pointing[pt.weight > 0.0]) data = { "shape": pt.pointing.shape, "maxabs-amp": numpy.max(apt), "minabs-amp": numpy.min(apt), "rms-amp": numpy.std(apt), "medianabs-amp": numpy.median(apt), "maxabs-phase": numpy.max(ppt), "minabs-phase": numpy.min(ppt), "rms-phase": numpy.std(ppt), "medianabs-phase": numpy.median(ppt), "residual": numpy.max(pt.residual), } return QualityAssessment(origin="qa_pointingtable", data=data, context=context)