Making an animation

Sometimes it can be useful to make an animation to check whether a set of simulations worked, or to help give a demo. This section shows an example of how to use matplotlib and the OSKAR imager from within a loop to make each frame of an animation by iterating over time samples in a Measurement Set. The script below could either be used as-is, or adapted to a more complex use case. Each frame is generated by reading slices of visibility data in Plotter._animate_func, while the remainder of the script sets up the environment using calls to functions in matplotlib.

The script has the following command-line arguments:

usage: animate_ms.py [-h] [--fov_deg FOV_DEG] [--size SIZE] [--fps FPS]
                    [--out OUT] [--title TITLE]
                    MS [MS ...]

Make an animation from one or more Measurement Sets

positional arguments:
MS                 Measurement Set path(s)

optional arguments:
-h, --help         show this help message and exit
--fov_deg FOV_DEG  Field of view to image, in degrees (default: 0.5)
--size SIZE        Image side length, in pixels (default: 256)
--fps FPS          Frames per second in output (default: 10)
--out OUT          Output filename (default: out.mp4)
--title TITLE      Overall figure title (default: )

Download animate_ms.py:

  1#!/usr/bin/env python3
  2"""
  3Generate an animation by stepping through visibility time samples.
  4"""
  5import argparse
  6import copy
  7
  8import matplotlib
  9
 10matplotlib.use("Agg")
 11# pylint: disable=wrong-import-position
 12from mpl_toolkits.axes_grid1 import make_axes_locatable
 13from matplotlib import animation
 14import matplotlib.pyplot as plt
 15import numpy
 16import oskar
 17
 18# pylint: disable=too-many-instance-attributes
 19class Plotter:
 20    """Generate an animation by stepping through visibility time samples."""
 21
 22    def __init__(self):
 23        """Constructor."""
 24        self._artists = ()
 25        self._axes = None
 26        self._base_settings = {}
 27        self._fig = None
 28        self._ms_list = []
 29        self._ms_names = []
 30        self._num_frames = 0
 31        self._title = ""
 32
 33    def animate(self, imager_settings, ms_names, title="", fps=10, filename="out.mp4"):
 34        """Function to generate the animation.
 35
 36        Args:
 37            imager_settings (dict): Base settings for OSKAR imager.
 38            ms_names (list[str]): List of Measurement Sets to image.
 39            title (str): Main figure title.
 40            fps (int): Frames-per-second.
 41            filename (str): Name of output MP4 file.
 42        """
 43        # Store arguments.
 44        self._base_settings = imager_settings
 45        self._ms_names = ms_names
 46        self._title = title
 47        self._ms_list.clear()
 48
 49        # Work out the number of frames to generate.
 50        num_images = len(self._ms_names)
 51        self._num_frames = 0
 52        for i in range(num_images):
 53            ms = oskar.MeasurementSet.open(self._ms_names[i], readonly=True)
 54            num_rows = ms.num_rows
 55            num_stations = ms.num_stations
 56            num_baselines = (num_stations * (num_stations - 1)) // 2
 57            self._num_frames = max(self._num_frames, num_rows // num_baselines)
 58            self._ms_list.append(ms)
 59
 60        # Create the plot panels.
 61        num_cols = num_images
 62        if num_cols > 4:
 63            num_cols = 4
 64        num_rows = (num_images + num_cols - 1) // num_cols
 65        panel_size = 8
 66        if num_images > 1:
 67            panel_size = 5
 68        if num_images > 3:
 69            panel_size = 4
 70        fig_size = (num_cols * panel_size, num_rows * panel_size)
 71        fig, axes = plt.subplots(
 72            nrows=num_rows, ncols=num_cols, squeeze=False, figsize=fig_size
 73        )
 74        self._fig = fig
 75        self._axes = axes.flatten()
 76
 77        # Call the animate function.
 78        anim = animation.FuncAnimation(
 79            self._fig,
 80            self._animate_func,
 81            init_func=self._init_func,
 82            frames=range(0, self._num_frames),
 83            interval=1000.0 / fps,
 84            blit=False,
 85        )
 86
 87        # Save animation.
 88        anim.save(filename, writer="ffmpeg", bitrate=3500)
 89        plt.close(fig=fig)
 90
 91    def _init_func(self):
 92        """Internal initialisation function called by FuncAnimation."""
 93        # Create an empty image.
 94        imsize = self._base_settings["image/size"]
 95        zeros = numpy.zeros((imsize, imsize))
 96        zeros[0, 0] = 1
 97
 98        # Create list of matplotlib artists that must be updated each frame.
 99        artists = []
100
101        # Iterate plot panels.
102        for i in range(len(self._axes)):
103            ax = self._axes[i]
104            im = ax.imshow(zeros, aspect="equal", cmap="gnuplot2")
105            divider = make_axes_locatable(ax)
106            cax = divider.append_axes("right", size="5%", pad=0.05)
107            cbar = plt.colorbar(im, cax=cax)
108            ax.invert_yaxis()
109            ax.axes.xaxis.set_visible(False)
110            ax.axes.yaxis.set_visible(False)
111            if i < len(self._ms_names):
112                ax.set_title(self._ms_names[i])
113            else:
114                cbar.set_ticks([])
115                cbar.set_ticklabels([])
116            artists.append(im)
117
118        # Set figure title.
119        self._fig.suptitle(self._title, fontsize=16, y=0.95)
120
121        # Return tuple of artists to update.
122        self._artists = tuple(artists)
123        return self._artists
124
125    def _animate_func(self, frame):
126        """Internal function called per frame by FuncAnimation.
127
128        Args:
129            frame (int): Frame index.
130        """
131        # Iterate plot panels.
132        num_panels = len(self._ms_list)
133        for i in range(num_panels):
134            # Read the visibility meta data.
135            freq_start_hz = self._ms_list[i].freq_start_hz
136            freq_inc_hz = self._ms_list[i].freq_inc_hz
137            num_channels = self._ms_list[i].num_channels
138            num_stations = self._ms_list[i].num_stations
139            num_rows = self._ms_list[i].num_rows
140            num_baselines = (num_stations * (num_stations - 1)) // 2
141
142            # Read the visibility data and coordinates.
143            start_row = frame * num_baselines
144            if start_row >= num_rows or start_row + num_baselines > num_rows:
145                continue
146            (u, v, w) = self._ms_list[i].read_coords(start_row, num_baselines)
147            vis = self._ms_list[i].read_column("DATA", start_row, num_baselines)
148            num_pols = vis.shape[-1]
149
150            # Create settings for the imager.
151            params = copy.deepcopy(self._base_settings)
152            settings = oskar.SettingsTree("oskar_imager")
153            settings.from_dict(params)
154
155            # Make the image for this frame.
156            print(
157                "Generating frame %d/%d, panel %d/%d"
158                % (frame + 1, self._num_frames, i + 1, num_panels)
159            )
160            imager = oskar.Imager(settings=settings)
161            imager.set_vis_frequency(freq_start_hz, freq_inc_hz, num_channels)
162            imager.update(u, v, w, vis, end_channel=num_channels - 1, num_pols=num_pols)
163            data = imager.finalise(return_images=1)
164
165            # Update the plot panel and colourbar.
166            self._artists[i].set_data(data["images"][0])
167            self._artists[i].autoscale()
168
169
170def main():
171    """Main function."""
172    parser = argparse.ArgumentParser(
173        description="Make an animation from one or more Measurement Sets",
174        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
175    )
176    parser.add_argument(
177        "ms_names", metavar="MS", nargs="+", help="Measurement Set path(s)"
178    )
179    parser.add_argument(
180        "--fov_deg", type=float, default=0.5, help="Field of view to image, in degrees"
181    )
182    parser.add_argument(
183        "--size", type=int, default=256, help="Image side length, in pixels"
184    )
185    parser.add_argument(
186        "--fps", type=int, default=10, help="Frames per second in output"
187    )
188    parser.add_argument("--out", default="out.mp4", help="Output filename")
189    parser.add_argument("--title", default="", help="Overall figure title")
190    args = parser.parse_args()
191
192    # Imager settings.
193    imager_settings = {"image/fov_deg": args.fov_deg, "image/size": args.size}
194
195    # Make animation.
196    plotter = Plotter()
197    plotter.animate(imager_settings, args.ms_names, args.title, args.fps, args.out)
198
199
200if __name__ == "__main__":
201    main()

Example: Single-station drift scan of Galactic plane

As an example, the following OSKAR parameter file will generate a simulated Measurement Set for a 24-hour drift-scan observation of the Galactic plane using a telescope model consisting of a single 38-metre diameter SKA-Low station of 256 isotropic elements.

Download drift_scan_galaxy.ini:

[General]
app=oskar_sim_interferometer
version=2.8.0

[simulator]
double_precision=false

[sky]
healpix_fits/file=haslam_nside_128.fits
healpix_fits/min_abs_val=30.0

[observation]
mode=Drift scan
start_frequency_hz=1.0e+08
start_time_utc=2000-01-01 09:30:00.0
length=24:00:00.0
num_time_steps=96

[telescope]
input_directory=single_station.tm
pol_mode=Scalar
station_type=Isotropic beam

[interferometer]
ms_filename=drift_scan_galaxy.ms

The animation below was then produced by running the animate_ms.py script with the following command-line arguments using the output Measurement Set:

./animate_ms.py --fov_deg=180 --fps=20 --title="OSKAR drift scan test" --out=drift_scan.mp4 drift_scan_galaxy.ms