Skip to content

NMR

pydatalab.apps.nmr special

blocks

NMRBlock (DataBlock)

Source code in pydatalab/apps/nmr/blocks.py
class NMRBlock(DataBlock):
    """Data block for visualizing 1D NMR spectra parsed from zipped Bruker projects."""

    blocktype = "nmr"
    name = "NMR"
    description = "A simple NMR block for visualizing 1D NMR data from Bruker projects."

    # Only zipped Bruker project directories can be attached.
    accepted_file_extensions = (".zip",)
    # NOTE(review): this default key ("process number") is never read below;
    # the methods use self.data["selected_process"] instead -- confirm intent.
    defaults = {"process number": 1}
    _supports_collections = False

    @property
    def plot_functions(self):
        """Callables invoked to (re)generate this block's plot data."""
        return (self.generate_nmr_plot,)

    def read_bruker_nmr_data(self):
        """Extract the attached Bruker project zip, parse it with `read_bruker_1d`,
        and store the processed spectrum plus key acquisition parameters in
        `self.data`.

        Returns None in all cases; problems (missing file, wrong extension,
        unparseable project) are logged rather than raised.
        """
        if "file_id" not in self.data:
            LOGGER.warning("NMRPlot.read_bruker_nmr_data(): No file set in the DataBlock")
            return

        zip_file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
        filename = zip_file_info["name"]

        name, ext = os.path.splitext(filename)
        if ext.lower() not in self.accepted_file_extensions:
            LOGGER.warning(
                "NMRBlock.read_bruker_nmr_data(): Unsupported file extension (must be .zip)"
            )
            return

        # Unzip next to the stored file; this runs on every call and
        # overwrites any previous extraction.
        directory_location = zip_file_info["location"] + ".extracted"
        LOGGER.debug(f"Directory location is: {directory_location}")
        with zipfile.ZipFile(zip_file_info["location"], "r") as zip_ref:
            zip_ref.extractall(directory_location)

        # Assumes the archive contains a single top-level directory named
        # after the zip itself, holding the Bruker "pdata" tree.
        extracted_directory_name = os.path.join(directory_location, name)
        available_processes = os.listdir(os.path.join(extracted_directory_name, "pdata"))

        # Fall back to the first available process if none (or a stale one)
        # was previously selected.
        if self.data.get("selected_process") not in available_processes:
            self.data["selected_process"] = available_processes[0]

        try:
            df, a_dic, topspin_title, processed_data_shape = read_bruker_1d(
                os.path.join(directory_location, name),
                process_number=self.data["selected_process"],
                verbose=False,
            )
        except Exception as error:
            LOGGER.critical(f"Unable to parse {name} as Bruker project. {error}")
            return

        # df is None for multi-dimensional data; store None in that case.
        serialized_df = df.to_dict() if (df is not None) else None

        # all data sorted in a fairly raw way
        self.data["processed_data"] = serialized_df
        self.data["acquisition_parameters"] = a_dic["acqus"]
        self.data["processing_parameters"] = a_dic["procs"]
        self.data["pulse_program"] = a_dic["pprog"]

        # specific things that we might want to pull out for the UI:
        self.data["available_processes"] = available_processes
        self.data["nucleus"] = a_dic["acqus"]["NUC1"]
        self.data["carrier_frequency_MHz"] = a_dic["acqus"]["SFO1"]
        self.data["carrier_offset_Hz"] = a_dic["acqus"]["O1"]
        # D[1] is the TopSpin D1 delay -- presumably the recycle delay; confirm.
        self.data["recycle_delay"] = a_dic["acqus"]["D"][1]
        self.data["nscans"] = a_dic["acqus"]["NS"]
        # NOTE(review): meaning of CNST[31] depends on the pulse program -- confirm.
        self.data["CNST31"] = a_dic["acqus"]["CNST"][31]
        self.data["processed_data_shape"] = processed_data_shape

        self.data["probe_name"] = a_dic["acqus"]["PROBHD"]
        self.data["pulse_program_name"] = a_dic["acqus"]["PULPROG"]
        self.data["topspin_title"] = topspin_title

    def generate_nmr_plot(self):
        """Re-read the attached file and build the Bokeh plot JSON in
        `self.data["bokeh_plot_data"]` (set to None when no data is available).
        """
        # currently calls every time plotting happens, but it should only happen if the file was updated
        self.read_bruker_nmr_data()
        if "processed_data" not in self.data or not self.data["processed_data"]:
            self.data["bokeh_plot_data"] = None
            return

        df = pd.DataFrame(self.data["processed_data"])
        # Normalize to the maximum intensity for the "normalized intensity" axis option.
        df["normalized intensity"] = df.intensity / df.intensity.max()

        bokeh_layout = selectable_axes_plot(
            df,
            x_options=["ppm", "hz"],
            y_options=[
                "intensity",
                "intensity_per_scan",
                "normalized intensity",
            ],
            plot_line=True,
            point_size=3,
        )
        # flip x axis, per NMR convention. Note that the figure is the second element
        # of the layout in the current implementation, but this could be fragile.
        bokeh_layout.children[1].x_range.flipped = True

        self.data["bokeh_plot_data"] = bokeh.embed.json_item(
            bokeh_layout, theme=DATALAB_BOKEH_THEME
        )
accepted_file_extensions: tuple[str, ...] | None

A list of file extensions that the block will attempt to read.

blocktype: str

A short (unique) string key specifying the type of block.

defaults: Dict[str, Any]

Any default values that should be set if they are not supplied during block init.

description: str

A longer description outlining the purpose and capability of the block.

name: str

The human-readable block name specifying which technique or file format it pertains to.

plot_functions property readonly
read_bruker_nmr_data(self)
Source code in pydatalab/apps/nmr/blocks.py
def read_bruker_nmr_data(self):
    """Extract the attached Bruker project zip, parse it with `read_bruker_1d`,
    and store the processed spectrum plus key acquisition parameters in
    `self.data`.

    Returns None in all cases; problems (missing file, wrong extension,
    unparseable project) are logged rather than raised.
    """
    if "file_id" not in self.data:
        LOGGER.warning("NMRPlot.read_bruker_nmr_data(): No file set in the DataBlock")
        return

    zip_file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
    filename = zip_file_info["name"]

    name, ext = os.path.splitext(filename)
    if ext.lower() not in self.accepted_file_extensions:
        LOGGER.warning(
            "NMRBlock.read_bruker_nmr_data(): Unsupported file extension (must be .zip)"
        )
        return

    # Unzip next to the stored file; this runs on every call and
    # overwrites any previous extraction.
    directory_location = zip_file_info["location"] + ".extracted"
    LOGGER.debug(f"Directory location is: {directory_location}")
    with zipfile.ZipFile(zip_file_info["location"], "r") as zip_ref:
        zip_ref.extractall(directory_location)

    # Assumes the archive contains a single top-level directory named
    # after the zip itself, holding the Bruker "pdata" tree.
    extracted_directory_name = os.path.join(directory_location, name)
    available_processes = os.listdir(os.path.join(extracted_directory_name, "pdata"))

    # Fall back to the first available process if none (or a stale one)
    # was previously selected.
    if self.data.get("selected_process") not in available_processes:
        self.data["selected_process"] = available_processes[0]

    try:
        df, a_dic, topspin_title, processed_data_shape = read_bruker_1d(
            os.path.join(directory_location, name),
            process_number=self.data["selected_process"],
            verbose=False,
        )
    except Exception as error:
        LOGGER.critical(f"Unable to parse {name} as Bruker project. {error}")
        return

    # df is None for multi-dimensional data; store None in that case.
    serialized_df = df.to_dict() if (df is not None) else None

    # all data sorted in a fairly raw way
    self.data["processed_data"] = serialized_df
    self.data["acquisition_parameters"] = a_dic["acqus"]
    self.data["processing_parameters"] = a_dic["procs"]
    self.data["pulse_program"] = a_dic["pprog"]

    # specific things that we might want to pull out for the UI:
    self.data["available_processes"] = available_processes
    self.data["nucleus"] = a_dic["acqus"]["NUC1"]
    self.data["carrier_frequency_MHz"] = a_dic["acqus"]["SFO1"]
    self.data["carrier_offset_Hz"] = a_dic["acqus"]["O1"]
    # D[1] is the TopSpin D1 delay -- presumably the recycle delay; confirm.
    self.data["recycle_delay"] = a_dic["acqus"]["D"][1]
    self.data["nscans"] = a_dic["acqus"]["NS"]
    # NOTE(review): meaning of CNST[31] depends on the pulse program -- confirm.
    self.data["CNST31"] = a_dic["acqus"]["CNST"][31]
    self.data["processed_data_shape"] = processed_data_shape

    self.data["probe_name"] = a_dic["acqus"]["PROBHD"]
    self.data["pulse_program_name"] = a_dic["acqus"]["PULPROG"]
    self.data["topspin_title"] = topspin_title
generate_nmr_plot(self)
Source code in pydatalab/apps/nmr/blocks.py
def generate_nmr_plot(self):
    """Re-read the attached file and build the Bokeh plot JSON in
    `self.data["bokeh_plot_data"]` (set to None when no data is available).
    """
    # currently calls every time plotting happens, but it should only happen if the file was updated
    self.read_bruker_nmr_data()
    if "processed_data" not in self.data or not self.data["processed_data"]:
        self.data["bokeh_plot_data"] = None
        return

    df = pd.DataFrame(self.data["processed_data"])
    # Normalize to the maximum intensity for the "normalized intensity" axis option.
    df["normalized intensity"] = df.intensity / df.intensity.max()

    bokeh_layout = selectable_axes_plot(
        df,
        x_options=["ppm", "hz"],
        y_options=[
            "intensity",
            "intensity_per_scan",
            "normalized intensity",
        ],
        plot_line=True,
        point_size=3,
    )
    # flip x axis, per NMR convention. Note that the figure is the second element
    # of the layout in the current implementation, but this could be fragile.
    bokeh_layout.children[1].x_range.flipped = True

    self.data["bokeh_plot_data"] = bokeh.embed.json_item(
        bokeh_layout, theme=DATALAB_BOKEH_THEME
    )

utils

read_bruker_1d(data: pathlib.Path | pandas.core.frame.DataFrame, process_number: int = 1, verbose: bool = False, sample_mass_mg: float | None = None) -> tuple

Read a 1D Bruker NMR spectrum and return it as a pandas DataFrame.

Parameters:

Name Type Description Default
data pathlib.Path | pandas.core.frame.DataFrame

The directory of the full bruker data file, or a pandas DataFrame which will be returned without further processing.

required
process_number int

The process number of the processed data you want to plot [default: 1].

1
verbose bool

Whether to print information such as the spectrum title to stdout.

False
sample_mass_mg float | None

The (optional) sample mass. If provided, the resulting DataFrame will have an "intensity_per_scan_per_gram" column.

None

Returns:

Type Description
df: A pandas DataFrame containing the spectrum data, or None if the reading failed.

a_dic: A dictionary containing the acquisition parameters.

topspin_title: The title of the spectrum, as stored in the TopSpin "title" file.

shape: The shape of the spectrum data array.

Source code in pydatalab/apps/nmr/utils.py
def read_bruker_1d(
    data: Path | pd.DataFrame,
    process_number: int = 1,
    verbose: bool = False,
    sample_mass_mg: float | None = None,
) -> tuple[pd.DataFrame | None, dict, str | None, tuple[int, ...]]:
    """Read a 1D Bruker NMR spectrum and return it as a DataFrame.

    Parameters:
        data: The directory of the full Bruker data file, or a pandas DataFrame which
            will be returned without further processing.
        process_number: The process number of the processed data you want to plot [default: 1].
        verbose: Whether to print information such as the spectrum title to stdout.
        sample_mass_mg: The (optional) sample mass. If provided, the resulting DataFrame will have an "intensity_per_scan_per_gram" column.

    Returns:
        df: A pandas DataFrame containing the spectrum data, or None if the reading failed.
        a_dic: A dictionary containing the acquisition parameters.
        topspin_title: The title of the spectrum, as stored in the topspin "title" file.
        shape: The shape of the spectrum data array.

    """

    # if df is provided, just return it as-is. This functionality is provided to make functions calling read_bruker_1d flexible by default.
    # Either the data directory or the already-processed df can always be provided with equivalent results.

    if isinstance(data, pd.DataFrame):
        if verbose:
            print("data frame provided to read_bruker_1d(). Returning it as is.")
        # NOTE(review): this path returns the bare DataFrame rather than the
        # 4-tuple promised by the signature; callers that unpack four values
        # will fail here. Confirm which contract is intended before changing.
        return data
    else:
        data_dir = Path(data)

    processed_data_dir = data_dir / "pdata" / str(process_number)

    # Acquisition parameters/raw data come from the project root; the
    # processed spectrum comes from the selected pdata/<process_number> dir.
    a_dic, a_data = ng.fileio.bruker.read(str(data_dir))  # aquisition_data
    p_dic, p_data = ng.fileio.bruker.read_pdata(str(processed_data_dir))  # processing data

    # The TopSpin "title" file is optional; its absence is not an error.
    try:
        with open(os.path.join(processed_data_dir, "title")) as f:
            topspin_title = f.read()
    except FileNotFoundError:
        topspin_title = None

    # Multi-dimensional spectra are not handled: return metadata only, with
    # df=None and the *processed* data shape.
    if len(p_data.shape) > 1:
        return None, a_dic, topspin_title, p_data.shape

    nscans = a_dic["acqus"]["NS"]

    # create a unit convertor to get the x-axis in ppm units
    udic = ng.bruker.guess_udic(p_dic, p_data)
    uc = ng.fileiobase.uc_from_udic(udic)

    ppm_scale = uc.ppm_scale()
    hz_scale = uc.hz_scale()

    df = pd.DataFrame(
        {
            "ppm": ppm_scale,
            "hz": hz_scale,
            "intensity": p_data,
            "intensity_per_scan": p_data / nscans,
        }
    )
    if sample_mass_mg:
        # mg -> g conversion, hence the factor of 1000.
        df["intensity_per_scan_per_gram"] = df["intensity_per_scan"] / sample_mass_mg * 1000.0

    if verbose:
        print(f"reading bruker data file. {udic[0]['label']} 1D spectrum, {nscans} scans.")
        if sample_mass_mg:
            print(
                f'sample mass was provided: {sample_mass_mg:f} mg. "intensity_per_scan_per_gram" column included. '
            )
        if topspin_title:
            print("\nTitle:\n")
            print(topspin_title)
        else:
            print("No title found in scan")

    # NOTE(review): the 1D path returns the *acquisition* data shape (a_data),
    # while the multi-D early return above uses the *processed* (p_data)
    # shape -- confirm which one callers expect before unifying.
    return df, a_dic, topspin_title, a_data.shape

read_topspin_txt(filename, sample_mass_mg = None, nscans = None)

Source code in pydatalab/apps/nmr/utils.py
def read_topspin_txt(filename, sample_mass_mg=None, nscans=None):
    """Read a 1D spectrum exported from TopSpin as a text file.

    Parameters:
        filename: Path to the exported TopSpin text file.
        sample_mass_mg: Optional sample mass in mg; together with `nscans`,
            adds a mass- and scan-normalized intensity column.
        nscans: Optional number of scans used for the normalized column.

    Returns:
        A pandas DataFrame with "ppm", "intensity" and min-max normalized
        "I_norm" columns (plus "I_per_g_per_scan" when both optional
        arguments are provided).

    Raises:
        ValueError: If the header lacks the LEFT/RIGHT or SIZE fields, or if
            the number of intensity values does not match the declared SIZE.
    """
    MAX_HEADER_LINES = 10
    LEFTRIGHT_REGEX = r"# LEFT = (-?\d+\.\d+) ppm. RIGHT = (-?\d+\.\d+) ppm\."
    SIZE_REGEX = r"SIZE = (\d+)"

    # The spectral window (LEFT/RIGHT) and point count (SIZE) live in
    # commented header lines at the top of the file.
    with open(filename) as f:
        header = "".join(itertools.islice(f, MAX_HEADER_LINES))  # read the first 10 lines

    leftright_match = re.search(LEFTRIGHT_REGEX, header)
    if not leftright_match:
        raise ValueError("Header improperly formatted. Could not find LEFT and/or RIGHT values")
    left = float(leftright_match.group(1))
    right = float(leftright_match.group(2))

    size_match = re.search(SIZE_REGEX, header)
    if not size_match:
        raise ValueError("Header improperly formatted. Could not find SIZE value")
    size = int(size_match.group(1))

    intensity = np.genfromtxt(filename, comments="#")
    # Explicit exception instead of `assert`, which is stripped under -O.
    if len(intensity) != size:
        raise ValueError(
            f"Number of intensity values ({len(intensity)}) does not match declared SIZE ({size})"
        )

    data = {
        "ppm": np.linspace(left, right, size),
        "intensity": intensity,
        "I_norm": (intensity - intensity.min()) / (intensity.max() - intensity.min()),
    }

    if sample_mass_mg and nscans:
        # Normalize per scan and per gram (mg -> g gives the factor of 1000).
        data["I_per_g_per_scan"] = intensity / float(sample_mass_mg) / float(nscans) * 1000

    df = pd.DataFrame(data)
    return df

integrate_1d(data, process_number = 1, sample_mass_mg = None, left = None, right = None, plot = False, verbose = False)

Source code in pydatalab/apps/nmr/utils.py
def integrate_1d(
    data,
    process_number=1,
    sample_mass_mg=None,
    left=None,
    right=None,
    plot=False,
    verbose=False,
):
    """Integrate a 1D Bruker spectrum between optional ppm limits.

    Parameters:
        data: Bruker project directory (or an already-processed DataFrame).
        process_number: TopSpin process number to read [default: 1].
        sample_mass_mg: Optional sample mass, forwarded to `read_bruker_1d`.
        left: Optional lower ppm bound of the integration window.
        right: Optional upper ppm bound of the integration window.
        plot: If True, show a matplotlib plot of the integrated region.
        verbose: Forwarded to `read_bruker_1d`.

    Returns:
        A pandas Series of integrated intensities, one entry per known
        intensity column (None for columns absent from the data).
    """
    intensity_cols = ["intensity", "intensity_per_scan", "intensity_per_scan_per_gram"]
    result = read_bruker_1d(
        data, process_number=process_number, sample_mass_mg=sample_mass_mg, verbose=verbose
    )
    # read_bruker_1d returns a (df, a_dic, title, shape) tuple for directory
    # input, but a bare DataFrame when handed one directly; accept both.
    # (Previously the raw tuple was used as a DataFrame, which crashed on
    # directory input at `df.ppm`.)
    df = result[0] if isinstance(result, tuple) else result

    if left:
        df = df[df.ppm >= left]
    if right:
        df = df[df.ppm <= right]

    if plot:
        plt.plot(df.ppm, df.intensity, "-")
        plt.plot([left, right], [0, 0], "k-", zorder=-1)
        plt.xlim(left, right)
        plt.show()

    # ppm decreases left-to-right, so the raw trapezoidal integral is negated.
    # NOTE(review): `integrate.trapz` was removed in SciPy 1.14; migrate to
    # `integrate.trapezoid` once the minimum SciPy version allows it.
    integrated_intensities = pd.Series(dtype=object)
    for col in intensity_cols:
        if col not in df:
            integrated_intensities[col] = None
            continue
        integrated_intensities[col] = -1 * integrate.trapz(df[col], df.ppm)

    return integrated_intensities