Source code for binoculars.backends.id03
import sys
import os
import itertools
import glob
import numpy
import time
from PyMca5.PyMca import EdfFile, SixCircle, specfile, specfilewrapper
from .. import backend, errors, util
[docs]class pixels(backend.ProjectionBase):
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi):
y, x = numpy.mgrid[slice(None, gamma.shape[0]), slice(None, delta.shape[0])]
return (y, x)
[docs]class HKLProjection(backend.ProjectionBase):
# arrays: gamma, delta
# scalars: theta, mu, chi, phi
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi):
R = SixCircle.getHKL(
wavelength,
UB,
gamma=gamma,
delta=delta,
theta=theta,
mu=mu,
chi=chi,
phi=phi,
)
shape = gamma.size, delta.size
H = R[0, :].reshape(shape)
K = R[1, :].reshape(shape)
L = R[2, :].reshape(shape)
return (H, K, L)
[docs]class HKProjection(HKLProjection):
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi):
H, K, L = super(HKProjection, self).project(
wavelength, UB, gamma, delta, theta, mu, chi, phi
)
return (H, K)
[docs]class specularangles(backend.ProjectionBase):
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi):
delta, gamma = numpy.meshgrid(delta, gamma)
mu *= numpy.pi / 180
delta *= numpy.pi / 180
gamma *= numpy.pi / 180
chi *= numpy.pi / 180
phi *= numpy.pi / 180
theta *= numpy.pi / 180
def mat(u, th):
ux, uy, uz = u[0], u[1], u[2]
sint = numpy.sin(th)
cost = numpy.cos(th)
mcost = 1 - numpy.cos(th)
return numpy.matrix(
[
[
cost + ux ** 2 * mcost,
ux * uy * mcost - uz * sint,
ux * uz * mcost + uy * sint,
],
[
uy * ux * mcost + uz * sint,
cost + uy ** 2 * mcost,
uy * uz - ux * sint,
],
[
uz * ux * mcost - uy * sint,
uz * uy * mcost + ux * sint,
cost + uz ** 2 * mcost,
],
]
)
def rot(vx, vy, vz, u, th):
R = mat(u, th)
return (
R[0, 0] * vx + R[0, 1] * vy + R[0, 2] * vz,
R[1, 0] * vx + R[1, 1] * vy + R[1, 2] * vz,
R[2, 0] * vx + R[2, 1] * vy + R[2, 2] * vz,
)
# what are the angles of kin and kout in the sample frame?
# angles in the hexapod frame
koutx, kouty, koutz = (
numpy.sin(-numpy.pi / 2 + gamma) * numpy.cos(delta),
numpy.sin(-numpy.pi / 2 + gamma) * numpy.sin(delta),
numpy.cos(-numpy.pi / 2 + gamma),
)
kinx, kiny, kinz = numpy.sin(numpy.pi / 2 - mu), 0, numpy.cos(numpy.pi / 2 - mu)
# now we rotate the frame around hexapod rotation th
xaxis = numpy.array(rot(1, 0, 0, numpy.array([0, 0, 1]), theta))
yaxis = numpy.array(rot(0, 1, 0, numpy.array([0, 0, 1]), theta))
# first we rotate the sample around the xaxis
koutx, kouty, koutz = rot(koutx, kouty, koutz, xaxis, chi)
kinx, kiny, kinz = rot(kinx, kiny, kinz, xaxis, chi)
yaxis = numpy.array(
rot(yaxis[0], yaxis[1], yaxis[2], xaxis, chi)
) # we also have to rotate the yaxis
# then we rotate the sample around the yaxis
koutx, kouty, koutz = rot(koutx, kouty, koutz, yaxis, phi)
kinx, kiny, kinz = rot(kinx, kiny, kinz, yaxis, phi)
# to calculate the equivalent gamma, delta and mu in the sample frame we rotate the frame around the sample z which is 0,0,1
back = numpy.arctan2(kiny, kinx)
koutx, kouty, koutz = rot(koutx, kouty, koutz, numpy.array([0, 0, 1]), -back)
kinx, kiny, kinz = rot(kinx, kiny, kinz, numpy.array([0, 0, 1]), -back)
mu = numpy.arctan2(kinz, kinx) * numpy.ones_like(delta)
delta = numpy.pi - numpy.arctan2(kouty, koutx)
gamma = numpy.pi - numpy.arctan2(koutz, koutx)
delta[delta > numpy.pi] -= 2 * numpy.pi
gamma[gamma > numpy.pi] -= 2 * numpy.pi
mu *= 1 / numpy.pi * 180
delta *= 1 / numpy.pi * 180
gamma *= 1 / numpy.pi * 180
return (gamma - mu, gamma + mu, delta)
[docs]class ThetaLProjection(backend.ProjectionBase):
# arrays: gamma, delta
# scalars: theta, mu, chi, phi
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi):
R = SixCircle.getHKL(
wavelength,
UB,
gamma=gamma,
delta=delta,
theta=theta,
mu=mu,
chi=chi,
phi=phi,
)
shape = gamma.size, delta.size
L = R[2, :].reshape(shape)
theta_array = numpy.ones_like(L) * theta
return (theta_array, L)
[docs]class QProjection(backend.ProjectionBase):
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi):
shape = gamma.size, delta.size
sixc = SixCircle.SixCircle()
sixc.setLambda(wavelength)
sixc.setUB(UB)
R = sixc.getQSurface(
gamma=gamma, delta=delta, theta=theta, mu=mu, chi=chi, phi=phi
)
qz = R[0, :].reshape(shape)
qy = R[1, :].reshape(shape)
qx = R[2, :].reshape(shape)
return (qz, qy, qx)
[docs]class SphericalQProjection(QProjection):
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi):
qz, qy, qx = super(SphericalQProjection, self).project(
wavelength, UB, gamma, delta, theta, mu, chi, phi
)
q = numpy.sqrt(qx ** 2 + qy ** 2 + qz ** 2)
theta = numpy.arccos(qz / q)
phi = numpy.arctan2(qy, qx)
return (q, theta, phi)
[docs]class CylindricalQProjection(QProjection):
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi):
qz, qy, qx = super(CylindricalQProjection, self).project(
wavelength, UB, gamma, delta, theta, mu, chi, phi
)
qpar = numpy.sqrt(qx ** 2 + qy ** 2)
phi = numpy.arctan2(qy, qx)
return (qpar, qz, phi)
[docs]class nrQProjection(QProjection):
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi):
qx, qy, qz = super(nrQProjection, self).project(
wavelength, UB, gamma, delta, 0, mu, chi, phi
)
return (qx, qy, qz)
[docs]class TwoThetaProjection(SphericalQProjection):
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi):
q, theta, phi = super(TwoThetaProjection, self).project(
wavelength, UB, gamma, delta, theta, mu, chi, phi
)
return (
2 * numpy.arcsin(q * wavelength / (4 * numpy.pi)) / numpy.pi * 180,
) # note: we need to return a 1-tuple?
[docs]class Qpp(nrQProjection):
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi):
qx, qy, qz = super(Qpp, self).project(
wavelength, UB, gamma, delta, theta, mu, chi, phi
)
qpar = numpy.sqrt(qx ** 2 + qy ** 2)
qpar[numpy.sign(qx) == -1] *= -1
return (qpar, qz)
[docs]class GammaDeltaTheta(
HKLProjection
): # just passing on the coordinates, makes it easy to accurately test the theta correction
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi):
delta, gamma = numpy.meshgrid(delta, gamma)
theta = theta * numpy.ones_like(delta)
return (gamma, delta, theta)
[docs]class GammaDelta(
HKLProjection
): # just passing on the coordinates, makes it easy to accurately test the theta correction
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi):
delta, gamma = numpy.meshgrid(delta, gamma)
return (gamma, delta)
[docs]class GammaDeltaMu(
HKLProjection
): # just passing on the coordinates, makes it easy to accurately test the theta correction
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi):
delta, gamma = numpy.meshgrid(delta, gamma)
mu = mu * numpy.ones_like(delta)
return (gamma, delta, mu)
[docs]class QTransformation(QProjection):
[docs] def project(self, wavelength, UB, gamma, delta, theta, mu, chi, phi):
qx, qy, qz = super(QTransformation, self).project(
wavelength, UB, gamma, delta, theta, mu, chi, phi
)
M = self.config.matrix
q1 = qx * M[0] + qy * M[1] + qz * M[2]
q2 = qx * M[3] + qy * M[4] + qz * M[5]
q3 = qx * M[6] + qy * M[7] + qz * M[8]
return (q1, q2, q3)
[docs] def parse_config(self, config):
super(QTransformation, self).parse_config(config)
self.config.matrix = util.parse_tuple(
config.pop("matrix"), length=9, type=float
)
[docs]class ID03Input(backend.InputBase):
# OFFICIAL API
dbg_scanno = None
dbg_pointno = None
[docs] def generate_jobs(self, command):
scans = util.parse_multi_range(",".join(command).replace(" ", ","))
if not len(scans):
sys.stderr.write("error: no scans selected, nothing to do\n")
for scanno in scans:
util.status("processing scan {0}...".format(scanno))
if self.config.wait_for_data:
for job in self.get_delayed_jobs(scanno):
yield job
else:
scan = self.get_scan(scanno)
if self.config.pr:
pointcount = self.config.pr[1] - self.config.pr[0] + 1
start = self.config.pr[0]
else:
start = 0
try:
pointcount = scan.lines()
except specfile.error: # no points
continue
next(self.get_images(scan, 0, pointcount - 1, dry_run=True)) # dryrun
if pointcount > self.config.target_weight * 1.4:
for s in util.chunk_slicer(pointcount, self.config.target_weight):
yield backend.Job(
scan=scanno,
firstpoint=start + s.start,
lastpoint=start + s.stop - 1,
weight=s.stop - s.start,
)
else:
yield backend.Job(
scan=scanno,
firstpoint=start,
lastpoint=start + pointcount - 1,
weight=pointcount,
)
[docs] def get_delayed_jobs(self, scanno):
scan = self.get_delayed_scan(scanno)
if self.config.pr:
(
firstpoint,
lastpoint,
) = (
self.config.pr
) # firstpoint is the first index to be included, lastpoint the last index to be included.
else:
firstpoint, lastpoint = 0, self.target(scan) - 1
pointcount = lastpoint - firstpoint + 1
if self.is_zap(scan): # wait until the scan is finished.
if not self.wait_for_points(
scanno, self.target(scan), timeout=self.config.timeout
): # wait for last datapoint
for s in util.chunk_slicer(pointcount, self.config.target_weight):
yield backend.Job(
scan=scanno,
firstpoint=firstpoint + s.start,
lastpoint=firstpoint + s.stop - 1,
weight=s.stop - s.start,
)
else:
raise errors.BackendError(
"Image collection timed out. Zapscan was probably aborted"
)
elif lastpoint >= 0: # scanlength is known
for s in util.chunk_slicer(pointcount, self.config.target_weight):
if self.wait_for_points(
scanno, firstpoint + s.stop, timeout=self.config.timeout
):
stop = self.get_scan(scanno).lines()
yield backend.Job(
scan=scanno,
firstpoint=firstpoint + s.start,
lastpoint=stop - 1,
weight=s.stop - s.start,
)
break
else:
yield backend.Job(
scan=scanno,
firstpoint=firstpoint + s.start,
lastpoint=firstpoint + s.stop - 1,
weight=s.stop - s.start,
)
else: # scanlength is unknown
step = int(self.config.target_weight / 1.4)
for start, stop in zip(
itertools.count(0, step), itertools.count(step, step)
):
if self.wait_for_points(scanno, stop, timeout=self.config.timeout):
stop = self.get_scan(scanno).lines()
yield backend.Job(
scan=scanno,
firstpoint=start,
lastpoint=stop - 1,
weight=stop - start,
)
break
else:
yield backend.Job(
scan=scanno,
firstpoint=start,
lastpoint=stop - 1,
weight=stop - start,
)
[docs] def process_job(self, job):
super(ID03Input, self).process_job(job)
scan = self.get_scan(job.scan)
self.metadict = dict()
try:
scanparams = self.get_scan_params(scan) # wavelength, UB
pointparams = self.get_point_params(
scan, job.firstpoint, job.lastpoint
) # 2D array of diffractometer angles + mon + transm
images = self.get_images(scan, job.firstpoint, job.lastpoint) # iterator!
for pp, image in zip(pointparams, images):
yield self.process_image(scanparams, pp, image)
util.statuseol()
except Exception as exc:
exc.args = errors.addmessage(
exc.args,
", An error occured for scan {0} at point {1}. See above for more information".format(
self.dbg_scanno, self.dbg_pointno
),
)
raise
self.metadata.add_section("id03_backend", self.metadict)
[docs] def parse_config(self, config):
super(ID03Input, self).parse_config(config)
self.config.xmask = util.parse_multi_range(
config.pop("xmask", None)
) # Optional, select a subset of the image range in the x direction. all by default
self.config.ymask = util.parse_multi_range(
config.pop("ymask", None)
) # Optional, select a subset of the image range in the y direction. all by default
self.config.specfile = config.pop("specfile") # Location of the specfile
self.config.imagefolder = config.pop(
"imagefolder", None
) # Optional, takes specfile folder tag by default
self.config.pr = config.pop("pr", None) # Optional, all range by default
self.config.background = config.pop(
"background", None
) # Optional, if supplied a space of this image is constructed
self.config.th_offset = float(
config.pop("th_offset", 0)
) # Optional; Only used in zapscans, zero by default.
self.config.wavelength = config.pop(
"wavelength", None
) # Optional; Overrides wavelength from specfile.
if self.config.wavelength is not None:
self.config.wavelength = float(self.config.wavelength)
if self.config.xmask is None:
self.config.xmask = slice(None)
if self.config.ymask is None:
self.config.ymask = slice(None)
self.config.maskmatrix = load_matrix(
config.pop("maskmatrix", None)
) # Optional, if supplied pixels where the mask is 0 will be removed
if self.config.pr:
self.config.pr = util.parse_tuple(self.config.pr, length=2, type=int)
self.config.sdd = float(config.pop("sdd")) # sample to detector distance (mm)
self.config.pixelsize = util.parse_tuple(
config.pop("pixelsize"), length=2, type=float
) # pixel size x/y (mm) (same dimension as sdd)
self.config.wait_for_data = util.parse_bool(
config.pop("wait_for_data", "false")
) # Optional, if true wait until the data appears
self.config.timeout = int(
config.pop("timeout", 180)
) # Optional, how long the script wait until it assumes the scan is not continuing
[docs] def get_destination_options(self, command):
if not command:
return False
command = ",".join(command).replace(" ", ",")
scans = util.parse_multi_range(command)
return dict(
first=min(scans),
last=max(scans),
range=",".join(str(scan) for scan in scans),
)
# CONVENIENCE FUNCTIONS
[docs] def get_scan(self, scannumber):
spec = specfilewrapper.Specfile(self.config.specfile)
return spec.select("{0}.1".format(scannumber))
[docs] def get_delayed_scan(self, scannumber, timeout=None):
delay = util.loop_delayer(5)
start = time.time()
while 1:
try:
return self.get_scan(scannumber) # reload entire specfile
except specfile.error:
if timeout is not None and time.time() - start > timeout:
raise errors.BackendError(
"Scan timed out. There is no data to process"
)
else:
util.status("waiting for scan {0}...".format(scannumber))
next(delay)
[docs] def wait_for_points(self, scannumber, stop, timeout=None):
delay = util.loop_delayer(1)
start = time.time()
while 1:
scan = self.get_scan(scannumber)
try:
if scan.lines() >= stop:
next(delay) # time delay between specfile and edf file
return False
except specfile.error:
pass
finally:
next(delay)
util.status("waiting for scan {0}, point {1}...".format(scannumber, stop))
if (
timeout is not None and time.time() - start > timeout
) or self.is_aborted(scan):
try:
util.statusnl(
"scan {0} aborted at point {1}".format(scannumber, scan.lines())
)
return True
except specfile.error:
raise errors.BackendError(
"Scan was aborted before images were collected. There is no data to process"
)
[docs] def target(self, scan):
if any(
tuple(
scan.command().startswith(pattern)
for pattern in ["hklscan", "a2scan", "ascan", "ringscan"]
)
):
return int(scan.command().split()[-2]) + 1
elif scan.command().startswith("mesh"):
return int(scan.command().split()[-6]) * int(scan.command().split()[-2]) + 1
elif scan.command().startswith("loopscan"):
return int(scan.command().split()[-3])
elif scan.command().startswith("xascan"):
params = numpy.array(scan.command().split()[-6:]).astype(float)
return int(params[2] + 1 + (params[4] - 1) / params[5] * params[2])
elif self.is_zap(scan):
return int(scan.command().split()[-2])
else:
return -1
[docs] @staticmethod
def is_aborted(scan):
for line in scan.header("C"):
if "Scan aborted" in line:
return True
return False
[docs] def find_edfs(self, pattern, scanno):
files = glob.glob(pattern)
ret = {}
for file in files:
try:
filename = os.path.basename(file).split(".")[0]
scan, point, image = filename.split("_")[-3:]
scan, point, image = int(scan), int(point), int(image)
if scan == scanno and point not in list(ret.keys()):
ret[point] = file
except ValueError:
continue
return ret
[docs] def get_wavelength(self, G):
for line in G:
if line.startswith("#G4"):
return float(line.split(" ")[4])
return None
# MAIN LOGIC
[docs] def get_scan_params(self, scan):
self.dbg_scanno = scan.number()
if self.is_zap(scan):
# zapscans don't contain the UB matrix, this needs to be fixed at ID03
scanno = scan.number()
UB = None
while 1: # look back in spec file to locate a UB matrix
try:
ubscan = self.get_scan(scanno)
except specfilewrapper.specfile.error:
break
try:
UB = numpy.array(
ubscan.header("G")[2].split(" ")[-9:], dtype=numpy.float
)
except:
scanno -= 1
else:
break
if UB is None:
# fall back to UB matrix from the configfile
if not self.config.UB:
raise errors.ConfigError(
"UB matrix must be specified in configuration file when processing zapscans"
)
UB = numpy.array(self.config.UB)
else:
UB = numpy.array(scan.header("G")[2].split(" ")[-9:], dtype=numpy.float)
if self.config.wavelength is None:
wavelength = self.get_wavelength(scan.header("G"))
if wavelength is None or wavelength == 0:
raise errors.BackendError(
"No or incorrect wavelength specified in the specfile. Please add wavelength to the configfile in the input section"
)
else:
wavelength = self.config.wavelength
self.metadict["UB"] = UB
self.metadict["wavelength"] = wavelength
return wavelength, UB
[docs] def get_images(self, scan, first, last, dry_run=False):
if self.config.background:
if not os.path.exists(self.config.background):
raise errors.FileError(
"could not find background file {0}".format(self.config.background)
)
if dry_run:
yield
else:
edf = EdfFile.EdfFile(self.config.background)
for i in range(first, last + 1):
self.dbg_pointno = i
yield edf.GetData(0)
else:
if self.is_zap(scan):
scanheaderC = scan.header("C")
zapscanno = int(
scanheaderC[2].split(" ")[-1]
) # is different from scanno should be changed in spec!
try:
uccdtagline = scanheaderC[0]
UCCD = os.path.split(uccdtagline.split()[-1])
except:
print(
"warning: UCCD tag not found, use imagefolder for proper file specification"
)
UCCD = []
pattern = self._get_pattern(UCCD)
matches = self.find_edfs(pattern, zapscanno)
if 0 not in matches:
raise errors.FileError(
"could not find matching edf for zapscannumber {0} using pattern {1}".format(
zapscanno, pattern
)
)
if dry_run:
yield
else:
edf = EdfFile.EdfFile(matches[0])
for i in range(first, last + 1):
self.dbg_pointno = i
yield edf.GetData(i)
else:
try:
uccdtagline = scan.header("UCCD")[0]
UCCD = os.path.split(os.path.dirname(uccdtagline.split()[-1]))
except:
print(
"warning: UCCD tag not found, use imagefolder for proper file specification"
)
UCCD = []
pattern = self._get_pattern(UCCD)
matches = self.find_edfs(pattern, scan.number())
if set(range(first, last + 1)) > set(matches.keys()):
raise errors.FileError(
"incorrect number of matches for scan {0} using pattern {1}".format(
scan.number(), pattern
)
)
if dry_run:
yield
else:
for i in range(first, last + 1):
self.dbg_pointno = i
edf = EdfFile.EdfFile(matches[i])
yield edf.GetData(0)
def _get_pattern(self, UCCD):
imagefolder = self.config.imagefolder
if imagefolder:
try:
imagefolder = imagefolder.format(UCCD=UCCD, rUCCD=list(reversed(UCCD)))
except Exception as e:
raise errors.ConfigError(
"invalid 'imagefolder' specification '{0}': {1}".format(
self.config.imagefolder, e
)
)
else:
if not os.path.exists(imagefolder):
raise errors.ConfigError(
"invalid 'imagefolder' specification '{0}'. Path {1} does not exist".format(
self.config.imagefolder, imagefolder
)
)
else:
imagefolder = os.path.join(*UCCD)
if not os.path.exists(imagefolder):
raise errors.ConfigError(
"invalid UCCD tag '{0}'. The UCCD tag in the specfile does not point to an existing folder. Specify the imagefolder in the configuration file.".format(
imagefolder
)
)
return os.path.join(imagefolder, "*")
[docs]class EH1(ID03Input):
monitor_counter = "mon"
[docs] def parse_config(self, config):
super(EH1, self).parse_config(config)
self.config.centralpixel = util.parse_tuple(
config.pop("centralpixel"), length=2, type=int
) # x,y
self.config.hr = config.pop(
"hr", None
) # Optional, hexapod rotations in miliradians. At the entered value the sample is assumed flat, if not entered the sample is assumed flat at the spec values.
self.config.UB = config.pop(
"ub", None
) # Optional, takes specfile matrix by default
if self.config.UB:
self.config.UB = util.parse_tuple(self.config.UB, length=9, type=float)
if self.config.hr:
self.config.hr = util.parse_tuple(self.config.hr, length=2, type=float)
[docs] def process_image(self, scanparams, pointparams, image):
gamma, delta, theta, chi, phi, mu, mon, transm, hrx, hry = pointparams
wavelength, UB = scanparams
weights = numpy.ones_like(image)
if self.config.hr:
zerohrx, zerohry = self.config.hr
chi = (hrx - zerohrx) / numpy.pi * 180.0 / 1000
phi = (hry - zerohry) / numpy.pi * 180.0 / 1000
if self.config.background:
data = image / mon
else:
data = image / mon / transm
if mon == 0:
raise errors.BackendError(
"Monitor is zero, this results in empty output. Scannumber = {0}, pointnumber = {1}. Did you forget to open the shutter?".format(
self.dbg_scanno, self.dbg_pointno
)
)
util.status(
"{4}| gamma: {0}, delta: {1}, theta: {2}, mu: {3}".format(
gamma, delta, theta, mu, time.ctime(time.time())
)
)
# pixels to angles
pixelsize = numpy.array(self.config.pixelsize)
sdd = self.config.sdd
app = numpy.arctan(pixelsize / sdd) * 180 / numpy.pi
centralpixel = self.config.centralpixel # (column, row) = (delta, gamma)
gamma_range = -app[1] * (numpy.arange(data.shape[1]) - centralpixel[1]) + gamma
delta_range = app[0] * (numpy.arange(data.shape[0]) - centralpixel[0]) + delta
# masking
if self.config.maskmatrix is not None:
if self.config.maskmatrix.shape != data.shape:
raise errors.BackendError(
"The mask matrix does not have the same shape as the images"
)
weights *= self.config.maskmatrix
gamma_range = gamma_range[self.config.ymask]
delta_range = delta_range[self.config.xmask]
intensity = self.apply_mask(data, self.config.xmask, self.config.ymask)
weights = self.apply_mask(weights, self.config.xmask, self.config.ymask)
# polarisation correction
delta_grid, gamma_grid = numpy.meshgrid(delta_range, gamma_range)
Pver = (
1
- numpy.sin(delta_grid * numpy.pi / 180.0) ** 2
* numpy.cos(gamma_grid * numpy.pi / 180.0) ** 2
)
intensity /= Pver
return (
intensity,
weights,
(wavelength, UB, gamma_range, delta_range, theta, mu, chi, phi),
)
[docs] def get_point_params(self, scan, first, last):
sl = slice(first, last + 1)
GAM, DEL, TH, CHI, PHI, MU, MON, TRANSM, HRX, HRY = list(range(10))
params = numpy.zeros(
(last - first + 1, 10)
) # gamma delta theta chi phi mu mon transm
params[:, CHI] = scan.motorpos("Chi")
params[:, PHI] = scan.motorpos("Phi")
try:
params[:, HRX] = scan.motorpos("hrx")
params[:, HRY] = scan.motorpos("hry")
except:
raise errors.BackendError(
"The specfile does not accept hrx and hry as a motor label. Have you selected the right hutch? Scannumber = {0}, pointnumber = {1}".format(
self.dbg_scanno, self.dbg_pointno
)
)
if self.is_zap(scan):
if "th" in scan.alllabels():
th = scan.datacol("th")[sl]
if len(th) > 1:
sign = numpy.sign(th[1] - th[0])
else:
sign = 1
# correction for difference between back and forth in th motor
params[:, TH] = th + sign * self.config.th_offset
else:
params[:, TH] = scan.motorpos("Theta")
params[:, GAM] = scan.motorpos("Gam")
params[:, DEL] = scan.motorpos("Delta")
params[:, MU] = scan.motorpos("Mu")
params[:, MON] = scan.datacol("zap_mon")[sl]
transm = scan.datacol("zap_transm")
transm[-1] = transm[-2] # bug in specfile
params[:, TRANSM] = transm[sl]
else:
if "hrx" in scan.alllabels():
params[:, HRX] = scan.datacol("hrx")[sl]
if "hry" in scan.alllabels():
params[:, HRY] = scan.datacol("hry")[sl]
params[:, TH] = scan.datacol("thcnt")[sl]
params[:, GAM] = scan.datacol("gamcnt")[sl]
params[:, DEL] = scan.datacol("delcnt")[sl]
try:
params[:, MON] = scan.datacol(self.monitor_counter)[
sl
] # differs in EH1/EH2
except:
raise errors.BackendError(
"The specfile does not accept {2} as a monitor label. Have you selected the right hutch? Scannumber = {0}, pointnumber = {1}".format(
self.dbg_scanno, self.dbg_pointno, self.monitor_counter
)
)
params[:, TRANSM] = scan.datacol("transm")[sl]
params[:, MU] = scan.datacol("mucnt")[sl]
return params
[docs]class EH2(ID03Input):
monitor_counter = "Monitor"
[docs] def parse_config(self, config):
super(EH2, self).parse_config(config)
self.config.centralpixel = util.parse_tuple(
config.pop("centralpixel"), length=2, type=int
) # x,y
self.config.UB = config.pop(
"ub", None
) # Optional, takes specfile matrix by default
if self.config.UB:
self.config.UB = util.parse_tuple(self.config.UB, length=9, type=float)
[docs] def process_image(self, scanparams, pointparams, image):
gamma, delta, theta, chi, phi, mu, mon, transm = pointparams
wavelength, UB = scanparams
weights = numpy.ones_like(image)
if self.config.background:
data = image / mon
else:
data = image / mon / transm
if mon == 0:
raise errors.BackendError(
"Monitor is zero, this results in empty output. Scannumber = {0}, pointnumber = {1}. Did you forget to open the shutter?".format(
self.dbg_scanno, self.dbg_pointno
)
)
util.status(
"{4}| gamma: {0}, delta: {1}, theta: {2}, mu: {3}".format(
gamma, delta, theta, mu, time.ctime(time.time())
)
)
# area correction
sdd = self.config.sdd / numpy.cos(gamma * numpy.pi / 180)
data *= (self.config.sdd / sdd) ** 2
# pixels to angles
pixelsize = numpy.array(self.config.pixelsize)
app = numpy.arctan(pixelsize / sdd) * 180 / numpy.pi
centralpixel = self.config.centralpixel # (row, column) = (gamma, delta)
gamma_range = (
-1 * app[0] * (numpy.arange(data.shape[0]) - centralpixel[0]) + gamma
)
delta_range = app[1] * (numpy.arange(data.shape[1]) - centralpixel[1]) + delta
# masking
if self.config.maskmatrix is not None:
if self.config.maskmatrix.shape != data.shape:
raise errors.BackendError(
"The mask matrix does not have the same shape as the images"
)
weights *= self.config.maskmatrix
gamma_range = gamma_range[self.config.xmask]
delta_range = delta_range[self.config.ymask]
intensity = self.apply_mask(data, self.config.xmask, self.config.ymask)
weights = self.apply_mask(weights, self.config.xmask, self.config.ymask)
intensity = numpy.fliplr(intensity)
intensity = numpy.rot90(intensity)
weights = numpy.fliplr(
weights
) # TODO: should be done more efficiently. Will prob change with new HKL calculations
weights = numpy.rot90(weights)
# polarisation correction
delta_grid, gamma_grid = numpy.meshgrid(delta_range, gamma_range)
Phor = (
1
- (
numpy.sin(mu * numpy.pi / 180.0)
* numpy.sin(delta_grid * numpy.pi / 180.0)
* numpy.cos(gamma_grid * numpy.pi / 180.0)
+ numpy.cos(mu * numpy.pi / 180.0)
* numpy.sin(gamma_grid * numpy.pi / 180.0)
)
** 2
)
intensity /= Phor
return (
intensity,
weights,
(wavelength, UB, gamma_range, delta_range, theta, mu, chi, phi),
)
[docs] def get_point_params(self, scan, first, last):
sl = slice(first, last + 1)
GAM, DEL, TH, CHI, PHI, MU, MON, TRANSM = list(range(8))
params = numpy.zeros(
(last - first + 1, 8)
) # gamma delta theta chi phi mu mon transm
params[:, CHI] = scan.motorpos("Chi")
params[:, PHI] = scan.motorpos("Phi")
if self.is_zap(scan):
if "th" in scan.alllabels():
th = scan.datacol("th")[sl]
if len(th) > 1:
sign = numpy.sign(th[1] - th[0])
else:
sign = 1
# correction for difference between back and forth in th motor
params[:, TH] = th + sign * self.config.th_offset
else:
params[:, TH] = scan.motorpos("Theta")
params[:, GAM] = scan.motorpos("Gamma")
params[:, DEL] = scan.motorpos("Delta")
params[:, MU] = scan.motorpos("Mu")
params[:, MON] = scan.datacol("zap_mon")[sl]
transm = scan.datacol("zap_transm")
transm[-1] = transm[-2] # bug in specfile
params[:, TRANSM] = transm[sl]
else:
params[:, TH] = scan.datacol("thcnt")[sl]
params[:, GAM] = scan.datacol("gamcnt")[sl]
params[:, DEL] = scan.datacol("delcnt")[sl]
try:
params[:, MON] = scan.datacol(self.monitor_counter)[
sl
] # differs in EH1/EH2
except:
raise errors.BackendError(
"The specfile does not accept {2} as a monitor label. Have you selected the right hutch? Scannumber = {0}, pointnumber = {1}".format(
self.dbg_scanno, self.dbg_pointno, self.monitor_counter
)
)
params[:, TRANSM] = scan.datacol("transm")[sl]
params[:, MU] = scan.datacol("mucnt")[sl]
return params
[docs]class GisaxsDetector(ID03Input):
monitor_counter = "mon"
[docs] def process_image(self, scanparams, pointparams, image):
ccdy, ccdz, theta, chi, phi, mu, mon, transm = pointparams
weights = numpy.ones_like(image)
wavelength, UB = scanparams
if self.config.background:
data = image / mon
else:
data = image / mon / transm
if mon == 0:
raise errors.BackendError(
"Monitor is zero, this results in empty output. Scannumber = {0}, pointnumber = {1}. Did you forget to open the shutter?".format(
self.dbg_scanno, self.dbg_pointno
)
)
util.status(
"{4}| ccdy: {0}, ccdz: {1}, theta: {2}, mu: {3}".format(
ccdy, ccdz, theta, mu, time.ctime(time.time())
)
)
# pixels to angles
pixelsize = numpy.array(self.config.pixelsize)
sdd = self.config.sdd
directbeam = (
self.config.directbeam[0]
- (ccdy - self.config.directbeam_coords[0]) * pixelsize[0],
self.config.directbeam[1]
- (ccdz - self.config.directbeam_coords[1]) * pixelsize[1],
)
gamma_distance = -pixelsize[1] * (numpy.arange(data.shape[1]) - directbeam[1])
delta_distance = -pixelsize[0] * (numpy.arange(data.shape[0]) - directbeam[0])
gamma_range = numpy.arctan2(gamma_distance, sdd) / numpy.pi * 180 - mu
delta_range = numpy.arctan2(delta_distance, sdd) / numpy.pi * 180
# sample pixel distance
spd = numpy.sqrt(gamma_distance ** 2 + delta_distance ** 2 + sdd ** 2)
data *= spd ** 2 / sdd
# masking
if self.config.maskmatrix is not None:
if self.config.maskmatrix.shape != data.shape:
raise errors.BackendError(
"The mask matrix does not have the same shape as the images"
)
weights *= self.config.maskmatrix
gamma_range = gamma_range[self.config.ymask]
delta_range = delta_range[self.config.xmask]
intensity = self.apply_mask(data, self.config.xmask, self.config.ymask)
weights = self.apply_mask(weights, self.config.xmask, self.config.ymask)
return (
intensity,
weights,
(wavelength, UB, gamma_range, delta_range, theta, mu, chi, phi),
)
[docs] def parse_config(self, config):
super(GisaxsDetector, self).parse_config(config)
self.config.directbeam = util.parse_tuple(
config.pop("directbeam"), length=2, type=int
)
self.config.directbeam_coords = util.parse_tuple(
config.pop("directbeam_coords"), length=2, type=float
) # Coordinates of ccdy and ccdz at the direct beam position
[docs] def get_point_params(self, scan, first, last):
sl = slice(first, last + 1)
CCDY, CCDZ, TH, CHI, PHI, MU, MON, TRANSM = list(range(8))
params = numpy.zeros(
(last - first + 1, 8)
) # gamma delta theta chi phi mu mon transm
params[:, CHI] = scan.motorpos("Chi")
params[:, PHI] = scan.motorpos("Phi")
params[:, CCDY] = scan.motorpos("ccdy")
params[:, CCDZ] = scan.motorpos("ccdz")
params[:, TH] = scan.datacol("thcnt")[sl]
try:
params[:, MON] = scan.datacol(self.monitor_counter)[
sl
] # differs in EH1/EH2
except:
raise errors.BackendError(
"The specfile does not accept {2} as a monitor label. Have you selected the right hutch? Scannumber = {0}, pointnumber = {1}".format(
self.dbg_scanno, self.dbg_pointno, self.monitor_counter
)
)
params[:, TRANSM] = scan.datacol("transm")[sl]
params[:, MU] = scan.datacol("mucnt")[sl]
return params
[docs] def find_edfs(self, pattern, scanno):
files = glob.glob(pattern)
ret = {}
for file in files:
try:
filename = os.path.basename(file).split(".")[0]
scan, point = filename.split("_")[-2:]
scan, point = int(scan), int(point)
if scan == scanno and point not in list(ret.keys()):
ret[point] = file
except ValueError:
continue
return ret
[docs]def load_matrix(filename):
if filename == None:
return None
if os.path.exists(filename):
ext = os.path.splitext(filename)[-1]
if ext == ".txt":
return numpy.array(numpy.loadtxt(filename), dtype=numpy.bool)
elif ext == ".npy":
return numpy.array(numpy.load(filename), dtype=numpy.bool)
elif ext == ".edf":
return numpy.array(EdfFile.EdfFile(filename).getData(0), dtype=numpy.bool)
else:
raise ValueError(
"unknown extension {0}, unable to load matrix!\n".format(ext)
)
else:
raise IOError(
"filename: {0} does not exist. Can not load matrix".format(filename)
)