[go: up one dir, main page]

File: fluxcal.py

package info (click to toggle)
specreduce 1.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 10,028 kB
  • sloc: python: 1,432; makefile: 109
file content (313 lines) | stat: -rw-r--r-- 11,762 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
import os

import numpy as np

import matplotlib.pyplot as plt

from astropy.constants import c as cc
from astropy.table import Table
import astropy.units as u

from scipy.interpolate import UnivariateSpline

from specutils import Spectrum1D
from specreduce.core import SpecreduceOperation


__all__ = ['FluxCalibration']


class FluxCalibration(SpecreduceOperation):
    """
    Carries out routine flux calibration operations.

    Parameters
    ----------
    object_spectrum : a Spectrum1D object
            The observed object spectrum to apply the sensfunc to,
            with the wavelength of the data points in Angstroms
            as the ``spectral_axis``, and the magnitudes of the
            data as the ``flux``.
    airmass : float
            The value of the airmass. Note: NOT the header keyword.
    zeropoint : float, optional
            Conversion factor for mag->flux. (Default is 48.60).

    """

    def __call__(self, object_spectrum, airmass=1.00, zeropoint=48.60):
        self.object_spectrum = object_spectrum
        self.airmass = airmass
        self.zeropoint = zeropoint

    def mag2flux(self, spec_in=None):
        """
        Convert magnitudes to flux units. This is important for dealing with standards
        and files from IRAF, which are stored in AB mag units. To be clear, this converts
        to "PHOTFLAM" units in IRAF-speak. Assumes the common flux zeropoint used in IRAF.

        Parameters
        ----------
        spec_in: a Spectrum1D object, optional
            An input spectrum with wavelength of the data points in Angstroms
            as the ``spectral_axis`` and magnitudes of the data as the ``flux``.

        Returns
        -------
        spec_out: specutils.Spectrum1D
            Containing both ``flux`` and ``spectral_axis`` data in which
        the ``flux`` has been properly converted from mag->flux.

        """
        if spec_in is None:
            spec_in = self.object_spectrum

        lamb = spec_in.spectral_axis
        mag = spec_in.flux

        flux = (10.0**((mag + self.zeropt) / (-2.5))) * (cc.to('AA/s').value / lamb ** 2.0)
        flux = flux * u.erg / u.s / u.angstrom / (u.cm * u.cm)

        spec_out = Spectrum1D(spectral_axis=lamb, flux=flux)

        return spec_out

    @staticmethod
    def obs_extinction(obs_file):
        """
        Load the observatory-specific airmass extinction file from the supplied library

        Parameters
        ----------
        obs_file : str, {'apoextinct.dat', 'ctioextinct.dat', 'kpnoextinct.dat', 'ormextinct.dat'}
            The observatory-specific airmass extinction file. If not known for your
            observatory, use one of the provided files (e.g. `kpnoextinct.dat`).
            Following IRAF standard, extinction files have 2-column format
            wavelength (Angstroms), Extinction (Mag per Airmass)

        Returns
        -------
        Xfile: an Astropy table
            Table with the observatory extinction data

        """
        if len(obs_file) == 0:
            raise ValueError('Must select an observatory extinction file.')

        dir = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                           'datasets', 'extinction')

        if not os.path.isfile(os.path.join(dir, obs_file)):
            msg = "No valid standard star found at: " + os.path.join(dir, obs_file)
            raise ValueError(msg)

        # To read in the airmass extinction curve
        Xfile = Table.read(os.path.join(dir, obs_file), format='ascii', names=('wave', 'X'))
        Xfile['wave'].unit = 'AA'

        return Xfile

    def airmass_cor(self, Xfile):
        """
        Correct the spectrum based on the airmass.
        Requires observatory extinction file.

        Parameters
        ----------
        Xfile : astropy.table.Table
            The extinction table from `obs_extinction`, with columns ('wave', 'X')
            that have standard units of: (angstroms, mag/airmass).

        Returns
        -------
        airmass_cor_spec: specutils.Spectrum1D
            The airmass-corrected Spectrum1D object.

        """
        object_spectrum = self.mag2flux()
        airmass = self.airmass

        obj_wave, obj_flux = object_spectrum.spectral_axis, object_spectrum.flux

        # linear interpol airmass extinction onto observed wavelengths
        new_X = np.interp(obj_wave.value, Xfile['wave'], Xfile['X'])

        # air_cor in units of mag/airmass, convert to flux/airmass
        airmass_ext = 10.0**(0.4 * airmass * new_X)

        airmass_cor_spec = Spectrum1D(flux=obj_flux * airmass_ext, spectral_axis=obj_wave)

        return airmass_cor_spec

    def onedstd(self, stdstar):
        """
        Load the onedstd from the supplied library.

        Parameters
        ----------
        stdstar : str
            Name of the standard star file in the specreduce/datasets/onedstds
            directory to be used for the flux calibration. The user must provide the
            subdirectory and file name.

            For example:
            >>> standard_sensfunc(obj_wave, obj_flux, stdstar='spec50cal/bd284211.dat', \
             mode='spline')  # doctest: +SKIP

            If no std is supplied, or an improper path is given, raises a ValueError.

        Returns
        -------
        standard: astropy.talbe.Table
            A table with the onedstd data.
        """
        std_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                               'datasets', 'onedstds')

        if not os.path.isfile(os.path.join(std_dir, stdstar)):
            msg = "No valid standard star found at: " + os.path.join(std_dir, stdstar)
            raise ValueError(msg)

        standard = Table.read(os.path.join(std_dir, stdstar),
                              format='ascii', names=('wave', 'mag', 'width'))
        standard['wave'].unit = u.angstrom
        standard['width'].unit = u.angstrom

        # Standard star spectrum is stored in magnitude units (IRAF conventions)
        std_flux = self.mag2flux(spec_in=Spectrum1D(flux=standard['mag'],
                                                    spectral_axis=standard['wave']))
        std_flux = std_flux.flux
        standard['mag'].unit = u.mag
        standard.add_column(std_flux, name='flux')

        return standard

    def standard_sensfunc(self, standard, mode='linear', polydeg=9,
                          badlines=[6563, 4861, 4341], display=False):
        """
        Compute the standard star sensitivity function.

        Parameters
        ----------
        standard : astropy.table.Table
            output from ``onedstd``, has columns ('wave', 'width', 'mag', 'flux').
        mode : str, optional
            Can be "linear", "spline", or "poly" (Default is linear).
        polydeg : float, optional
            if mode='poly', this is the order of the polynomial to fit through.
            (Default is 9.)
        display : bool, optional
            If True, plot the sensfunc. (Default is False.)
        badlines : array-like list
            A list of values (lines) to mask-out of when generating sensfunc.

        Returns
        -------
        sensfunc_spec : specutils.Spectrum1D
            The sensitivity function in the covered wavelength range
            for the given standard star.

        """
        spec = self.mag2flux()

        obj_wave, obj_flux = spec.spectral_axis, spec.flux

        # Automatically exclude some lines b/c resolution dependent response
        badlines = np.array(badlines, dtype='float')  # Balmer lines

        # Down-sample (ds) the observed flux to the standard's bins
        obj_flux_ds = np.array([], dtype=np.float)
        obj_wave_ds = np.array([], dtype=np.float)
        std_flux_ds = np.array([], dtype=np.float)
        for i in range(len(standard['flux'])):
            rng = np.where((obj_wave.value >= standard['wave'][i] - standard['width'][i] / 2.0) &
                           (obj_wave.value < standard['wave'][i] + standard['width'][i] / 2.0))[0]

            IsH = np.where((badlines >= standard['wave'][i] - standard['width'][i] / 2.0) &
                           (badlines < standard['wave'][i] + standard['width'][i] / 2.0))[0]

            # Does this bin contain observed spectra, and no Balmer lines?
            if (len(rng) > 1) and (len(IsH) == 0):
                obj_flux_ds = np.append(obj_flux_ds, np.nanmean(obj_flux.value[rng]))
                obj_wave_ds = np.append(obj_wave_ds, standard['wave'][i])
                std_flux_ds = np.append(std_flux_ds, standard['flux'][i])

        # the ratio between the standard star catalog flux and observed flux
        ratio = np.abs(std_flux_ds / obj_flux_ds)

        # The actual fit the log of this sensfunc ratio
        # Since IRAF does the 2.5*log(ratio), everything would be in mag units
        LogSensfunc = np.log10(ratio)

        # If invalid interpolation mode selected, make it spline
        if mode.lower() not in ('linear', 'spline', 'poly'):
            mode = 'spline'
            import warnings
            warnings.warn("WARNING: invalid mode set. Changing to default mode 'spline'")

        # Interpolate the calibration (sensfunc) on to observed wavelength grid
        if mode.lower() == 'linear':
            sensfunc2 = np.interp(obj_wave.value, obj_wave_ds, LogSensfunc)
        elif mode.lower() == 'spline':
            spl = UnivariateSpline(obj_wave_ds, LogSensfunc, ext=0, k=2, s=0.0025)
            sensfunc2 = spl(obj_wave.value)
        elif mode.lower() == 'poly':
            fit = np.polyfit(obj_wave_ds, LogSensfunc, polydeg)
            sensfunc2 = np.polyval(fit, obj_wave.value)

        sensfunc_out = (10 ** sensfunc2) * standard['flux'].unit / obj_flux.unit

        sensfunc_spec = Spectrum1D(spectral_axis=obj_wave, flux=sensfunc_out)

        if display is True:
            plt.figure()
            plt.plot(obj_wave, obj_flux * sensfunc_out, c="C0",
                     label="Observed x sensfunc", alpha=0.5)

            # plt.scatter(standard['wave'], std_flux, color='C1', alpha=0.75, label="stdstar")
            plt.scatter(obj_wave_ds, std_flux_ds, color='C1', alpha=0.75)

            plt.xlabel("Wavelength")
            plt.ylabel("Flux")

            plt.xlim(np.nanmin(obj_wave.value), np.nanmax(obj_wave.value))
            plt.ylim(np.nanmin(obj_flux.value * sensfunc_out.value) * 0.98,
                     np.nanmax(obj_flux.value * sensfunc_out.value) * 1.02)
            # plt.legend()
            plt.show()

        return sensfunc_spec

    def apply_sensfunc(self, sensfunc):
        """
        Apply the derived sensitivity function, converts observed units (e.g. ADU/s)
        to physical units (e.g. erg/s/cm2/A).
        Sensitivity function is first linearly interpolated onto the wavelength scale
        of the observed data, and then directly multiplied.

        Parameters
        ----------
        sensfunc : astropy.table.Table
            The output of ``standard_sensfunc``, table has columns ('wave', 'S').

        Returns
        -------
        fluxcal_spec: specutils.Spectrum1D
            The sensfunc corrected ``Spectrum1D`` object.

        """
        spec = self.mag2flux()

        obj_wave, obj_flux = spec.spectral_axis, spec.flux

        # Sort, in case the sensfunc wavelength axis is backwards
        ss = np.argsort(obj_wave.value)

        # Interpolate the sensfunc onto the observed wavelength axis
        sensfunc2 = np.interp(obj_wave.value, sensfunc['wave'][ss], sensfunc['S'][ss])

        object_spectrum = obj_flux * (sensfunc2 * sensfunc['S'].unit)

        fluxcal_spec = Spectrum1D(spectral_axis=obj_wave, flux=object_spectrum)

        return fluxcal_spec