1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568
|
# Licensed under a 3-clause BSD style license - see LICENSE.rst
import warnings
from dataclasses import dataclass, field
import numpy as np
from astropy import units as u
from astropy.modeling import Model, models, fitting
from astropy.nddata import NDData, VarianceUncertainty
from specreduce.core import SpecreduceOperation
from specreduce.tracing import Trace, FlatTrace
from specutils import Spectrum1D
__all__ = ['BoxcarExtract', 'HorneExtract', 'OptimalExtract']
def _get_boxcar_weights(center, hwidth, npix):
"""
Compute weights given an aperture center, half width,
and number of pixels.
Based on `get_boxcar_weights()` from a JDAT Notebook by Karl Gordon:
https://github.com/spacetelescope/jdat_notebooks/blob/main/notebooks/MIRI_LRS_spectral_extraction/miri_lrs_spectral_extraction.ipynb
Parameters
----------
center : float, required
The index of the aperture's center pixel on the larger image's
cross-dispersion axis.
hwidth : float, required
Half of the aperture's width in the cross-dispersion direction.
npix : float, required
The number of pixels in the larger image's cross-dispersion
axis.
Returns
-------
weights : `~numpy.ndarray`
A 2D image with weights assigned to pixels that fall within the
defined aperture.
"""
weights = np.zeros((npix))
if hwidth == 0:
# the logic below would return all zeros anyways, so might as well save the time
# (negative widths should be avoided by earlier logic!)
return weights
if center-hwidth > npix-0.5 or center+hwidth < -0.5:
# entire window is out-of-bounds
return weights
lower_edge = max(-0.5, center-hwidth) # where -0.5 is lower bound of the image
upper_edge = min(center+hwidth, npix-0.5) # where npix-0.5 is upper bound of the image
# let's avoid recomputing the round repeatedly
int_round_lower_edge = int(round(lower_edge))
int_round_upper_edge = int(round(upper_edge))
# inner pixels that get full weight
# the round in conjunction with the +1 handles the half-pixel "offset",
# the upper bound doesn't have the +1 because array slicing is inclusive on the lower index and
# exclusive on the upper-index
# NOTE: round(-0.5) == 0, which is helpful here for the case where lower_edge == -0.5
weights[int_round_lower_edge+1:int_round_upper_edge] = 1
# handle edge pixels (for cases where an edge pixel is fully-weighted, this will set it again,
# but should still compute a weight of 1. By using N:N+1, we avoid index errors if the edge
# is outside the image bounds. But we do need to avoid negative indices which would count
# from the end of the array.
if int_round_lower_edge >= 0:
weights[int_round_lower_edge:int_round_lower_edge+1] = round(lower_edge) + 0.5 - lower_edge
weights[int_round_upper_edge:int_round_upper_edge+1] = upper_edge - (round(upper_edge) - 0.5)
return weights
def _ap_weight_image(trace, width, disp_axis, crossdisp_axis, image_shape):
"""
Create a weight image that defines the desired extraction aperture.
Based on `ap_weight_images()` from a JDAT Notebook by Karl Gordon:
https://github.com/spacetelescope/jdat_notebooks/blob/main/notebooks/MIRI_LRS_spectral_extraction/miri_lrs_spectral_extraction.ipynb
Parameters
----------
trace : `~specreduce.tracing.Trace`, required
trace object
width : float, required
width of extraction aperture in pixels
disp_axis : int, required
dispersion axis
crossdisp_axis : int, required
cross-dispersion axis
image_shape : tuple with 2 elements, required
size (shape) of image
Returns
-------
wimage : `~numpy.ndarray`
a 2D weight image defining the aperture
"""
wimage = np.zeros(image_shape)
hwidth = 0.5 * width
image_sizes = image_shape[crossdisp_axis]
# loop in dispersion direction and compute weights.
for i in range(image_shape[disp_axis]):
# TODO trace must handle transposed data (disp_axis == 0)
# pass trace.trace.data[i] to avoid any mask if part of the regions is out-of-bounds
wimage[:, i] = _get_boxcar_weights(trace.trace.data[i], hwidth, image_sizes)
return wimage
def _align_along_trace(img, trace_array, disp_axis=1, crossdisp_axis=0):
"""
Given an arbitrary trace ``trace_array`` (an np.ndarray), roll
all columns of ``nddata`` to shift the NDData's pixels nearest
to the trace to the center of the spatial dimension of the
NDData.
"""
# TODO: this workflow does not support extraction for >2D spectra
if not (disp_axis == 1 and crossdisp_axis == 0):
# take the transpose to ensure the rows are the cross-disp axis:
img = img.T
n_rows, n_cols = img.shape
# indices of all columns, in their original order
rows = np.broadcast_to(np.arange(n_rows)[:, None], img.shape)
cols = np.broadcast_to(np.arange(n_cols), img.shape)
# we want to "roll" each column so that the trace sits in
# the central row of the final image
shifts = trace_array.astype(int) - n_rows // 2
# we wrap the indices so we don't index out of bounds
shifted_rows = np.mod(rows + shifts[None, :], n_rows)
return img[shifted_rows, cols]
@dataclass
class BoxcarExtract(SpecreduceOperation):
"""
Does a standard boxcar extraction.
Example: ::
trace = FlatTrace(image, trace_pos)
extract = BoxcarExtract(image, trace)
spectrum = extract(width=width)
Parameters
----------
image : `~astropy.nddata.NDData`-like or array-like, required
image with 2-D spectral image data
trace_object : Trace, required
trace object
width : float, optional
width of extraction aperture in pixels
disp_axis : int, optional
dispersion axis
crossdisp_axis : int, optional
cross-dispersion axis
Returns
-------
spec : `~specutils.Spectrum1D`
The extracted 1d spectrum expressed in DN and pixel units
"""
image: NDData
trace_object: Trace
width: float = 5
disp_axis: int = 1
crossdisp_axis: int = 0
# TODO: should disp_axis and crossdisp_axis be defined in the Trace object?
@property
def spectrum(self):
return self.__call__()
def __call__(self, image=None, trace_object=None, width=None,
disp_axis=None, crossdisp_axis=None):
"""
Extract the 1D spectrum using the boxcar method.
Parameters
----------
image : `~astropy.nddata.NDData`-like or array-like, required
image with 2-D spectral image data
trace_object : Trace, required
trace object
width : float, optional
width of extraction aperture in pixels [default: 5]
disp_axis : int, optional
dispersion axis [default: 1]
crossdisp_axis : int, optional
cross-dispersion axis [default: 0]
Returns
-------
spec : `~specutils.Spectrum1D`
The extracted 1d spectrum with flux expressed in the same
units as the input image, or u.DN, and pixel units
"""
image = image if image is not None else self.image
trace_object = trace_object if trace_object is not None else self.trace_object
width = width if width is not None else self.width
disp_axis = disp_axis if disp_axis is not None else self.disp_axis
crossdisp_axis = crossdisp_axis if crossdisp_axis is not None else self.crossdisp_axis
# handle image processing based on its type
self.image = self._parse_image(image)
# TODO: this check can be removed if/when implemented as a check in FlatTrace
if isinstance(trace_object, FlatTrace):
if trace_object.trace_pos < 1:
raise ValueError('trace_object.trace_pos must be >= 1')
if width < 0:
raise ValueError("width must be positive")
# weight image to use for extraction
wimg = _ap_weight_image(
trace_object,
width,
disp_axis,
crossdisp_axis,
self.image.shape)
# extract
ext1d = np.sum(self.image.data * wimg, axis=crossdisp_axis)
return Spectrum1D(ext1d * self.image.unit,
spectral_axis=self.image.spectral_axis)
@dataclass
class HorneExtract(SpecreduceOperation):
"""
Perform a Horne (a.k.a. optimal) extraction on a two-dimensional
spectrum.
Parameters
----------
image : `~astropy.nddata.NDData`-like or array-like, required
The input 2D spectrum from which to extract a source. An
NDData object must specify uncertainty and a mask. An array
requires use of the ``variance``, ``mask``, & ``unit`` arguments.
trace_object : `~specreduce.tracing.Trace`, required
The associated 1D trace object created for the 2D image.
disp_axis : int, optional
The index of the image's dispersion axis. [default: 1]
crossdisp_axis : int, optional
The index of the image's cross-dispersion axis. [default: 0]
bkgrd_prof : `~astropy.modeling.Model`, optional
A model for the image's background flux.
[default: models.Polynomial1D(2)]
variance : `~numpy.ndarray`, optional
(Only used if ``image`` is not an NDData object.)
The associated variances for each pixel in the image. Must
have the same dimensions as ``image``. If all zeros, the variance
will be ignored and treated as all ones. If any zeros, those
elements will be excluded via masking. If any negative values,
an error will be raised. [default: None]
mask : `~numpy.ndarray`, optional
(Only used if ``image`` is not an NDData object.)
Whether to mask each pixel in the image. Must have the same
dimensions as ``image``. If blank, all non-NaN pixels are
unmasked. [default: None]
unit : `~astropy.units.Unit` or str, optional
(Only used if ``image`` is not an NDData object.)
The associated unit for the data in ``image``. If blank,
fluxes are interpreted in DN. [default: None]
"""
image: NDData
trace_object: Trace
bkgrd_prof: Model = field(default=models.Polynomial1D(2))
variance: np.ndarray = field(default=None)
mask: np.ndarray = field(default=None)
unit: np.ndarray = field(default=None)
disp_axis: int = 1
crossdisp_axis: int = 0
# TODO: should disp_axis and crossdisp_axis be defined in the Trace object?
@property
def spectrum(self):
return self.__call__()
def _parse_image(self, image,
variance=None, mask=None, unit=None, disp_axis=1):
"""
Convert all accepted image types to a consistently formatted
Spectrum1D object.
HorneExtract needs its own version of this method because it is
more stringent in its requirements for input images. The extra
arguments are needed to handle cases where these parameters were
specified as arguments and those where they came as attributes
of the image object.
Parameters
----------
image : `~astropy.nddata.NDData`-like or array-like, required
The image to be parsed. If None, defaults to class' own
image attribute.
variance : `~numpy.ndarray`, optional
(Only used if ``image`` is not an NDData object.)
The associated variances for each pixel in the image. Must
have the same dimensions as ``image``. If all zeros, the variance
will be ignored and treated as all ones. If any zeros, those
elements will be excluded via masking. If any negative values,
an error will be raised.
mask : `~numpy.ndarray`, optional
(Only used if ``image`` is not an NDData object.)
Whether to mask each pixel in the image. Must have the same
dimensions as ``image``. If blank, all non-NaN pixels are
unmasked.
unit : `~astropy.units.Unit` or str, optional
(Only used if ``image`` is not an NDData object.)
The associated unit for the data in ``image``. If blank,
fluxes are interpreted in DN.
disp_axis : int, optional
The index of the image's dispersion axis. Should not be
changed until operations can handle variable image
orientations. [default: 1]
"""
if isinstance(image, np.ndarray):
img = image
elif isinstance(image, u.quantity.Quantity):
img = image.value
else: # NDData, including CCDData and Spectrum1D
img = image.data
# mask is set as None when not specified upon creating a Spectrum1D
# object, so we must check whether it is absent *and* whether it's
# present but set as None
if getattr(image, 'mask', None) is not None:
mask = image.mask
elif mask is not None:
pass
else:
mask = ~np.isfinite(img)
if img.shape != mask.shape:
raise ValueError('image and mask shapes must match.')
# Process uncertainties, converting to variances when able and throwing
# an error when uncertainties are missing or less easily converted
if (hasattr(image, 'uncertainty')
and image.uncertainty is not None):
if image.uncertainty.uncertainty_type == 'var':
variance = image.uncertainty.array
elif image.uncertainty.uncertainty_type == 'std':
warnings.warn("image NDData object's uncertainty "
"interpreted as standard deviation. if "
"incorrect, use VarianceUncertainty when "
"assigning image object's uncertainty.")
variance = image.uncertainty.array**2
elif image.uncertainty.uncertainty_type == 'ivar':
variance = 1 / image.uncertainty.array
else:
# other options are InverseUncertainty and UnknownUncertainty
raise ValueError("image NDData object has unexpected "
"uncertainty type. instead, try "
"VarianceUncertainty or StdDevUncertainty.")
elif (hasattr(image, 'uncertainty')
and image.uncertainty is None):
# ignore variance arg to focus on updating NDData object
raise ValueError('image NDData object lacks uncertainty')
else:
if variance is None:
raise ValueError("if image is a numpy or Quantity array, a "
"variance must be specified. consider "
"wrapping it into one object by instead "
"passing an NDData image.")
elif image.shape != variance.shape:
raise ValueError("image and variance shapes must match")
if np.any(variance < 0):
raise ValueError("variance must be fully positive")
if np.all(variance == 0):
# technically would result in infinities, but since they're all
# zeros, we can override ones to simulate an unweighted case
variance = np.ones_like(variance)
if np.any(variance == 0):
# exclude such elements by editing the input mask
mask[variance == 0] = True
# replace the variances to avoid a divide by zero warning
variance[variance == 0] = np.nan
variance = VarianceUncertainty(variance)
unit = getattr(image, 'unit',
u.Unit(unit) if unit is not None else u.Unit('DN'))
spectral_axis = getattr(image, 'spectral_axis',
np.arange(img.shape[disp_axis]) * u.pix)
return Spectrum1D(img * unit, spectral_axis=spectral_axis,
uncertainty=variance, mask=mask)
def __call__(self, image=None, trace_object=None,
disp_axis=None, crossdisp_axis=None,
bkgrd_prof=None,
variance=None, mask=None, unit=None):
"""
Run the Horne calculation on a region of an image and extract a
1D spectrum.
Parameters
----------
image : `~astropy.nddata.NDData`-like or array-like, required
The input 2D spectrum from which to extract a source. An
NDData object must specify uncertainty and a mask. An array
requires use of the ``variance``, ``mask``, & ``unit`` arguments.
trace_object : `~specreduce.tracing.Trace`, required
The associated 1D trace object created for the 2D image.
disp_axis : int, optional
The index of the image's dispersion axis.
crossdisp_axis : int, optional
The index of the image's cross-dispersion axis.
bkgrd_prof : `~astropy.modeling.Model`, optional
A model for the image's background flux.
variance : `~numpy.ndarray`, optional
(Only used if ``image`` is not an NDData object.)
The associated variances for each pixel in the image. Must
have the same dimensions as ``image``. If all zeros, the variance
will be ignored and treated as all ones. If any zeros, those
elements will be excluded via masking. If any negative values,
an error will be raised.
mask : `~numpy.ndarray`, optional
(Only used if ``image`` is not an NDData object.)
Whether to mask each pixel in the image. Must have the same
dimensions as ``image``. If blank, all non-NaN pixels are
unmasked.
unit : `~astropy.units.Unit` or str, optional
(Only used if ``image`` is not an NDData object.)
The associated unit for the data in ``image``. If blank,
fluxes are interpreted in DN.
Returns
-------
spec_1d : `~specutils.Spectrum1D`
The final, Horne extracted 1D spectrum.
"""
image = image if image is not None else self.image
trace_object = trace_object if trace_object is not None else self.trace_object
disp_axis = disp_axis if disp_axis is not None else self.disp_axis
crossdisp_axis = crossdisp_axis if crossdisp_axis is not None else self.crossdisp_axis
bkgrd_prof = bkgrd_prof if bkgrd_prof is not None else self.bkgrd_prof
variance = variance if variance is not None else self.variance
mask = mask if mask is not None else self.mask
unit = unit if unit is not None else self.unit
# parse image and replace optional arguments with updated values
self.image = self._parse_image(image, variance, mask, unit, disp_axis)
variance = self.image.uncertainty.array
unit = self.image.unit
# mask any previously uncaught invalid values
or_mask = np.logical_or(mask,
~np.isfinite(self.image.data))
img = np.ma.masked_array(self.image.data, or_mask)
mask = img.mask
# If the trace is not flat, shift the rows in each column
# so the image is aligned along the trace:
if isinstance(trace_object, FlatTrace):
mean_init_guess = trace_object.trace
else:
img = _align_along_trace(
img,
trace_object.trace,
disp_axis=disp_axis,
crossdisp_axis=crossdisp_axis
)
# Choose the initial guess for the mean of
# the Gaussian profile:
mean_init_guess = np.broadcast_to(
img.shape[crossdisp_axis] // 2, img.shape[disp_axis]
)
# co-add signal in each image column
ncols = img.shape[crossdisp_axis]
xd_pixels = np.arange(ncols) # y plot dir / x spec dir
coadd = img.sum(axis=disp_axis) / ncols
# fit source profile, using Gaussian model as a template
# NOTE: could add argument for users to provide their own model
gauss_prof = models.Gaussian1D(amplitude=coadd.max(),
mean=coadd.argmax(), stddev=2)
# Fit extraction kernel to column with combined gaussian/bkgrd model
ext_prof = gauss_prof + bkgrd_prof
fitter = fitting.LevMarLSQFitter()
fit_ext_kernel = fitter(ext_prof, xd_pixels, coadd)
# use compound model to fit a kernel to each image column
# NOTE: infers Gaussian1D source profile; needs generalization for others
kernel_vals = []
norms = []
for col_pix in range(img.shape[disp_axis]):
# set gaussian model's mean as column's corresponding trace value
fit_ext_kernel.mean_0 = mean_init_guess[col_pix]
# NOTE: support for variable FWHMs forthcoming and would be here
# fit compound model to column
fitted_col = fit_ext_kernel(xd_pixels)
# save result and normalization
kernel_vals.append(fitted_col)
norms.append(fit_ext_kernel.amplitude_0
* fit_ext_kernel.stddev_0 * np.sqrt(2*np.pi))
# transform fit-specific information
kernel_vals = np.array(kernel_vals).T
norms = np.array(norms)
# calculate kernel normalization, masking NaNs
g_x = np.ma.sum(kernel_vals**2 / variance, axis=crossdisp_axis)
# sum by column weights
weighted_img = np.ma.divide(img * kernel_vals, variance)
result = np.ma.sum(weighted_img, axis=crossdisp_axis) / g_x
# multiply kernel normalization into the extracted signal
extraction = result * norms
# convert the extraction to a Spectrum1D object
return Spectrum1D(extraction * unit,
spectral_axis=self.image.spectral_axis)
@dataclass
class OptimalExtract(HorneExtract):
"""
An alias for `HorneExtract`.
"""
__doc__ += HorneExtract.__doc__
pass
|