Raman

pydatalab.apps.raman special

blocks

RamanBlock (DataBlock)

Source code in pydatalab/apps/raman/blocks.py
class RamanBlock(DataBlock):
    blocktype = "raman"
    name = "Raman spectroscopy"
    description = "Visualize 1D Raman spectroscopy data."
    accepted_file_extensions = (".txt", ".wdf")

    @property
    def plot_functions(self):
        return (self.generate_raman_plot,)

    @classmethod
    def load(cls, location: str | Path) -> tuple[pd.DataFrame, dict, list[str]]:
        if not isinstance(location, str):
            location = str(location)
        ext = os.path.splitext(location)[-1].lower()

        vendor = None
        metadata: dict = {}
        if ext == ".txt":
            try:
                header = []
                with open(location, encoding="cp1252") as f:
                    for line in f:
                        if line.startswith("#"):
                            header.append(line)
                    if "#Wave" in header[0] and "#Intensity" in header[0]:
                        vendor = "renishaw"
                    else:
                        metadata = {
                            key: value for key, value in [line.split("=") for line in header]
                        }
                        if (
                            metadata.get("#AxisType[0]") == "Intens\n"
                            and metadata.get("#AxisType[1]") == "Spectr\n"
                        ):
                            vendor = "labspec"
                if vendor == "renishaw":
                    df = pd.DataFrame(np.loadtxt(location), columns=["wavenumber", "intensity"])
                elif vendor == "labspec":
                    df = pd.DataFrame(
                        np.loadtxt(location, encoding="cp1252"), columns=["wavenumber", "intensity"]
                    )
                    metadata = {}
            except IndexError:
                pass
        elif ext == ".wdf":
            vendor = "renishaw"
            df, metadata = cls.make_wdf_df(location)
        if not vendor:
            raise Exception(
                "Could not detect Raman data vendor -- this file type is not supported by this block."
            )

        df["sqrt(intensity)"] = np.sqrt(df["intensity"])
        df["log(intensity)"] = np.log10(df["intensity"])
        df["normalized intensity"] = df["intensity"] / np.max(df["intensity"])
        polyfit_deg = 15
        polyfit_baseline = np.poly1d(
            np.polyfit(df["wavenumber"], df["normalized intensity"], deg=polyfit_deg)
        )(df["wavenumber"])
        df["intensity - polyfit baseline"] = df["normalized intensity"] - polyfit_baseline
        df[f"baseline (`numpy.polyfit`, {polyfit_deg=})"] = polyfit_baseline / np.max(
            df["intensity - polyfit baseline"]
        )
        df["intensity - polyfit baseline"] /= np.max(df["intensity - polyfit baseline"])

        kernel_size = 101
        median_baseline = medfilt(df["normalized intensity"], kernel_size=kernel_size)
        df["intensity - median baseline"] = df["normalized intensity"] - median_baseline
        df[f"baseline (`scipy.signal.medfilt`, {kernel_size=})"] = median_baseline / np.max(
            df["intensity - median baseline"]
        )
        df["intensity - median baseline"] /= np.max(df["intensity - median baseline"])

        # Morphological baseline estimate via pybaselines
        half_window = round(
            0.03 * df.shape[0]
        )  # empirically chosen as 3% of the number of points; may not generalize to every spectrum
        baseline_fitter = Baseline(x_data=df["wavenumber"])
        morphological_baseline = baseline_fitter.mor(
            df["normalized intensity"], half_window=half_window
        )[0]
        df["intensity - morphological baseline"] = (
            df["normalized intensity"] - morphological_baseline
        )
        df[f"baseline (`pybaselines.Baseline.mor`, {half_window=})"] = (
            morphological_baseline / np.max(df["intensity - morphological baseline"])
        )
        df["intensity - morphological baseline"] /= np.max(df["intensity - morphological baseline"])
        df.index.name = location.split("/")[-1]

        y_options = [
            "normalized intensity",
            "intensity",
            "sqrt(intensity)",
            "log(intensity)",
            "intensity - median baseline",
            f"baseline (`scipy.signal.medfilt`, {kernel_size=})",
            "intensity - polyfit baseline",
            f"baseline (`numpy.polyfit`, {polyfit_deg=})",
            "intensity - morphological baseline",
            f"baseline (`pybaselines.Baseline.mor`, {half_window=})",
        ]
        return df, metadata, y_options

    @classmethod
    def make_wdf_df(cls, location: Path | str) -> tuple[pd.DataFrame, dict]:
        """Read a .wdf file with RosettaSciIO and try to extract
        1D Raman spectra.

        Parameters:
            location: The location of the file to read.

        Returns:
            A dataframe with wavenumber and intensity columns, and the file metadata.

        """

        try:
            raman_data = file_reader(location)
        except Exception as e:
            raise RuntimeError(f"Could not read file with RosettaSciIO. Error: {e}")

        if len(raman_data[0]["axes"]) == 1:
            pass
        elif len(raman_data[0]["axes"]) == 3:
            raise RuntimeError("This block does not support 2D Raman yet.")
        else:
            raise RuntimeError("Data is not compatible 1D or 2D Raman data.")

        intensity = raman_data[0]["data"]
        wavenumber_size = raman_data[0]["axes"][0]["size"]
        wavenumber_offset = raman_data[0]["axes"][0]["offset"]
        wavenumber_scale = raman_data[0]["axes"][0]["scale"]
        wavenumbers = np.array(
            [wavenumber_offset + i * wavenumber_scale for i in range(wavenumber_size)]
        )
        df = pd.DataFrame({"wavenumber": wavenumbers, "intensity": intensity})
        return df, raman_data[0]["metadata"]

    def generate_raman_plot(self):
        file_info = None
        pattern_dfs = None

        if "file_id" not in self.data:
            return None

        else:
            file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
            ext = os.path.splitext(file_info["location"].split("/")[-1])[-1].lower()
            if ext not in self.accepted_file_extensions:
                raise RuntimeError(
                    f"RamanBlock.generate_raman_plot(): unsupported file extension {ext!r} "
                    f"(must be one of {self.accepted_file_extensions})"
                )
            pattern_dfs, _, y_options = self.load(file_info["location"])
            pattern_dfs = [pattern_dfs]

        if pattern_dfs:
            p = selectable_axes_plot(
                pattern_dfs,
                x_options=["wavenumber"],
                y_options=y_options,
                plot_line=True,
                plot_points=True,
                point_size=3,
            )

            self.data["bokeh_plot_data"] = bokeh.embed.json_item(p, theme=DATALAB_BOKEH_THEME)
accepted_file_extensions: tuple[str, ...] | None

A list of file extensions that the block will attempt to read.

blocktype: str

A short (unique) string key specifying the type of block.

description: str

A longer description outlining the purpose and capability of the block.

name: str

The human-readable block name specifying which technique or file format it pertains to.

plot_functions property readonly

load(location: str | pathlib.Path) -> tuple[DataFrame, dict, list[str]] classmethod

Source code in pydatalab/apps/raman/blocks.py
@classmethod
def load(cls, location: str | Path) -> tuple[pd.DataFrame, dict, list[str]]:
    if not isinstance(location, str):
        location = str(location)
    ext = os.path.splitext(location)[-1].lower()

    vendor = None
    metadata: dict = {}
    if ext == ".txt":
        try:
            header = []
            with open(location, encoding="cp1252") as f:
                for line in f:
                    if line.startswith("#"):
                        header.append(line)
                if "#Wave" in header[0] and "#Intensity" in header[0]:
                    vendor = "renishaw"
                else:
                    metadata = {
                        key: value for key, value in [line.split("=") for line in header]
                    }
                    if (
                        metadata.get("#AxisType[0]") == "Intens\n"
                        and metadata.get("#AxisType[1]") == "Spectr\n"
                    ):
                        vendor = "labspec"
            if vendor == "renishaw":
                df = pd.DataFrame(np.loadtxt(location), columns=["wavenumber", "intensity"])
            elif vendor == "labspec":
                df = pd.DataFrame(
                    np.loadtxt(location, encoding="cp1252"), columns=["wavenumber", "intensity"]
                )
                metadata = {}
        except IndexError:
            pass
    elif ext == ".wdf":
        vendor = "renishaw"
        df, metadata = cls.make_wdf_df(location)
    if not vendor:
        raise Exception(
            "Could not detect Raman data vendor -- this file type is not supported by this block."
        )

    df["sqrt(intensity)"] = np.sqrt(df["intensity"])
    df["log(intensity)"] = np.log10(df["intensity"])
    df["normalized intensity"] = df["intensity"] / np.max(df["intensity"])
    polyfit_deg = 15
    polyfit_baseline = np.poly1d(
        np.polyfit(df["wavenumber"], df["normalized intensity"], deg=polyfit_deg)
    )(df["wavenumber"])
    df["intensity - polyfit baseline"] = df["normalized intensity"] - polyfit_baseline
    df[f"baseline (`numpy.polyfit`, {polyfit_deg=})"] = polyfit_baseline / np.max(
        df["intensity - polyfit baseline"]
    )
    df["intensity - polyfit baseline"] /= np.max(df["intensity - polyfit baseline"])

    kernel_size = 101
    median_baseline = medfilt(df["normalized intensity"], kernel_size=kernel_size)
    df["intensity - median baseline"] = df["normalized intensity"] - median_baseline
    df[f"baseline (`scipy.signal.medfilt`, {kernel_size=})"] = median_baseline / np.max(
        df["intensity - median baseline"]
    )
    df["intensity - median baseline"] /= np.max(df["intensity - median baseline"])

    # Morphological baseline estimate via pybaselines
    half_window = round(
        0.03 * df.shape[0]
    )  # empirically chosen as 3% of the number of points; may not generalize to every spectrum
    baseline_fitter = Baseline(x_data=df["wavenumber"])
    morphological_baseline = baseline_fitter.mor(
        df["normalized intensity"], half_window=half_window
    )[0]
    df["intensity - morphological baseline"] = (
        df["normalized intensity"] - morphological_baseline
    )
    df[f"baseline (`pybaselines.Baseline.mor`, {half_window=})"] = (
        morphological_baseline / np.max(df["intensity - morphological baseline"])
    )
    df["intensity - morphological baseline"] /= np.max(df["intensity - morphological baseline"])
    df.index.name = location.split("/")[-1]

    y_options = [
        "normalized intensity",
        "intensity",
        "sqrt(intensity)",
        "log(intensity)",
        "intensity - median baseline",
        f"baseline (`scipy.signal.medfilt`, {kernel_size=})",
        "intensity - polyfit baseline",
        f"baseline (`numpy.polyfit`, {polyfit_deg=})",
        "intensity - morphological baseline",
        f"baseline (`pybaselines.Baseline.mor`, {half_window=})",
    ]
    return df, metadata, y_options
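
The baseline handling in load can be reproduced outside of a datalab server. The sketch below is illustrative only: it applies the same three baseline estimates (numpy.polyfit, scipy.signal.medfilt and pybaselines.Baseline.mor) to a synthetic spectrum rather than a real instrument file, and the peak positions, noise level and column names are assumptions made for the example.

# Minimal, self-contained sketch of the three baseline estimates used in load(),
# applied to a synthetic spectrum (all values below are illustrative assumptions).
import numpy as np
import pandas as pd
from pybaselines import Baseline
from scipy.signal import medfilt

wavenumber = np.linspace(100, 2000, 1901)
# Two Lorentzian-like peaks on a sloping background, plus noise (synthetic data)
peaks = 1.0 / (1 + ((wavenumber - 520) / 5) ** 2) + 0.5 / (1 + ((wavenumber - 950) / 10) ** 2)
background = 1e-4 * wavenumber
intensity = peaks + background + np.random.default_rng(0).normal(0, 0.01, wavenumber.size)

df = pd.DataFrame({"wavenumber": wavenumber, "intensity": intensity})
df["normalized intensity"] = df["intensity"] / df["intensity"].max()

# 1. High-order polynomial baseline (numpy.polyfit), as in load()
polyfit_baseline = np.poly1d(np.polyfit(df["wavenumber"], df["normalized intensity"], deg=15))(
    df["wavenumber"]
)

# 2. Median-filter baseline (scipy.signal.medfilt) with an odd kernel size
median_baseline = medfilt(df["normalized intensity"], kernel_size=101)

# 3. Morphological baseline (pybaselines.Baseline.mor); half_window scales with the data length
half_window = round(0.03 * df.shape[0])
morphological_baseline = Baseline(x_data=df["wavenumber"]).mor(
    df["normalized intensity"], half_window=half_window
)[0]

df["intensity - polyfit baseline"] = df["normalized intensity"] - polyfit_baseline
df["intensity - median baseline"] = df["normalized intensity"] - median_baseline
df["intensity - morphological baseline"] = df["normalized intensity"] - morphological_baseline
print(df.head())
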
make_wdf_df(location: pathlib.Path | str) -> tuple[DataFrame, dict] classmethod

Read the .wdf file with RosettaSciIO and try to extract 1D Raman spectra.

Parameters:

Name        Type                  Description                          Default
location    pathlib.Path | str    The location of the file to read.    required

Returns:

Type                      Description
tuple[DataFrame, dict]    A dataframe with wavenumber and intensity columns, and the file metadata.

Source code in pydatalab/apps/raman/blocks.py
@classmethod
def make_wdf_df(cls, location: Path | str) -> tuple[pd.DataFrame, dict]:
    """Read a .wdf file with RosettaSciIO and try to extract
    1D Raman spectra.

    Parameters:
        location: The location of the file to read.

    Returns:
        A dataframe with wavenumber and intensity columns, and the file metadata.

    """

    try:
        raman_data = file_reader(location)
    except Exception as e:
        raise RuntimeError(f"Could not read file with RosettaSciIO. Error: {e}")

    if len(raman_data[0]["axes"]) == 1:
        pass
    elif len(raman_data[0]["axes"]) == 3:
        raise RuntimeError("This block does not support 2D Raman yet.")
    else:
        raise RuntimeError("Data is not compatible 1D or 2D Raman data.")

    intensity = raman_data[0]["data"]
    wavenumber_size = raman_data[0]["axes"][0]["size"]
    wavenumber_offset = raman_data[0]["axes"][0]["offset"]
    wavenumber_scale = raman_data[0]["axes"][0]["scale"]
    wavenumbers = np.array(
        [wavenumber_offset + i * wavenumber_scale for i in range(wavenumber_size)]
    )
    df = pd.DataFrame({"wavenumber": wavenumbers, "intensity": intensity})
    return df, raman_data[0]["metadata"]
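
RosettaSciIO's file_reader describes each signal axis with size, offset and scale fields, which make_wdf_df converts into an explicit wavenumber array. The snippet below uses made-up axis values (standing in for raman_data[0]["axes"][0]) to show that the list comprehension above is equivalent to a vectorized numpy expression.

# Illustrative only: fake axis values standing in for raman_data[0]["axes"][0]
import numpy as np

axis = {"size": 5, "offset": 100.0, "scale": 2.5}

wavenumbers_loop = np.array(
    [axis["offset"] + i * axis["scale"] for i in range(axis["size"])]
)
wavenumbers_vec = axis["offset"] + axis["scale"] * np.arange(axis["size"])

assert np.allclose(wavenumbers_loop, wavenumbers_vec)
print(wavenumbers_vec)  # [100.  102.5 105.  107.5 110. ]
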
generate_raman_plot(self)
Source code in pydatalab/apps/raman/blocks.py
def generate_raman_plot(self):
    file_info = None
    pattern_dfs = None

    if "file_id" not in self.data:
        return None

    else:
        file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
        ext = os.path.splitext(file_info["location"].split("/")[-1])[-1].lower()
        if ext not in self.accepted_file_extensions:
            raise RuntimeError(
                f"RamanBlock.generate_raman_plot(): unsupported file extension {ext!r} "
                f"(must be one of {self.accepted_file_extensions})"
            )
        pattern_dfs, _, y_options = self.load(file_info["location"])
        pattern_dfs = [pattern_dfs]

    if pattern_dfs:
        p = selectable_axes_plot(
            pattern_dfs,
            x_options=["wavenumber"],
            y_options=y_options,
            plot_line=True,
            plot_points=True,
            point_size=3,
        )

        self.data["bokeh_plot_data"] = bokeh.embed.json_item(p, theme=DATALAB_BOKEH_THEME)