NMR
pydatalab.apps.nmr
special
¶
blocks
¶
NMRBlock (DataBlock)
¶
Source code in pydatalab/apps/nmr/blocks.py
class NMRBlock(DataBlock):
    """A simple NMR block for visualizing 1D NMR data from Bruker projects.

    Reads a zipped Bruker project directory attached to the block, extracts
    it, parses the 1D spectrum plus acquisition/processing parameters, and
    renders an interactive Bokeh plot.
    """

    # Unique string key identifying this block type.
    blocktype = "nmr"
    # Human-readable name shown in the UI.
    name = "NMR"
    description = "A simple NMR block for visualizing 1D NMR data from Bruker projects."
    # Only zipped Bruker project directories are accepted.
    accepted_file_extensions = (".zip",)
    # NOTE(review): the key "process number" is not read anywhere in this
    # class — methods below use self.data["selected_process"] instead.
    # Confirm whether this default is consumed by the DataBlock machinery.
    defaults = {"process number": 1}
    _supports_collections = False

    @property
    def plot_functions(self):
        # Plotting entry points invoked by the framework.
        return (self.generate_nmr_plot,)

    def read_bruker_nmr_data(self):
        """Parse the attached zipped Bruker project and store the spectrum
        and selected acquisition parameters in ``self.data``.

        Logs a warning and returns early when no file is attached or the
        file is not a ``.zip``; logs a critical message and returns early
        when the project cannot be parsed.
        """
        if "file_id" not in self.data:
            LOGGER.warning("NMRPlot.read_bruker_nmr_data(): No file set in the DataBlock")
            return

        zip_file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
        filename = zip_file_info["name"]
        name, ext = os.path.splitext(filename)
        if ext.lower() not in self.accepted_file_extensions:
            LOGGER.warning(
                "NMRBlock.read_bruker_nmr_data(): Unsupported file extension (must be .zip)"
            )
            return

        # unzip: extract next to the stored file into "<location>.extracted"
        directory_location = zip_file_info["location"] + ".extracted"
        LOGGER.debug(f"Directory location is: {directory_location}")
        with zipfile.ZipFile(zip_file_info["location"], "r") as zip_ref:
            zip_ref.extractall(directory_location)

        # assumes the archive contains a single top-level directory named
        # after the zip file itself — TODO confirm for all Bruker exports
        extracted_directory_name = os.path.join(directory_location, name)
        available_processes = os.listdir(os.path.join(extracted_directory_name, "pdata"))

        # fall back to the first available process when none (or an invalid
        # one) is currently selected
        if self.data.get("selected_process") not in available_processes:
            self.data["selected_process"] = available_processes[0]

        try:
            df, a_dic, topspin_title, processed_data_shape = read_bruker_1d(
                os.path.join(directory_location, name),
                process_number=self.data["selected_process"],
                verbose=False,
            )
        except Exception as error:
            LOGGER.critical(f"Unable to parse {name} as Bruker project. {error}")
            return

        # df can be None for multi-dimensional data; serialize only when 1D
        serialized_df = df.to_dict() if (df is not None) else None

        # all data sorted in a fairly raw way
        self.data["processed_data"] = serialized_df
        self.data["acquisition_parameters"] = a_dic["acqus"]
        self.data["processing_parameters"] = a_dic["procs"]
        self.data["pulse_program"] = a_dic["pprog"]

        # specific things that we might want to pull out for the UI:
        self.data["available_processes"] = available_processes
        self.data["nucleus"] = a_dic["acqus"]["NUC1"]
        self.data["carrier_frequency_MHz"] = a_dic["acqus"]["SFO1"]
        self.data["carrier_offset_Hz"] = a_dic["acqus"]["O1"]
        self.data["recycle_delay"] = a_dic["acqus"]["D"][1]
        self.data["nscans"] = a_dic["acqus"]["NS"]
        self.data["CNST31"] = a_dic["acqus"]["CNST"][31]
        self.data["processed_data_shape"] = processed_data_shape
        self.data["probe_name"] = a_dic["acqus"]["PROBHD"]
        self.data["pulse_program_name"] = a_dic["acqus"]["PULPROG"]
        self.data["topspin_title"] = topspin_title

    def generate_nmr_plot(self):
        """Build the Bokeh plot for the parsed spectrum and store its JSON
        payload in ``self.data["bokeh_plot_data"]`` (``None`` when no data).
        """
        # currently calls every time plotting happens, but it should only happen if the file was updated
        self.read_bruker_nmr_data()
        if "processed_data" not in self.data or not self.data["processed_data"]:
            self.data["bokeh_plot_data"] = None
            return

        df = pd.DataFrame(self.data["processed_data"])
        df["normalized intensity"] = df.intensity / df.intensity.max()

        bokeh_layout = selectable_axes_plot(
            df,
            x_options=["ppm", "hz"],
            y_options=[
                "intensity",
                "intensity_per_scan",
                "normalized intensity",
            ],
            plot_line=True,
            point_size=3,
        )
        # flip x axis, per NMR convention. Note that the figure is the second element
        # of the layout in the current implementation, but this could be fragile.
        bokeh_layout.children[1].x_range.flipped = True

        self.data["bokeh_plot_data"] = bokeh.embed.json_item(
            bokeh_layout, theme=DATALAB_BOKEH_THEME
        )
accepted_file_extensions: tuple[str, ...] | None
¶
A list of file extensions that the block will attempt to read.
blocktype: str
¶
A short (unique) string key specifying the type of block.
defaults: Dict[str, Any]
¶
Any default values that should be set if they are not supplied during block init.
description: str
¶
A longer description outlining the purpose and capability of the block.
name: str
¶
The human-readable block name specifying which technique or file format it pertains to.
plot_functions
property
readonly
¶
read_bruker_nmr_data(self)
¶
Source code in pydatalab/apps/nmr/blocks.py
def read_bruker_nmr_data(self):
    """Parse the zipped Bruker project attached to this block and store the
    spectrum and selected acquisition parameters in ``self.data``.

    Logs a warning and returns early when no file is attached or the file
    is not a ``.zip``; logs a critical message and returns early when the
    project cannot be parsed.
    """
    if "file_id" not in self.data:
        LOGGER.warning("NMRPlot.read_bruker_nmr_data(): No file set in the DataBlock")
        return

    zip_file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
    filename = zip_file_info["name"]
    name, ext = os.path.splitext(filename)
    if ext.lower() not in self.accepted_file_extensions:
        LOGGER.warning(
            "NMRBlock.read_bruker_nmr_data(): Unsupported file extension (must be .zip)"
        )
        return

    # unzip: extract next to the stored file into "<location>.extracted"
    directory_location = zip_file_info["location"] + ".extracted"
    LOGGER.debug(f"Directory location is: {directory_location}")
    with zipfile.ZipFile(zip_file_info["location"], "r") as zip_ref:
        zip_ref.extractall(directory_location)

    # assumes the archive contains a single top-level directory named after
    # the zip file itself — TODO confirm for all Bruker exports
    extracted_directory_name = os.path.join(directory_location, name)
    available_processes = os.listdir(os.path.join(extracted_directory_name, "pdata"))

    # fall back to the first available process when none (or an invalid one)
    # is currently selected
    if self.data.get("selected_process") not in available_processes:
        self.data["selected_process"] = available_processes[0]

    try:
        df, a_dic, topspin_title, processed_data_shape = read_bruker_1d(
            os.path.join(directory_location, name),
            process_number=self.data["selected_process"],
            verbose=False,
        )
    except Exception as error:
        LOGGER.critical(f"Unable to parse {name} as Bruker project. {error}")
        return

    # df can be None for multi-dimensional data; serialize only when 1D
    serialized_df = df.to_dict() if (df is not None) else None

    # all data sorted in a fairly raw way
    self.data["processed_data"] = serialized_df
    self.data["acquisition_parameters"] = a_dic["acqus"]
    self.data["processing_parameters"] = a_dic["procs"]
    self.data["pulse_program"] = a_dic["pprog"]

    # specific things that we might want to pull out for the UI:
    self.data["available_processes"] = available_processes
    self.data["nucleus"] = a_dic["acqus"]["NUC1"]
    self.data["carrier_frequency_MHz"] = a_dic["acqus"]["SFO1"]
    self.data["carrier_offset_Hz"] = a_dic["acqus"]["O1"]
    self.data["recycle_delay"] = a_dic["acqus"]["D"][1]
    self.data["nscans"] = a_dic["acqus"]["NS"]
    self.data["CNST31"] = a_dic["acqus"]["CNST"][31]
    self.data["processed_data_shape"] = processed_data_shape
    self.data["probe_name"] = a_dic["acqus"]["PROBHD"]
    self.data["pulse_program_name"] = a_dic["acqus"]["PULPROG"]
    self.data["topspin_title"] = topspin_title
generate_nmr_plot(self)
¶
Source code in pydatalab/apps/nmr/blocks.py
def generate_nmr_plot(self):
    """Build the interactive Bokeh plot for this block's spectrum and store
    its JSON payload under ``self.data["bokeh_plot_data"]``.

    When no processed data is available the payload is set to ``None``.
    """
    # Re-parse the attached file on every plot request; ideally this would
    # only happen when the file has changed.
    self.read_bruker_nmr_data()

    # Guard clause: nothing to plot if parsing produced no (truthy) data.
    if not self.data.get("processed_data"):
        self.data["bokeh_plot_data"] = None
        return

    spectrum = pd.DataFrame(self.data["processed_data"])
    peak = spectrum["intensity"].max()
    spectrum["normalized intensity"] = spectrum["intensity"] / peak

    y_choices = ["intensity", "intensity_per_scan", "normalized intensity"]
    layout = selectable_axes_plot(
        spectrum,
        x_options=["ppm", "hz"],
        y_options=y_choices,
        plot_line=True,
        point_size=3,
    )

    # NMR convention puts ppm decreasing to the right, so flip the x axis.
    # The figure is the second child of the layout in the current
    # implementation, but this could be fragile.
    layout.children[1].x_range.flipped = True

    self.data["bokeh_plot_data"] = bokeh.embed.json_item(layout, theme=DATALAB_BOKEH_THEME)
utils
¶
read_bruker_1d(data: pathlib.Path | pandas.core.frame.DataFrame, process_number: int = 1, verbose: bool = False, sample_mass_mg: float | None = None) -> tuple
¶
Read a 1D Bruker NMR spectrum and return it as a DataFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
pathlib.Path | pandas.core.frame.DataFrame |
The directory of the full bruker data file, or a pandas DataFrame which will be returned without further processing. |
required |
process_number |
int |
The process number of the processed data you want to plot [default: 1]. |
1 |
verbose |
bool |
Whether to print information such as the spectrum title to stdout. |
False |
sample_mass_mg |
float | None |
The (optional) sample mass. If provided, the resulting DataFrame will have an "intensity_per_scan_per_gram" column. |
None |
Returns:
Type | Description |
---|---|
df |
A pandas DataFrame containing the spectrum data, or None if the reading failed. a_dic: A dictionary containing the acquisition parameters. topspin_title: The title of the spectrum, as stored in the topspin "title" file. shape: The shape of the spectrum data array. |
Source code in pydatalab/apps/nmr/utils.py
def read_bruker_1d(
data: Path | pd.DataFrame,
process_number: int = 1,
verbose: bool = False,
sample_mass_mg: float | None = None,
) -> tuple[pd.DataFrame | None, dict, str | None, tuple[int, ...]]:
"""Read a 1D bruker nmr spectrum and return it as a df.
Parameters:
data: The directory of the full bruker data file, or a pandas DataFrame which
will be returned without further processing.
process_number: The process number of the processed data you want to plot [default: 1].
verbose: Whether to print information such as the spectrum title to stdout.
sample_mass_mg: The (optional) sample mass. If provided, the resulting DataFrame will have a "intensity_per_scan_per_gram" column.
Returns:
df: A pandas DataFrame containing the spectrum data, or None if the reading failed.
a_dic: A dictionary containing the acquisition parameters.
topspin_title: The title of the spectrum, as stored in the topspin "title" file.
shape: The shape of the spectrum data array.
"""
# if df is provided, just return it as-is. This functionality is provided to make functions calling read_bruker_1d flexible by default.
# Either the data directory or the already-processed df can always be provided with equivalent results.
if isinstance(data, pd.DataFrame):
if verbose:
print("data frame provided to read_bruker_1d(). Returning it as is.")
return data
else:
data_dir = Path(data)
processed_data_dir = data_dir / "pdata" / str(process_number)
a_dic, a_data = ng.fileio.bruker.read(str(data_dir)) # aquisition_data
p_dic, p_data = ng.fileio.bruker.read_pdata(str(processed_data_dir)) # processing data
try:
with open(os.path.join(processed_data_dir, "title")) as f:
topspin_title = f.read()
except FileNotFoundError:
topspin_title = None
if len(p_data.shape) > 1:
return None, a_dic, topspin_title, p_data.shape
nscans = a_dic["acqus"]["NS"]
# create a unit convertor to get the x-axis in ppm units
udic = ng.bruker.guess_udic(p_dic, p_data)
uc = ng.fileiobase.uc_from_udic(udic)
ppm_scale = uc.ppm_scale()
hz_scale = uc.hz_scale()
df = pd.DataFrame(
{
"ppm": ppm_scale,
"hz": hz_scale,
"intensity": p_data,
"intensity_per_scan": p_data / nscans,
}
)
if sample_mass_mg:
df["intensity_per_scan_per_gram"] = df["intensity_per_scan"] / sample_mass_mg * 1000.0
if verbose:
print(f"reading bruker data file. {udic[0]['label']} 1D spectrum, {nscans} scans.")
if sample_mass_mg:
print(
f'sample mass was provided: {sample_mass_mg:f} mg. "intensity_per_scan_per_gram" column included. '
)
if topspin_title:
print("\nTitle:\n")
print(topspin_title)
else:
print("No title found in scan")
return df, a_dic, topspin_title, a_data.shape
read_topspin_txt(filename, sample_mass_mg = None, nscans = None)
¶
Source code in pydatalab/apps/nmr/utils.py
def read_topspin_txt(filename, sample_mass_mg=None, nscans=None):
    """Read a Topspin text-exported 1D spectrum into a DataFrame.

    Parameters:
        filename: Path to the Topspin ``.txt`` export. The header must
            contain the LEFT/RIGHT ppm limits and the SIZE of the data.
        sample_mass_mg: Optional sample mass in mg.
        nscans: Optional number of scans.

    Returns:
        A DataFrame with "ppm", "intensity" and min-max normalized "I_norm"
        columns, plus "I_per_g_per_scan" when both ``sample_mass_mg`` and
        ``nscans`` are provided (and non-zero).

    Raises:
        ValueError: If the header is malformed or the number of intensity
            values does not match the declared SIZE.
    """
    MAX_HEADER_LINES = 10
    LEFTRIGHT_REGEX = r"# LEFT = (-?\d+\.\d+) ppm. RIGHT = (-?\d+\.\d+) ppm\."
    SIZE_REGEX = r"SIZE = (\d+)"

    # Only the first few lines are header; avoid reading the whole file here.
    with open(filename) as f:
        header = "".join(itertools.islice(f, MAX_HEADER_LINES))

    leftright_match = re.search(LEFTRIGHT_REGEX, header)
    if not leftright_match:
        raise ValueError("Header improperly formatted. Could not find LEFT and/or RIGHT values")
    left = float(leftright_match.group(1))
    right = float(leftright_match.group(2))

    size_match = re.search(SIZE_REGEX, header)
    if not size_match:
        # Bug fix: corrected "formatter" -> "formatted" in the message.
        raise ValueError("Header improperly formatted. Could not find SIZE value")
    size = int(size_match.group(1))

    # atleast_1d guards against genfromtxt collapsing a single value to a
    # 0-d array, which would make len() fail.
    intensity = np.atleast_1d(np.genfromtxt(filename, comments="#"))
    # Bug fix: validate with an explicit exception instead of `assert`,
    # which is stripped under `python -O`, and give an actionable message.
    if len(intensity) != size:
        raise ValueError(
            f"Number of intensity values ({len(intensity)}) does not match header SIZE ({size})"
        )

    data = {
        "ppm": np.linspace(left, right, size),
        "intensity": intensity,
        "I_norm": (intensity - intensity.min()) / (intensity.max() - intensity.min()),
    }

    if sample_mass_mg and nscans:
        data["I_per_g_per_scan"] = intensity / float(sample_mass_mg) / float(nscans) * 1000

    df = pd.DataFrame(data)
    return df
integrate_1d(data, process_number = 1, sample_mass_mg = None, left = None, right = None, plot = False, verbose = False)
¶
Source code in pydatalab/apps/nmr/utils.py
def integrate_1d(
    data,
    process_number=1,
    sample_mass_mg=None,
    left=None,
    right=None,
    plot=False,
    verbose=False,
):
    """Integrate a 1D Bruker spectrum between ``left`` and ``right`` (ppm).

    Parameters:
        data: Directory of the Bruker project, or an already-read DataFrame.
        process_number: Process number of the processed data to use.
        sample_mass_mg: Optional sample mass in mg; enables the
            "intensity_per_scan_per_gram" integral.
        left: Lower ppm bound of the integration window (inclusive).
        right: Upper ppm bound of the integration window (inclusive).
        plot: Whether to show a matplotlib plot of the integrated region.
        verbose: Passed through to ``read_bruker_1d``.

    Returns:
        A pandas Series with one integral per intensity column (``None``
        for columns absent from the data).
    """
    intensity_cols = ["intensity", "intensity_per_scan", "intensity_per_scan_per_gram"]

    result = read_bruker_1d(
        data, process_number=process_number, sample_mass_mg=sample_mass_mg, verbose=verbose
    )
    # Bug fix: read_bruker_1d returns a (df, a_dic, title, shape) tuple for
    # directory input but passes a DataFrame input straight through; the old
    # code treated the tuple itself as a DataFrame and crashed. Accept both.
    df = result[0] if isinstance(result, tuple) else result
    if df is None:
        raise ValueError("Could not read a 1D spectrum from the provided data.")

    # Bug fix: compare against None so a 0.0 ppm bound is not silently ignored.
    if left is not None:
        df = df[df.ppm >= left]
    if right is not None:
        df = df[df.ppm <= right]

    if plot:
        plt.plot(df.ppm, df.intensity, "-")
        plt.plot([left, right], [0, 0], "k-", zorder=-1)
        plt.xlim(left, right)
        plt.show()

    # integrate.trapz was removed in SciPy 1.14; prefer trapezoid when present.
    _trapezoid = getattr(integrate, "trapezoid", None) or getattr(integrate, "trapz")

    integrated_intensities = pd.Series(dtype=object)
    for col in intensity_cols:
        if col not in df:
            integrated_intensities[col] = None
            continue
        # Negate the trapezoid sum — presumably compensating for the
        # descending ppm axis of NMR spectra; confirm against real data.
        integrated_intensities[col] = -1 * _trapezoid(df[col], df.ppm)

    return integrated_intensities