Skip to content

XRD

pydatalab.apps.xrd special

blocks

XRDBlock (DataBlock)

Source code in pydatalab/apps/xrd/blocks.py
class XRDBlock(DataBlock):
    """Block that loads powder XRD patterns from attached files and plots them.

    Patterns are read either from `.xrdml` files (via `parse_xrdml`) or from
    whitespace-delimited text files, augmented with derived intensity and
    baseline columns, and rendered with `selectable_axes_plot`. The resulting
    Bokeh JSON is stored in `self.data["bokeh_plot_data"]`.

    """

    blocktype = "xrd"
    name = "Powder XRD"
    description = "Visualize XRD patterns and perform simple baseline corrections."
    accepted_file_extensions = (".xrdml", ".xy", ".dat", ".xye")

    defaults = {"wavelength": 1.54060}

    @property
    def plot_functions(self):
        """The callables that generate this block's plots."""
        return (self.generate_xrd_plot,)

    @classmethod
    def load_pattern(
        cls, location: str, wavelength: float | None = None
    ) -> Tuple[pd.DataFrame, List[str]]:
        """Load an XRD pattern from a file and compute the derived columns used for plotting.

        Parameters:
            location: Path of the file to read; non-string values are converted with `str`.
            wavelength: Wavelength used to convert 2θ into the `Q` and `d` columns
                (presumably in Å, matching the column labels). If falsy (`None` or 0),
                those two columns are not added.

        Raises:
            RuntimeError: If no purely numeric table could be read after skipping up to
                10,000 leading lines, or if the parsed pattern contains no rows.

        Returns:
            The pattern dataframe and the list of column names offered as y-axis options.

        """
        if not isinstance(location, str):
            location = str(location)

        ext = os.path.splitext(location.split("/")[-1])[-1].lower()

        if ext == ".xrdml":
            df = parse_xrdml(location)

        else:
            columns = ["twotheta", "intensity", "error"]
            # Try to parse the file by incrementing skiprows until all lines can be cast to np.float64
            skiprows: int = 0
            # Set arbitrary limit to avoid infinite loop; a header of 10,000 lines is unlikely to be useful
            while skiprows < 10_000:
                try:
                    df = pd.read_csv(
                        location, sep=r"\s+", names=columns, dtype=np.float64, skiprows=skiprows
                    )
                    break
                except ValueError:
                    skiprows += 1
            else:
                raise RuntimeError(
                    f"Unable to extract XRD data from file {location}; check file header for irregularities"
                )

            if skiprows > 0:
                # Keep the skipped lines so the original file header travels with the dataframe
                with open(location) as f:
                    header = "".join([next(f) for _ in range(skiprows)])
                    df.attrs["header"] = header

        if len(df) == 0:
            raise RuntimeError(f"No compatible data found in {location}")

        df = df.rename(columns={"twotheta": "2θ (°)"})

        # if no wavelength (or invalid wavelength) is passed, don't convert to Q and d
        if wavelength:
            try:
                # Q = 4π sin(θ) / λ with θ being half of the tabulated 2θ; d = 2π / Q
                df["Q (Å⁻¹)"] = 4 * np.pi / wavelength * np.sin(np.deg2rad(df["2θ (°)"]) / 2)
                df["d (Å)"] = 2 * np.pi / df["Q (Å⁻¹)"]
            except (ValueError, ZeroDivisionError):
                pass

        df["sqrt(intensity)"] = np.sqrt(df["intensity"])
        df["log(intensity)"] = np.log10(df["intensity"])
        df["normalized intensity"] = df["intensity"] / np.max(df["intensity"])
        polyfit_deg = 15
        polyfit_baseline = np.poly1d(
            np.polyfit(df["2θ (°)"], df["normalized intensity"], deg=polyfit_deg)
        )(df["2θ (°)"])
        df["intensity - polyfit baseline"] = df["normalized intensity"] - polyfit_baseline
        df["intensity - polyfit baseline"] /= np.max(df["intensity - polyfit baseline"])
        # NOTE(review): the column above was just divided by its own maximum, so this divisor
        # is 1 whenever that maximum was positive -- confirm that is the intended scaling.
        df[f"baseline (`numpy.polyfit`, deg={polyfit_deg})"] = polyfit_baseline / np.max(
            df["intensity - polyfit baseline"]
        )

        kernel_size = 101
        median_baseline = medfilt(df["normalized intensity"], kernel_size=kernel_size)
        df["intensity - median baseline"] = df["normalized intensity"] - median_baseline
        df["intensity - median baseline"] /= np.max(df["intensity - median baseline"])
        df[f"baseline (`scipy.signal.medfilt`, kernel_size={kernel_size})"] = (
            median_baseline / np.max(df["intensity - median baseline"])
        )

        # The file's basename is stored as the index name of the dataframe
        df.index.name = location.split("/")[-1]

        y_options = [
            "normalized intensity",
            "intensity",
            "sqrt(intensity)",
            "log(intensity)",
            "intensity - median baseline",
            f"baseline (`scipy.signal.medfilt`, kernel_size={kernel_size})",
            "intensity - polyfit baseline",
            f"baseline (`numpy.polyfit`, deg={polyfit_deg})",
        ]

        return df, y_options

    def generate_xrd_plot(self):
        """Build the XRD plot for this block and store it in `self.data["bokeh_plot_data"]`.

        If the block data contains a `file_id`, only that file is loaded. Otherwise every
        file attached to the block's item whose name ends with one of
        `accepted_file_extensions` is loaded, and all patterns are passed together to
        `selectable_axes_plot`. If no such file exists a warning is logged and nothing
        is plotted.

        Raises:
            RuntimeError: If the selected file has an unsupported extension, or if one of
                the item's files cannot be parsed.

        """
        if "file_id" not in self.data:
            # If no file set, try to plot them all
            item_info = flask_mongo.db.items.find_one(
                {"item_id": self.data["item_id"]},
            )

            all_files = [
                d
                for d in [
                    get_file_info_by_id(f, update_if_live=False)
                    for f in item_info["file_ObjectIds"]
                ]
                if any(d["name"].lower().endswith(ext) for ext in self.accepted_file_extensions)
            ]

            if not all_files:
                LOGGER.warning("XRDBlock.generate_xrd_plot(): No files found on sample")
                return

            pattern_dfs = []
            for f in all_files:
                try:
                    # `load_pattern` returns the same y_options for every file
                    pattern_df, y_options = self.load_pattern(
                        f["location"],
                        wavelength=float(self.data.get("wavelength", self.defaults["wavelength"])),
                    )
                except Exception as exc:
                    # Report the file being parsed in this iteration; no single file is
                    # selected in this branch, so there is no `file_info` to refer to.
                    raise RuntimeError(
                        f"Could not parse file {f['location']}. Error: {exc}"
                    ) from exc
                pattern_dfs.append(pattern_df)

        else:
            file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
            ext = os.path.splitext(file_info["location"].split("/")[-1])[-1].lower()
            if ext not in self.accepted_file_extensions:
                # Exceptions do not interpolate %-style arguments, so format the message here
                raise RuntimeError(
                    "XRDBlock.generate_xrd_plot(): Unsupported file extension "
                    f"(must be one of {self.accepted_file_extensions}), not {ext}"
                )

            pattern_df, y_options = self.load_pattern(
                file_info["location"],
                wavelength=float(self.data.get("wavelength", self.defaults["wavelength"])),
            )
            pattern_dfs = [pattern_df]

        if pattern_dfs:
            p = selectable_axes_plot(
                pattern_dfs,
                x_options=["2θ (°)", "Q (Å⁻¹)", "d (Å)"],
                y_options=y_options,
                plot_line=True,
                plot_points=True,
                point_size=3,
            )

            self.data["bokeh_plot_data"] = bokeh.embed.json_item(p, theme=DATALAB_BOKEH_THEME)
accepted_file_extensions: tuple[str, ...] | None

A list of file extensions that the block will attempt to read.

blocktype: str

A short (unique) string key specifying the type of block.

defaults: Dict[str, Any]

Any default values that should be set if they are not supplied during block init.

description: str

A longer description outlining the purpose and capability of the block.

name: str

The human-readable block name specifying which technique or file format it pertains to.

plot_functions property readonly
load_pattern(location: str, wavelength: float | None = None) -> Tuple[pandas.core.frame.DataFrame, List[str]] classmethod
Source code in pydatalab/apps/xrd/blocks.py
@classmethod
def load_pattern(
    self, location: str, wavelength: float | None = None
) -> Tuple[pd.DataFrame, List[str]]:
    if not isinstance(location, str):
        location = str(location)

    ext = os.path.splitext(location.split("/")[-1])[-1].lower()

    if ext == ".xrdml":
        df = parse_xrdml(location)

    else:
        columns = ["twotheta", "intensity", "error"]
        # Try to parse the file by incrementing skiprows until all lines can be cast to np.float64
        skiprows: int = 0
        # Set arbitrary limit to avoid infinite loop; a header of 10,000 lines is unlikely to be useful
        while skiprows < 10_000:
            try:
                df = pd.read_csv(
                    location, sep=r"\s+", names=columns, dtype=np.float64, skiprows=skiprows
                )
                break
            except ValueError:
                skiprows += 1
        else:
            raise RuntimeError(
                f"Unable to extract XRD data from file {location}; check file header for irregularities"
            )

        if skiprows > 0:
            with open(location) as f:
                header = "".join([next(f) for _ in range(skiprows)])
                df.attrs["header"] = header

    if len(df) == 0:
        raise RuntimeError(f"No compatible data found in {location}")

    df = df.rename(columns={"twotheta": "2θ (°)"})

    # if no wavelength (or invalid wavelength) is passed, don't convert to Q and d
    if wavelength:
        try:
            df["Q (Å⁻¹)"] = 4 * np.pi / wavelength * np.sin(np.deg2rad(df["2θ (°)"]) / 2)
            df["d (Å)"] = 2 * np.pi / df["Q (Å⁻¹)"]
        except (ValueError, ZeroDivisionError):
            pass

    df["sqrt(intensity)"] = np.sqrt(df["intensity"])
    df["log(intensity)"] = np.log10(df["intensity"])
    df["normalized intensity"] = df["intensity"] / np.max(df["intensity"])
    polyfit_deg = 15
    polyfit_baseline = np.poly1d(
        np.polyfit(df["2θ (°)"], df["normalized intensity"], deg=polyfit_deg)
    )(df["2θ (°)"])
    df["intensity - polyfit baseline"] = df["normalized intensity"] - polyfit_baseline
    df["intensity - polyfit baseline"] /= np.max(df["intensity - polyfit baseline"])
    df[f"baseline (`numpy.polyfit`, deg={polyfit_deg})"] = polyfit_baseline / np.max(
        df["intensity - polyfit baseline"]
    )

    kernel_size = 101
    median_baseline = medfilt(df["normalized intensity"], kernel_size=kernel_size)
    df["intensity - median baseline"] = df["normalized intensity"] - median_baseline
    df["intensity - median baseline"] /= np.max(df["intensity - median baseline"])
    df[f"baseline (`scipy.signal.medfilt`, kernel_size={kernel_size})"] = (
        median_baseline / np.max(df["intensity - median baseline"])
    )

    df.index.name = location.split("/")[-1]

    y_options = [
        "normalized intensity",
        "intensity",
        "sqrt(intensity)",
        "log(intensity)",
        "intensity - median baseline",
        f"baseline (`scipy.signal.medfilt`, kernel_size={kernel_size})",
        "intensity - polyfit baseline",
        f"baseline (`numpy.polyfit`, deg={polyfit_deg})",
    ]

    return df, y_options
generate_xrd_plot(self)
Source code in pydatalab/apps/xrd/blocks.py
def generate_xrd_plot(self):
    """Build the XRD plot for this block and store it in `self.data["bokeh_plot_data"]`.

    If the block data contains a `file_id`, only that file is loaded. Otherwise every
    file attached to the block's item whose name ends with one of
    `accepted_file_extensions` is loaded, and all patterns are passed together to
    `selectable_axes_plot`. If no such file exists a warning is logged and nothing
    is plotted.

    Raises:
        RuntimeError: If the selected file has an unsupported extension, or if one of
            the item's files cannot be parsed.

    """
    if "file_id" not in self.data:
        # If no file set, try to plot them all
        item_info = flask_mongo.db.items.find_one(
            {"item_id": self.data["item_id"]},
        )

        all_files = [
            d
            for d in [
                get_file_info_by_id(f, update_if_live=False)
                for f in item_info["file_ObjectIds"]
            ]
            if any(d["name"].lower().endswith(ext) for ext in self.accepted_file_extensions)
        ]

        if not all_files:
            LOGGER.warning("XRDBlock.generate_xrd_plot(): No files found on sample")
            return

        pattern_dfs = []
        for f in all_files:
            try:
                pattern_df, y_options = self.load_pattern(
                    f["location"],
                    wavelength=float(self.data.get("wavelength", self.defaults["wavelength"])),
                )
            except Exception as exc:
                # Report the file being parsed in this iteration; no single file is
                # selected in this branch, so there is no `file_info` to refer to.
                raise RuntimeError(
                    f"Could not parse file {f['location']}. Error: {exc}"
                ) from exc
            pattern_dfs.append(pattern_df)

    else:
        file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
        ext = os.path.splitext(file_info["location"].split("/")[-1])[-1].lower()
        if ext not in self.accepted_file_extensions:
            # Exceptions do not interpolate %-style arguments, so format the message here
            raise RuntimeError(
                "XRDBlock.generate_xrd_plot(): Unsupported file extension "
                f"(must be one of {self.accepted_file_extensions}), not {ext}"
            )

        pattern_df, y_options = self.load_pattern(
            file_info["location"],
            wavelength=float(self.data.get("wavelength", self.defaults["wavelength"])),
        )
        pattern_dfs = [pattern_df]

    if pattern_dfs:
        p = selectable_axes_plot(
            pattern_dfs,
            x_options=["2θ (°)", "Q (Å⁻¹)", "d (Å)"],
            y_options=y_options,
            plot_line=True,
            plot_points=True,
            point_size=3,
        )

        self.data["bokeh_plot_data"] = bokeh.embed.json_item(p, theme=DATALAB_BOKEH_THEME)

models

XRDPattern (BaseModel) pydantic-model

This model defines the structure of the data that is expected for a solid-state XRD pattern.

Source code in pydatalab/apps/xrd/models.py
class XRDPattern(BaseModel):
    """This model defines the structure of the data that is expected
    for a solid-state XRD pattern.

    """

    # None of the fields below declares a default value.
    # NOTE(review): units are not stated in this model -- presumably wavelength and
    # d-spacings in Å, two_theta in degrees and q_values in Å⁻¹; confirm against callers.
    wavelength: float

    # NOTE(review): the four lists presumably run in parallel, one entry per data point -- confirm.
    two_theta: List[float]

    d_spacings: List[float]

    q_values: List[float]

    intensities: List[float]
wavelength: float pydantic-field required
two_theta: List[float] pydantic-field required
d_spacings: List[float] pydantic-field required
q_values: List[float] pydantic-field required
intensities: List[float] pydantic-field required

XRDProcessing (BaseModel) pydantic-model

Source code in pydatalab/apps/xrd/models.py
class XRDProcessing(BaseModel):
    """Model holding peak descriptors and baselines associated with an XRD pattern."""

    # NOTE(review): the three peak_* lists presumably run in parallel, one entry per peak -- confirm.
    peak_positions: List[float]

    peak_intensities: List[float]

    peak_widths: List[float]

    # A list of lists of floats, i.e. more than one baseline can be stored
    baselines: List[List[float]]

    class Config:
        # pydantic setting: fields not declared above are accepted on the model
        extra = "allow"
peak_positions: List[float] pydantic-field required
peak_intensities: List[float] pydantic-field required
peak_widths: List[float] pydantic-field required
baselines: List[List[float]] pydantic-field required

XRDMetadata (BaseModel) pydantic-model

Source code in pydatalab/apps/xrd/models.py
# Model for XRD measurement metadata; it declares no fields of its own.
class XRDMetadata(BaseModel): ...

XRDMeasurement (BaseModel) pydantic-model

Source code in pydatalab/apps/xrd/models.py
class XRDMeasurement(BaseModel):
    """Container grouping the pattern data, processing results and metadata of one XRD measurement."""

    # Every section is typed as Optional, so each of them may be None
    data: Optional[XRDPattern]
    processing: Optional[XRDProcessing]
    metadata: Optional[XRDMetadata]
data: XRDPattern pydantic-field
processing: XRDProcessing pydantic-field
metadata: XRDMetadata pydantic-field

utils

DATA_REGEX

STARTEND_REGEX

XrdmlParseError (Exception)

Source code in pydatalab/apps/xrd/utils.py
class XrdmlParseError(Exception):
    """Raised when expected content cannot be located in the text of an XRDML file."""

parse_xrdml(filename: str) -> DataFrame

Parses an XRDML file and returns a pandas DataFrame with columns twotheta and intensity.

Parameters:

Name Type Description Default
filename str

The file to parse.

required
Source code in pydatalab/apps/xrd/utils.py
def parse_xrdml(filename: str) -> pd.DataFrame:
    """Read an XRDML file into a dataframe with `twotheta` and `intensity` columns.

    The angle axis is not read point by point: it is rebuilt as evenly spaced
    values between the start and end positions found in the file, one per
    intensity value.

    Parameters:
        filename: The file to parse.

    Returns:
        A dataframe with the columns `twotheta` and `intensity`.

    """
    with open(filename) as handle:
        contents = handle.read()

    # Scan limits first, then the measured values
    first_angle, last_angle = getStartEnd(contents)
    counts = getIntensities(contents)

    two_theta = np.linspace(first_angle, last_angle, num=len(counts))
    columns = {"twotheta": two_theta, "intensity": counts}
    return pd.DataFrame(columns)

convertSinglePattern(filename: str, directory: str = '.', adjust_baseline: bool = False, overwrite: bool = False) -> str

Converts an XRDML file to a simple xy file and writes it to the passed directory. An existing output file is left untouched unless overwrite is True.

Parameters:

Name Type Description Default
filename str

The file to convert.

required
directory str

The output directory.

'.'
adjust_baseline bool

If True, the baseline will be adjusted so that no points are negative.

False
overwrite bool

If True, existing files with the same filenames will be overwritten.

False

Returns:

Type Description
str

The output filename.

Source code in pydatalab/apps/xrd/utils.py
def convertSinglePattern(
    filename: str,
    directory: str = ".",
    adjust_baseline: bool = False,
    overwrite: bool = False,
) -> str:
    """Convert an XRDML file to a simple xy file written next to it.

    The input is read from `directory/filename` and the output is written to the
    same path with `.xy` appended. An existing output file is only replaced when
    `overwrite` is True; otherwise a warning is issued and nothing is written.

    Parameters:
        filename: The file to convert, relative to `directory`.
        directory: The directory containing the input, which also receives the output.
        adjust_baseline: If True, the baseline will be adjusted so that no points are negative.
        overwrite: If True, existing files with the same filenames will be overwritten.

    Returns:
        The output filename.

    """
    source_path = os.path.join(directory, filename)
    target_path = source_path + ".xy"

    if os.path.exists(target_path):
        if not overwrite:
            warnings.warn(
                f"{target_path} already exists in the directory {directory}, will not overwrite"
            )
            return target_path
        print(f"{target_path} already exists in the directory {directory}. Overwriting.")

    with open(source_path) as source:
        contents = source.read()

    print(f"Processing file {source_path}")
    start, end = getStartEnd(contents)
    print(f"\tstart angle: {start}\tend angle: {end}")
    intensities = getIntensities(contents)

    if adjust_baseline:
        shifted = np.array(intensities)  # type: ignore
        lowest: float = np.min(shifted)
        if lowest < 0:
            print(
                f"\tadjusting baseline so that no points are negative (adding {-1 * lowest} counts)"
            )
            # Shift every point up so that the minimum sits at zero
            shifted -= lowest
        else:
            print("\tno intensitites are less than zero, so no baseline adjustment performed")

        intensities = shifted.tolist()  # type: ignore

    print(f"\tnumber of datapoints: {len(intensities)}")
    # Build the full text before opening the output so a failure leaves no partial file
    xy_text = toXY(intensities, start, end)
    with open(target_path, "w") as target:
        target.write(xy_text)
    print("\tSuccess!")
    return target_path

getStartEnd(s: str) -> Tuple[float, float]

Parse a given string representation of an xrdml file to find the start and end 2Theta points of the scan. Note: this could match either Omega or 2Theta depending on their order in the XRDML file.

Exceptions:

Type Description
XrdmlParseError

if the start and end positions could not be found.

Returns:

Type Description
Tuple[float, float]

(start, end) positions in the XRDML file.

Source code in pydatalab/apps/xrd/utils.py
def getStartEnd(s: str) -> Tuple[float, float]:
    """Extract the start and end positions of the scan from the text of an XRDML file.

    Note: this could match either Omega or 2Theta depending on their order in the XRDML file.

    Raises:
        XrdmlParseError: if the start and end positions could not be found.

    Returns:
        (start, end) positions in the XRDML file.

    """
    found = re.search(STARTEND_REGEX, s)
    if found is None:
        raise XrdmlParseError("the start and end 2theta positions were not found in the XRDML file")

    # The first two capture groups hold the start and end positions respectively
    return float(found.group(1)), float(found.group(2))

getIntensities(s: str) -> List[float]

Parse a given string representation of an xrdml file to find the peak intensities.

Exceptions:

Type Description
XrdmlParseError

if intensities could not be found in the file

Returns:

Type Description
List[float]

The list of intensities.

Source code in pydatalab/apps/xrd/utils.py
def getIntensities(s: str) -> List[float]:
    """Extract the intensity values from the text of an XRDML file.

    Raises:
        XrdmlParseError: if intensities could not be found in the file

    Returns:
        The intensities as a list of floats.

    """
    found = re.search(DATA_REGEX, s)
    if found is None:
        raise XrdmlParseError("the intensitites were not found in the XML file")

    # The second capture group holds the whitespace-separated values
    return list(map(float, found.group(2).split()))

toXY(intensities: List[float], start: float, end: float) -> str

Converts a given list of intensities, along with a start and end angle, to a string in XY format.

Source code in pydatalab/apps/xrd/utils.py
def toXY(intensities: List[float], start: float, end: float) -> str:
    """Render intensities as XY text, one "angle intensity" pair per CRLF-terminated line.

    Angles are evenly spaced between `start` and `end` (both inclusive), one per
    intensity; they are written with five decimal places and intensities with three.

    """
    angles = np.linspace(start, end, num=len(intensities))
    rows = []
    for angle, count in zip(angles, intensities):
        rows.append(f"{angle:.5f} {count:.3f}\r\n")
    return "".join(rows)