from __future__ import absolute_import, division, print_function
from astropy import units as u
from astropy.coordinates import SkyCoord
import numpy as np
from ebltable.tau_from_model import OptDepth
from .base import environs as env
from .base.transfer import calc_conv_prob, calc_lin_pol
from .utils.interp2d import Interp2D
import logging
import sys
[docs]
class Source(object):
"""
Class that stores information about the source:
redshift
Sky coordinates: ra, dec, l,b
As well as jet parameters: bulk Lorentz factor, observation
and jet opening angle
"""
[docs]
def __init__(self, z, **kwargs):
"""
Initialize the source class
Parameters
----------
z: float
source redshift
ra: float or string
Right Ascension compatible with :py:class:`~astropy.coordinates.SkyCoord` object
dec: float or string
Declination compatible with :py:class:`~astropy.coordinates.SkyCoord` object
l: float
Galactic longitude or None. If given, overwrites ra and dec
b: float
Galactic latitude or None. If given, overwrites ra and dec
theta_obs: float
Angle between l.o.s. and jet axis in degrees (default: 3.)
bLorentz: float
bulk lorentz factor of gamma-ray emitting plasma (default: 10.)
theta_jet: float
Jet opening angle in degrees. Default: 1/bLorentz
"""
kwargs.setdefault('ra', None)
kwargs.setdefault('dec', None)
kwargs.setdefault('l', None)
kwargs.setdefault('b', None)
kwargs.setdefault('theta_obs', 3.)
kwargs.setdefault('bLorentz', 10.)
kwargs.setdefault('theta_jet', np.rad2deg(1./kwargs['bLorentz']))
if kwargs['ra'] is None and kwargs['dec'] is None and \
kwargs['l'] is None and kwargs['b'] is None:
raise ValueError("Coordinates cannot all be of None Type!")
# calculate doppler factor
self._bLorentz = kwargs['bLorentz']
self._theta_obs = kwargs['theta_obs']
self._theta_jet = kwargs['theta_jet']
self._z = z
self._doppler = None
self._c = None
self._c_gal = None
self._ra = None
self._dec = None
self._l = None
self._b = None
self.calc_doppler()
self.set_ra_dec_l_b(ra=kwargs['ra'], dec=kwargs['dec'],
l=kwargs['l'], b= kwargs['b'])
return
@property
def z(self):
return self._z
@property
def ra(self):
return self._ra
@property
def dec(self):
return self._dec
@property
def l(self):
return self._l
@property
def b(self):
return self._b
@property
def c(self):
return self._c
@property
def c_gal(self):
return self._c_gal
@property
def theta_obs(self):
return self._theta_obs
@property
def theta_jet(self):
return self._theta_jet
@property
def bLorentz(self):
return self._bLorentz
@theta_obs.setter
def theta_obs(self, theta_obs):
if type(theta_obs) == u.Quantity:
self._theta_obs = theta_obs.to('degree').value
else:
self._theta_obs = theta_obs
self.calc_doppler()
return
@z.setter
def z(self, z):
self._z = z
@theta_jet.setter
def theta_jet(self, theta_jet):
if type(theta_jet) == u.Quantity:
self._theta_jet = theta_jet.to('degree').value
else:
self._theta_jet = theta_jet
return
@bLorentz.setter
def bLorentz(self, bLorentz):
self._bLorentz = bLorentz
self.calc_doppler()
return
@ra.setter
def ra(self, ra):
if type(ra) == u.Quantity:
self._ra = ra.to('degree').value
else:
self._ra = ra
self.set_ra_dec_l_b(self._ra,self._dec)
return
@dec.setter
def dec(self, dec):
if type(dec) == u.Quantity:
self._dec = dec.to('degree').value
else:
self._dec = dec
self.set_ra_dec_l_b(self._ra,self._dec)
return
@l.setter
def l(self, l):
if type(l) == u.Quantity:
self._l = l.to('degree').value
else:
self._l = l
self.set_ra_dec_l_b(self._ra,self._dec, l = self._l, b = self._b)
return
@b.setter
def b(self, b):
if type(b) == u.Quantity:
self._b = b.to('degree').value
else:
self._b = b
self.set_ra_dec_l_b(self._ra, self._dec, l=self._l, b=self._b)
return
[docs]
def calc_doppler(self):
"""Calculate the Doppler factor of the plasma"""
self._doppler = 1./(self._bLorentz * (1. - np.sqrt(1. - 1./self._bLorentz**2.) *
np.cos(np.radians(self.theta_obs))))
return
[docs]
def set_ra_dec_l_b(self,ra,dec,l = None, b = None):
"""Set l and b or ra and dec"""
if l is None and b is None:
if type(ra) == str:
self._c = SkyCoord(ra, dec, frame='icrs')
elif type(ra) == float:
self._c = SkyCoord(ra, dec, frame='icrs', unit='deg')
self._c_gal = self._c.galactic
self._l = self._c_gal.l.value
self._b = self._c_gal.b.value
else:
self._l = l
self._b = b
self._c_gal = SkyCoord(l, b, frame='galactic', unit='deg')
self._c = self._c_gal.icrs
self._ra = self._c.ra.value
self._dec = self._c.dec.value
return
[docs]
class ALP(object):
"""
Class to store the ALP parameters
"""
[docs]
def __init__(self, m, g):
"""
Initialize the ALP class
Parameters
----------
m: float
ALP mass in neV.
g: float
photon-ALP coupling in 10^-11 GeV^-1.
"""
self._m = m
self._g = g
return
@property
def m(self):
return self._m
@property
def g(self):
return self._g
@m.setter
def m(self, m):
if type(m) == u.Quantity:
self._m = m.to('neV').value
else:
self._m = m
return
@g.setter
def g(self, g):
if type(g) == u.Quantity:
self._g = g.to('10**-11GeV-1').value
else:
self._g = g
return
class NamedClassList(list):
"""
A list of classes indexable with an integer or class's name.
As it is a subclasses the built-in class ``list``, it provides all the methods of the
standard ``list`` class.
Adapted from
https://github.com/s3rvac/blog/tree/master/en-2014-10-11-indexing-python-lists-with-integer-or-object-name
"""
def __getitem__(self, key):
return self._delegate_to_list('__getitem__', key)
def __setitem__(self, key, value):
return self._delegate_to_list('__setitem__', key, value)
def __delitem__(self, key):
return self._delegate_to_list('__delitem__', key)
def keys(self):
return [type(item).__name__.strip("mix") for item in self]
def values(self):
return self
def items(self):
return [(self.keys()[i], self[i]) for i in range(len(self))]
def _delegate_to_list(self, method, key, *args):
if isinstance(key, str):
key = self._index_of(key)
if sys.version_info[0] < 3:
return getattr(super(NamedClassList, self), method)(key, *args)
else:
return getattr(super(), method)(key, *args)
def _index_of(self, name):
for index, item in enumerate(self):
if type(item).__name__ == name or type(item).__name__.strip("Mix") == name:
return index
raise IndexError('no object named {!r}'.format(name))
[docs]
class ModuleList(object):
"""
Class that collects all environments for photon-ALP mixing
and manages common parameters such as photon-ALP coupling,
the ALP mass, the source, and the energies at which
the photon-ALP oscillation is computed.
"""
[docs]
def __init__(self,
alp,
source,
pin=None,
EGeV=None,
seed=None,
log_level='info'):
"""
Initialize the class, energy range, and polarization matrices
Parameters
----------
alp: :py:class:`~gammaALPs.ALP`
:py:class:`~gammaALPs.ALP` object with ALP parameters
source: :py:class:`~gammaALPs.Source`
:py:class:`~gammaALPs.Source` object with source parameters
pin: array-like
3x3 dim matrix with initial polarization.
Default: un-polarized photon beam
EGeV: array-like
n-dim numpy array with gamma-ray energies in GeV
Default: log-spaced array between 1 GeV and 10 TeV
seed: int, optional
Seed for RandomState for numpy random numbers.
Must be convertible to 32 bit unsigned integers.
log_level: str
level for logging, either 'debug', 'info', 'warning' or 'error'
"""
self._alp = alp
self._source = source
if EGeV is None:
self._EGeV = np.logspace(0., 4., 100)
else:
self._EGeV = EGeV
# initialize final polarization states
self._px = np.diag((1., 0., 0.))
self._py = np.diag((0., 1., 0.))
self._pa = np.diag((0., 0., 1.))
if pin is None:
self._pin = np.diag((1., 1., 0.)) * 0.5
else:
self._pin = pin
self._modules = NamedClassList()
self._seed = seed
self._px_src = None
self._py_src = None
self._pa_src = None
self._px_final = None
self._py_final = None
self._pa_final = None
self._lin_pol = None
self._circ_pol = None
self.__nsim_max = None
self._atten = None
self._eblnorm = None
self.__init_logger(log_level)
return
@property
def alp(self):
return self._alp
@property
def source(self):
return self._source
@property
def EGeV(self):
return self._EGeV
@property
def px(self):
return self._px
@property
def py(self):
return self._py
@property
def pa(self):
return self._pa
@property
def pin(self):
return self._pin
@property
def seed(self):
return self._seed
@property
def modules(self):
return self._modules
@pin.setter
def pin(self,pin):
self._pin = pin
return
@EGeV.setter
def EGeV(self, EGeV):
if type(EGeV) == u.Quantity:
self._EGeV = EGeV.to('GeV').value
else:
self._EGeV = EGeV
# update energies for all modules
for m in self.modules:
if type(m) == OptDepth:
self._atten = np.exp(-self._eblnorm * m.opt_depth(self.source.z,EGeV / 1e3))
elif type(m) == env.MixGMF:
m.EGeV = EGeV
elif type(m) == env.MixIGMFCell:
m.EGeV = EGeV
m._ee *= (1. + m._z_mean)
elif type(m) == env.MixJet:
m.EGeV = EGeV * (1. + self.source.z)
m._ee /= m._source._doppler
elif type(m) == env.MixJetHelicalTangled:
m.EGeV = EGeV * (1. + self.source.z)
m._ee /= m._gammas
else:
m.EGeV = EGeV * (1. + self.source.z)
return
@seed.setter
def seed(self, seed):
self._seed = seed
def __init_logger(self, log_level):
logging.basicConfig(format='\033[0;36m%(filename)10s:\033'
'[0;35m%(lineno)4s\033[0;0m --- %(levelname)7s: %(message)s')
self._logger = logging.getLogger('gamma_alps')
self._logger.setLevel(log_level.upper())
logging.addLevelName(logging.DEBUG, "\033[1;32m%s\033[1;0m" % logging.getLevelName(logging.DEBUG))
logging.addLevelName(logging.INFO, "\033[1;36m%s\033[1;0m" % logging.getLevelName(logging.INFO))
logging.addLevelName(logging.WARNING, "\033[1;31m%s\033[1;0m" % logging.getLevelName(logging.WARNING))
logging.addLevelName(logging.ERROR, "\033[1;41m%s\033[1;0m" % logging.getLevelName(logging.ERROR))
[docs]
def add_propagation(self, environ, order, **kwargs):
"""
Add a propagation environment to the module list
Parameters
----------
environ: str
identifier for environment, see notes for possibilities
order: int
the order of the environment along the line of sight,
starting at zero, where zero is closest to the source and highest value is closest to the
observer
Notes
-----
kwargs are passed to the specific environment.
Available environments are the classes given in :py:class:`~gammaALPs.base.environs`,
where all the specific options are listed.
The identifiers for the environments are:
- IGMF: initializes :py:class:`~gammaALPs.base.environs.MixIGMFCell` for mixing
in intergalactic magnetic field (IGMF) which is assumed to be of a cell-like structure
- ICMCell: initializes :py:class:`~gammaALPs.base.environs.MixICMCell` for mixing
in intra-cluster medium which is assumed to be of a cell-like structure
- ICMGaussTurb: initializes :py:class:`~gammaALPs.base.environs.MixICMGaussTurb` for mixing
in intra-cluster medium which is assumed to follow a Gaussian turbulence spectrum
- ICMStructured: initializes :py:class:`~gammaALPs.base.environs.MixICMStructured`for mixing
in intra-cluster medium which is assumed to be structured
with radial, poloidal and toroidal components, see 1008.5353 and 1908.03084
- Jet: initializes :py:class:`~gammaALPs.base.environs.MixJet` for mixing
in the AGN jet, where the field is assumed to be coherent
- JetHelicalTangled: initializes :py:class:`~gammaALPs.base.environs.MixJetHelicalTangled` for mixing
in the AGN jet with two field components (tangled and helical)
- GMF: initializes :py:class:`~gammaALPs.base.environs.MixGMF` for mixing
in the Galactic magnetic field (GMF) of the Milky Way
- File: initializes :py:class:`~gammaALPs.base.environs.MixFromFile` for mixing
in a magnetic field given by a data file
- Array: initializes :py:class:`~gammaALPs.base.environs.MixFromArray` for mixing
in a magnetic field given by a numpy arrays for B,psi,nel,r, and dL
- EBL: initializes :py:class:`~ebltable.tau_from_model.OptDepth` for EBL attenuation,
i.e. no photon-ALP mixing in the intergalactic medium
"""
kwargs.setdefault('eblmodel', 'dominguez')
kwargs.setdefault('eblnorm', 1.)
kwargs.setdefault('seed', self._seed)
self._eblnorm = kwargs['eblnorm']
if environ == 'EBL':
self._modules.insert(order, OptDepth.readmodel(model=kwargs['eblmodel']))
self._atten = np.exp(-self._eblnorm *\
self._modules[order].opt_depth(self.source.z, self.EGeV / 1e3))
elif environ == 'IGMF':
self._modules.insert(order, env.MixIGMFCell(self.alp, self.source,
EGeV=self.EGeV,
**kwargs))
elif environ == 'ICMCell':
self._modules.insert(order, env.MixICMCell(self.alp,
EGeV=self.EGeV * (1. + self.source.z),
**kwargs))
elif environ == 'ICMGaussTurb':
self._modules.insert(order, env.MixICMGaussTurb(self.alp,
EGeV=self.EGeV * (1. + self.source.z),
**kwargs))
elif environ == 'ICMStructured':
self._modules.insert(order, env.MixICMStructured(self.alp,
EGeV=self.EGeV * (1. + self.source.z),
**kwargs))
elif environ == 'Jet':
self._modules.insert(order, env.MixJet(self.alp, self.source,
EGeV=self.EGeV * (1. + self.source.z),
**kwargs))
elif environ == 'JetHelicalTangled':
self._modules.insert(order, env.MixJetHelicalTangled(self.alp, self.source,
EGeV=self.EGeV * (1. + self.source.z),
**kwargs))
elif environ == 'GMF':
self._modules.insert(order, env.MixGMF(self.alp, self.source,
EGeV=self.EGeV,
**kwargs))
elif environ == 'File':
self._modules.insert(order, env.MixFromFile(self.alp,
EGeV=self.EGeV,
**kwargs))
elif environ == 'Array':
self._modules.insert(order, env.MixFromArray(self.alp,
EGeV=self.EGeV,
**kwargs))
else:
raise ValueError("Unkwon Environment chosen")
return
[docs]
def add_disp_abs(self, EGeV, r_kpc, disp, module_id, type_matrix='dispersion', **kwargs):
"""
Add dispersion, absorption, or extra momentum difference term to a propagation module using
interpolation of of a 2d dispersion / absorption matrix
Parameters
----------
EGeV: array-like
n-dim array with gamma-ray energies in GeV at which dispersion/absorption/momentum difference matrix
is calculated
r_kpc: array-like
m-dim array with distnaces in kpc at which dispersion/absorption/momentum difference matrix
is calculated
disp: array-like
n x m-dim array with dispersion (unitless) / absorption (in kpc^-1) / momentum difference (in kpc^-1)
module_id: int
id of module to which dispersion / absorption /momentum difference is added
type_matrix: str
either 'dispersion', 'absorption', or 'Delta', specifies type of matrix disp
"""
if module_id >= len(self.modules):
raise ValueError("requested module id is out of range")
if isinstance(self.modules[module_id], OptDepth):
raise RuntimeError("dispersion / absorption cannot be applied to EBL only module")
# interpolate matrix
intp = Interp2D(np.log10(EGeV), r_kpc, disp, **kwargs)
# cast to values used for propagation
new_disp = intp(np.log10(self.modules[module_id].EGeV), self.modules[module_id]._r)
if type_matrix == 'dispersion':
self.modules[module_id].chi = new_disp
elif type_matrix == 'absorption':
self.modules[module_id].Gamma = new_disp
elif type_matrix == 'Delta':
self.modules[module_id].Delta = new_disp
else:
raise ValueError("type_matrix must either be 'dispersion', 'absorption' or 'Delta'")
def _check_modules_random_fields(self):
"""
Check how many modules have n > 1 B field realizations.
At the moment, only one component is allowed to have multiple realizations.
"""
self._all_nsim = []
for im, m in enumerate(self.modules):
if not type(m) == OptDepth:
self._all_nsim.append(m.nsim)
else:
# only one realization of EBL
self._all_nsim.append(1)
self._logger.debug(self._all_nsim)
self._logger.debug(np.array(self._all_nsim) > 1)
if np.sum(np.array(self._all_nsim) > 1) > 1:
self._logger.error("More than one module has multiple B-field realizations, not supported")
self._logger.error("Number of realizations for the ALP modules: {0}".format(self._all_nsim))
raise RuntimeError
return
def _multiply_env(self,istart,istop,n):
"""
Helper function to multiply the transfer matrices
of the different traversed environments.
The equations of motions give :math:`P_i = \mathrm{Tr}(\rho_i \mathcal{T} \rho_0 \mathcal{T}^\dagger`
and the total transfer matrix is given by :math:`\mathcal{T} = \mathcal{T}_n \ldots \mathcal{T}_0`
such that the transfer matrix corresponding to environment closest to source (subscript 0)
is also closest to initical polarization matrix :math:`\rho_0`.
"""
for it, Tenv in enumerate(self._Tenv[istart:istop]):
if self.__nsim_max == (self._all_nsim[istart:istop])[it]:
nn = n
else:
nn = 0
if not it:
Tsrc = Tenv[nn]
else:
Tsrc = np.matmul(Tenv[nn], Tsrc)
return Tsrc
[docs]
def run(self, multiprocess=1):
"""
Run the photon-ALP conversion probability calculation for all modules
Parameters
----------
multiprocess: int
number of cores to perform calculation; only used when random B field is requested
by one of the modules and number of realizations is > 1
Returns
-------
Px, Py, Pa: tuple with :py:class:`numpy.ndarray`
N x M dim. arrays with final photon and ALP states (Px, Py, Pa).
Each P_i is of dimension (number of realizations x number of energy bins).
Flat arrays are returned when number of realizations = 1.
"""
# update energies of all modules
# also accounts for changed redshift
self.EGeV = self._EGeV
# Calculate the transfer matrices for each environment
self._Tenv = []
self._check_modules_random_fields()
for im, m in enumerate(self.modules):
if not type(m) == OptDepth:
self._logger.info('Running Module {0:n}: {1}'.format(im, type(m)))
self._logger.debug('Photon-ALP mixing in {0}: {1:.3e}'.format(type(m), m.alp.g))
self._logger.debug('ALP mass in {0}: {1:.3e}'.format(type(m), m.alp.m))
self._logger.debug('Energy range in {0}: {1:.3e}-{2:.3e}, n = {2:n}'.format(
type(m), m.EGeV[0], m.EGeV[-1], m.EGeV.size))
self._logger.debug('Number of B-field real. in {0}: {1:n}'.format(type(m), m.nsim))
if multiprocess > 1:
T = m.calc_transfer_multi(nprocess=multiprocess)
else:
T = []
for i in range(m.nsim):
T.append(m.calc_transfer(nsim = i, nprocess=1))
self._Tenv.append(np.array(T))
# check if we have simple EBL absorption present
# in which case we calculate separately the mixing in the source,
# the EBL absorption, and the mixing near the observer.
# Otherwise only use transfer matrices to calculate the total
# oscillation probability
self.__nsim_max = np.max(self._all_nsim)
self._px_final, self._py_final, self._pa_final = [],[],[]
self._lin_pol, self._circ_pol = [],[]
if OptDepth in [type(t) for t in self.modules]:
# get the index of the EBL module
idx = [type(t) for t in self.modules].index(OptDepth)
# convert self._atten to transfer matrices
t_ebl = np.zeros((self.EGeV.size, 3, 3))
t_ebl[:, 0, 0] = np.sqrt(self._atten)
t_ebl[:, 1, 1] = np.sqrt(self._atten)
t_ebl[:, 2, 2] = 1
t_list = [t_ebl]
t = np.array(t_list)
#insert in list of self._Tenv
self._Tenv.insert(idx, t)
# mutlitply all environments for all B-field realizations
for n in range(self.__nsim_max):
Tobs = self._multiply_env(0,len(self._Tenv),n)
self._px_final.append(calc_conv_prob(self.pin, self.px, Tobs))
self._py_final.append(calc_conv_prob(self.pin, self.py, Tobs))
self._pa_final.append(calc_conv_prob(self.pin, self.pa, Tobs))
l, c = calc_lin_pol(self.pin, Tobs)
self._lin_pol.append(l)
self._circ_pol.append(c)
self._px_final = np.array(self._px_final)
self._py_final = np.array(self._py_final)
self._pa_final = np.array(self._pa_final)
self._lin_pol = np.array(self._lin_pol)
self._circ_pol = np.array(self._circ_pol)
return self._px_final, self._py_final, self._pa_final