import math
import numpy as np
import os
PI = 3.14159265358979323846 # math.pi
encoding = 'ascii'
npDouble = np.float64
def sph2cart(r, theta, phi):
return [
r * math.sin(theta) * math.cos(phi),
r * math.sin(theta) * math.sin(phi),
r * math.cos(theta)
]
def cart2sph(x, y, z):
# note that a faster vectorized version of this can be found at:
# https://stackoverflow.com/questions/4116658/faster-numpy-cartesian-to-spherical-coordinate-conversion/4116803#4116803
XsqPlusYsq = x ** 2 + y ** 2
r = math.sqrt(XsqPlusYsq + z ** 2) # r
elev = math.atan2(z, math.sqrt(XsqPlusYsq)) # theta
az = math.atan2(y, x) # phi
return r, elev, az
[docs]class Grid:
'''
This class is described in pages 12-15 of the paper
The class Grid is initialized with `q_max` and `grid_size`.
It is used to create/describe a grid of `q`, `theta`, `phi` angle values.
These values can be described using two sets of indexing:
1. The overall index `m`
2. The individual angle indices `i`, `j`, `k`
'''
def __init__(self, q_max, grid_size):
if grid_size % 2:
raise ValueError("Grid size must be even")
self.q_max = q_max
self.grid_size = grid_size
self.extra_shells = 3
@property
def step_size(self):
'''
The difference between q's in the grid.
:return: double q_max/N
'''
return npDouble(self.q_max / (self.N))
@property
def N(self):
return int(self.grid_size / 2)
@property
def actual_size(self):
return self.N + self.extra_shells
def _G_i_q(self, i):
return 6 * i + 12 * i ** 2 + 6 * i ** 3
[docs] def create_grid(self):
'''
a generator that returns q, theta, phi angles in phi-major order
'''
for i in range(0, self.N + 4):
if i == 0:
yield 0, 0, 0
continue
J_i = (3 * i) + 1
K_ij = 6 * i
for j in range(0, J_i):
for k in range(0, K_ij):
yield self.angles_from_indices(i, j, k)
[docs] def angles_from_indices(self, i, j, k):
'''
receives angle indices i,j,k and returns their q, theta, and phi angle values.
:param i: angle indice i
:param j: angle indice j
:param k: angle indice k
:return: q, theta, and phi angle values
'''
if i == 0:
return 0, 0, 0
q = npDouble(i * self.step_size)
theta_ij = npDouble((j * PI) / (3 * i))
phi_ijk = npDouble((k * PI) / (3 * i))
return q, theta_ij, phi_ijk
[docs] def indices_from_index(self, m):
'''
:param m: receives an overall index m.
:return: individual q, theta, and phi indices: i, j, k
'''
if m == 0:
return 0, 0, 0
i = math.floor((m / 6) ** (1. / 3.))
if m > self._G_i_q(i):
i += 1
R_i = m - self._G_i_q(i - 1) - 1
j = math.floor(R_i / (6 * i))
k = R_i - 6 * i * j
return i, j, k
[docs] def angles_from_index(self, m):
'''
:param m: receives an overall index m
:return: returns the matching q, theta, and phi angle values
'''
i, j, k = self.indices_from_index(m)
q, theta, phi = self.angles_from_indices(i, j, k)
return q, theta, phi
[docs] def index_from_indices(self, i, j, k):
'''
receives angle indices i,j,k and returns the overall index m that matches them.
:param i: angle indices i
:param j: angle indices j
:param k: angle indices k
:return: overall index m that matches the given i,j,k
'''
if i == 0:
return 0
return 6 * (i - 1) + 12 * (i - 1) ** 2 + 6 * (i - 1) ** 3 + 6 * i * j + k + 1
[docs] def indices_from_angles(self, q, theta, phi):
'''
receives angles q, theta, phi, ands returns the matching indices i,j,k.
:param q: q angle
:param theta: theta angle
:param phi: phi angle
:return: return indices i, j, k of given q, thta and phi
'''
eps=0.000001
i=math.floor(q/self.step_size + eps)
phiPoints = 6.0 * i
thePoints = 3.0 * i
j = math.floor((theta / PI) * thePoints + eps)
k = math.floor((phi / (PI*2)) * phiPoints + eps)
return i, j, k
[docs] def index_from_angles(self, q, theta, phi):
'''
receives angles q, theta, phi and returns the matching overall index m.
:param q: q angle
:param theta: theta angle
:param phi: phi angle
:return: matching overall index m
'''
i,j,k = self.indices_from_angles(q, theta, phi)
return self.index_from_indices(i,j,k)
[docs]class Amplitude(Grid):
'''
The class `Amplitude`, by contrast, can be used to build an amplitude and then save that amplitude as an amplitude file,
which can then be opened in D+ (or sent in a class AMP) but it itself cannot be added directly to the Domain parameter tree.
'''
def __init__(self, q_max, grid_size):
super().__init__(q_max, grid_size)
self._values = np.array([], dtype=np.float64)
self.external_headers = None
self.__description = ""
self.filename = ""
[docs] def create_grid(self, func):
'''
Amplitude overrides grid's `create_grid` method. Amplitude's `create_grid` requires a function as an argument.
This function must receive q, theta, phi and return two values, representing the real and imaginary parts of a complex number.
The values returned can be a tuple, an array, or a python complex number (A+Bj).
These values are then saved to the Ampltiude's `values` property, and can also be accessed through the `complex_amplitudes_array`
property as a numpy array of numpy complex types.
:param func: a function that receives q, theta, phi and return two values, representing the real and imaginary parts of a complex number.
'''
values=[]
for q, theta, phi in super().create_grid():
try:
res= func(q, theta, phi)
except Exception as e:
raise ValueError("You must provide a function which receives q, theta, phi")
try:
values.append(res[0])
values.append(res[1])
except:
try:
values.append(res.real)
values.append(res.imag)
except:
raise ValueError("You must provide a function which returns a complex number in two parts")
self._values = np.float64(values)
@property
def values(self):
'''
array that contains the grid intensity values as 2 values - real and imaginary
:return: values array
'''
if len(self._values):
return self._values
else:
raise ValueError("Amplitude values empty-- has not been initialized yet")
@property
def complex_amplitude_array(self):
'''
returns the values array as complex array.
:return: complex array
'''
complex_arr = np.zeros((int(self._values.__len__() / 2), 1), dtype=np.complex)
for index in range(0, complex_arr.__len__()):
complex_arr[index] = self.values[2 * index] + 1j * self.values[2 * index + 1]
return complex_arr
@property
def default_header(self):
'''
Return the default file headers values for amplidute class.
:return: file headers of amplitude
'''
descriptor = "#@".encode(encoding)
header = "# created from a Python function\n"
header += "# " + "\\"*80 +"\n"
header += "# User description:" + self.description + "\n"
header += "# " + "\\" * 80 + "\n"
header += "# Grid was used.\n"
header += "# N^3; N = " + str(self.N) + "\n"
header += "# qMax= " + str(self.q_max) + "\n"
header += "# Grid step size = " + str(self.step_size) + "\n"
header += "\n"
headlen =np.uint32( 2*1 + 4 + 1 + len(header) *1 + 1) # descriptor + unsigned int + \n + header length + \n
header_list = [descriptor, headlen, "\n".encode(encoding), header.encode(encoding) ,"\n".encode(encoding)]
step_size = np.array([self.step_size], dtype=np.float64)
added_list=[
(str(13) + "\n").encode(encoding), # version
(str(16) + "\n").encode(encoding), # size of double
(str(int(self.actual_size)) + "\n").encode(encoding), # "tmp grid size"
(str(int(self.extra_shells)) + "\n").encode(encoding),
step_size.tobytes() # note that this does not get new line
]
return header_list + added_list
@property
def headers(self):
'''
Returns the headers - default if amplitude was created nt python API or external if amplitude was created from a file.
:return:
'''
if self.external_headers:
return self.external_headers
else:
return self.default_header
@property
def description(self):
if self.__description:
return self.__description
return "None"
@description.setter
def description(self, val):
edit = val.replace("\n", "\n# ")
self.__description ="\n# "+ edit
[docs] def save(self, filename):
'''
The function will save the information of the Amplitude class to an Amplitude file which can then be
passed along to D+ to calculate its signal or perform fitting.
:param filename: new amplitude file name
'''
with open(filename, 'wb') as f:
for header in self.headers:
f.write(header)
amps = np.float64(self._values)
amps.tofile(f)
self.filename= os.path.abspath(filename)
[docs] @staticmethod
def load(filename):
'''
A static method, `load`, which receives a filename of an Amplitude file, and returns an Amplitude instance
with the values from that file already loaded.
:param filename: filename of an Amplitude file
:return: instance of Amplitude class.
'''
def _peek(File, length):
pos = File.tell()
data = File.read(length)
File.seek(pos)
return data
has_headers = False
headers = []
with open(filename, "rb+") as f:
if _peek(f, 1).decode('ascii') == '#':
desc = f.read(2)
tempdesc = desc.decode('ascii')
if (tempdesc[1] == '@'):
has_headers = True
else:
tmphead = f.readline()
headers.append(desc + tmphead)
if has_headers:
offset = np.fromfile(f, dtype=np.uint32, count=1, sep="")
del_aka_newline = f.readline() # b"\n"
while _peek(f, 1).decode('ascii') == '#':
headers.append(f.readline())
if offset > 0:
f.seek(offset[0], 0)
version_r = f.readline().rstrip()
version = int(version_r.decode('ascii'))
size_element_r = f.readline().rstrip()
size_element = int(size_element_r.decode('ascii'))
if size_element != int(2 * np.dtype(np.float64).itemsize):
raise ValueError("error in file: " + filename + "dtype is not float64\n")
tmpGridsize_r = f.readline().rstrip()
tmpGridsize = int(tmpGridsize_r.decode('ascii')) # I
tmpExtras_r = f.readline().rstrip()
extra_shells = int(tmpExtras_r.decode('ascii')) # extra shells
grid_size = (tmpGridsize - extra_shells) * 2 # grid_size
actualGridSize = grid_size / 2 + extra_shells # I
i = actualGridSize
totalsz = int((6 * i * (i + 1) * (3 + 3 + 2 * 3 * i)) / 6)
totalsz = totalsz + 1
totalsz = totalsz * 2
step_size = np.fromfile(f, dtype=np.float64, count=1, sep="")
q_max = np.float64 (step_size * (grid_size / 2.0))
amp_values = np.fromfile(f, dtype=np.float64, count=totalsz, sep="")
header_List = []
if has_headers:
pos = 0
header_List.append(desc)
pos = pos + len(desc)
header_List.append(offset[0].tobytes())
pos = pos + len(offset[0].tobytes())
header_List.append(del_aka_newline)
pos = pos + len(del_aka_newline)
for i in headers:
header_List.append(i)
pos = pos + len(i)
header_List.append(del_aka_newline)
header_List.append(del_aka_newline)
pos = pos + 2 * len(del_aka_newline)
pos = np.int32(pos)
if pos != offset[0]:
header_List[1] = pos.tobytes()
header_List.append(version_r + b"\n")
header_List.append(size_element_r + b"\n")
header_List.append(tmpGridsize_r + b"\n")
header_List.append(tmpExtras_r + b"\n")
header_List.append(step_size.tobytes())
amp = Amplitude(q_max, grid_size)
amp.extra_shells = extra_shells
amp._values = amp_values
amp.external_headers = header_List
return amp