import string
import numpy as np
from datetime import datetime
from collections import OrderedDict
from scipy.stats import sigmaclip
from scipy.spatial import distance
# offers more options, but is about 10 x slower
# from astropy.stats import sigma_clip
from scipy.ndimage import maximum_filter
from astropy.table import Table
from astropy.io import fits
from astropy.nddata.utils import extract_array
import astropy.units as u
from . import __version__ as version
__all__ = ['translate_wcs', 'median_column_remover', 'identify_evt_sigmaclip',
'add_islands5533', 'energy_from_island',
'acis_grade', 'asca_grade',
'hotpixelfromtxt', 'hotpixelbyoccurence', 'make_hotpixellist',
'dist2nextevent',
'ExtractionChain',
'NoEventError',
]
grademap = np.array([[32, 64, 128],
[8, 0, 16],
[1, 2, 4]])
acis2asca = {0: [0], # single event
1: [1, 4, 5, 32, 128, 33, 36, 37, 129, 132, 133,
160, 161, 164, 165], # diagonal split
2: [64, 65, 68, 69, 2, 34, 130, 162], # vertical split
3: [8, 12, 136, 140], # horizontal split left
4: [16, 17, 48, 49], # horizontal split right
5: [3, 6, 9, 20, 40, 96, 144, 192, 13, 21, 35, 38, 44, 52, 53, 97,
100, 101, 131, 134, 137, 141, 145, 163, 166, 168, 172, 176,
177, 193, 196, 197], # L-shaped splits
6: [72, 76, 104, 108, 10, 11, 138, 139, 18, 22, 50, 54, 80, 81,
208, 209], # L & quads
# 7: [] # other
}
[docs]class NoEventError(Exception):
'''Exception to call if no events where found'''
pass
[docs]def translate_wcs(evt, colnames=['X', 'Y']):
'''Translate the WCS from image to event list
Header keywords for WCS are different for images vs. event lists.
This function translates from image to event list.
It does not deal with all possible keywords, just those that
are relevant in the context of this beamline.
It drops the third dimension of the WCS and instead adds a time
column based on frame number.
Parameters
----------
evt : `astropy.table.Table`
Event table with one or more image-type WCSs in the header.
colnames : list of two strings
Column names in the events list that the imaging WCS applies to.
'''
# Fits starts counting at 1
x = evt.colnames.index(colnames[0]) + 1
y = evt.colnames.index(colnames[1]) + 1
for a in [''] + list(string.ascii_letters):
if ('WCSNAME' + a) in evt.meta:
name = evt.meta.pop('WCSNAME' + a)
evt.meta['TWCS{}{}'.format(x, a)] = name
evt.meta['TWCS{}{}'.format(y, a)] = name
evt.meta.pop('WCSAXES' + a)
for old, new in zip(['CRVAL', 'CRPIX', 'CDELT', 'CUNIT', 'CTYPE'],
['TCRVL', 'TCRPX', 'TCDLT', 'TCUNI', 'TCTYP']):
# For all alternative WCS systems, use only 4 chars
if len(a) == 1:
new = new[: -1]
for xy, ind in zip('123', [x, y, None]):
val = evt.meta.pop(old + xy + a)
# Do not reassign Axis 3
if ind is not None:
evt.meta['{}{}{}'.format(new, ind, a)] = val
# New time column calculated fresh from header values
evt['TIME'] = (evt['FRAME'] + evt.meta['THROWOUT'][0]) * evt.meta['FRAMETIM'][0]
# remove NAXIS keywords - not useful for event list
naxlist = [k for k in evt.meta if k.startswith('NAXIS')]
for k in naxlist:
del evt.meta[k]
# Table is in extension and not simple
del evt.meta['SIMPLE']
[docs]def identify_evt_sigmaclip(image, sigma_clip_level=5, peak_size=3):
"""Find the events of the given image.
This function identifies local maxima, using some cuts.
Parameters
----------
image : np.array of shape (frame, x, y)
background subtracted image
sigma_clip_level : float
Level for sigma clipping
peak_size : int
Event are recognized, if they are highest pixel in an island
of size (peacksize * peak_size).
Returns
-------
events : `astropy.table.Table`
Event table
"""
sc = sigmaclip(image, high=sigma_clip_level, low=sigma_clip_level)
# Use maximum filter to identify local peaks
mf = maximum_filter(image, size=(1, peak_size, peak_size))
# masks first three columns which always have very low DN's
# What to do with this?
# filtered.mask[:, :, :3] = False
frame, x, y = ((image > sc.upper) & (image == mf)).nonzero()
# Fits convention is to start pixel counting at 1
evt = Table([x + 1, y + 1, frame], names=['X', 'Y', 'FRAME'])
evt['X'].unit = u.pix
evt['Y'].unit = u.pix
return evt
[docs]def add_islands5533(evt, image):
'''Extract 5x5 and 3x3 event islands from image at FRAME, X, Y pos in ``evt``
Parameters
----------
evt : `astropy.table.Table`
Event table
image : np.array of shape (frame, x, y)
3d background subtracted image
'''
evt['5X5'] = [extract_array(image[i, :, :], (5, 5), (j, k))
for i, j, k in zip(evt['FRAME'], evt['X'] - 1 - (evt.meta['ROIX0'][0] - 1),
evt['Y'] - 1 - (evt.meta['ROIY0'][0] - 1))]
evt['3X3'] = evt['5X5'].data[:, 1:-1, 1:-1]
[docs]def energy_from_island(evt, islandcol='3X3'):
'''Returns event energy based on event island
Parameters
----------
evt : `astropy.table.Table`
Event table
islandcol : string
Name of column that holds the data for the pixel island
Returns
-------
energy : `astropy.units.quantity.Quantity`
Event energy
'''
return (evt[islandcol].data.sum(axis=(1, 2)) +
np.random.rand(len(evt)) - .5) * 2.2 * 3.66 * u.eV
[docs]def acis_grade(evt):
'''Returns acis grade based on 3x3 event island
Parameters
----------
evt : `astropy.table.Table`
Event table
Returns
-------
grade : np.array
Acis grade 0-255
'''
return ((evt['3X3'].data > 10) * grademap).sum(axis=(1, 2))
[docs]def asca_grade(evt, acisgrade='GRADE'):
"""Returns asca grade given an acis grade.
Parameters
----------
evt : `astropy.table.Table`
Event table
acisgrade : string
Column name in ``evt`` that holds the acis grade 0-256
Returns
-------
asca grade : int
The asca grade 0-7
"""
# Set default to "other" and then loop over grades
asca = 7 * np.ones_like(evt[acisgrade])
for grade in acis2asca:
asca[np.isin(evt[acisgrade], acis2asca[grade])] = grade
return asca
[docs]def hotpixelfromtxt(evt, x, y):
'''Flag all events with coordinates matching a given hot pixel list.
Parameters
----------
evt : `astropy.table.Table`
Events table
x, y : list or np.array
List of X Y coordinates of hot pixels
flagcol : string
Name of column in the events file where hot pixels are recorded.
(If the column exists it will be overwritten.)
Returns
-------
hotpix : list of bool
Flag column
'''
if len(x) != len(y):
raise ValueError('x and y coordinates for hot pixels must have same number of entries.')
hotpix = [(a, b) for a, b in zip(x, y)]
return [(a, b) in hotpix for a, b in zip(evt['X'], evt['Y'])]
[docs]def hotpixelbyoccurence(evt, n=None):
"""Mark events with coordinates that appear more than a n times.
Parameters
----------
evt : `astropy.table.Table`
Events table
n : int
Lower bound number of times a pixel has to appear in order to be removed.
If not set, this defaults to 3, and then 1/3 of the number of frames
if there are more than 9.
Returns
-------
hotpix : np.array of bool
Flag column
"""
if n is None:
n = max(evt.meta['FRAMES'][0] / 3, 3)
xy = np.vstack([evt['X'], evt['Y']])
# This is the line where everything happens:
unique, unique_inv, unique_counts = np.unique(xy, return_inverse=True,
return_counts=True, axis=1)
return unique_counts[unique_inv] > n
[docs]def make_hotpixellist(evt, n):
'''Write hot pixel list to file
Parameters
----------
evt : `astropy.table.Table`
Events table
n : int
Lower bound number of times a pixel has to appear in order to be removed.
If not set, this defaults to 3, and then 1/3 of the number of frames
if there are more than 9.
Returns
-------
t : `astropy.table.Table`
Table of hot pixel coordinates
'''
xy = np.vstack([evt['X'], evt['Y']])
# This is the line where everything happens:
unique, unique_counts = np.unique(xy, return_counts=True, axis=1)
hotpix = unique[unique_counts > n]
t = Table(hotpix.T, names=['X', 'Y'])
t.meta = evt.meta.copy()
t.meta['EXTNAME'] = 'HOTPIX'
return t
[docs]def dist2nextevent(evt):
d = np.zeros_like(evt['X'])
xy = np.vstack([evt['X'].data, evt['Y'].data]).T
for f in set(evt['FRAME']):
ind = evt['FRAME'] == f
sq = distance.squareform(distance.pdist(xy[ind, :], 'chebyshev'))
np.fill_diagonal(sq, np.inf)
d[ind] = np.min(sq, axis=1)
return d