pydatalab
Modules

apps
Modules

chat

blocks
Attributes: MAX_CONTEXT_SIZE, MODEL

ChatBlock (DataBlock)
Source code in pydatalab/apps/chat/blocks.py
class ChatBlock(DataBlock):
blocktype = "chat"
description = "Virtual assistant"
accepted_file_extensions: Sequence[str] = []
__supports_collections = True
defaults = {
"system_prompt": """You are whinchat (lowercase w), a virtual data managment assistant that helps materials chemists manage their experimental data and plan experiments. You are deployed in the group of Professor Clare Grey in the Department of Chemistry at the University of Cambridge.
You are embedded within the program datalab, where you have access to JSON describing an ‘item’, or a collection of items, with connections to other items. These items may include experimental samples, starting materials, and devices (e.g. battery cells made out of experimental samples and starting materials).
Answer questions in markdown. Specify the language for all markdown code blocks. You can make diagrams by writing a mermaid code block or an svg code block. When writing mermaid code, you must use quotations around each of the labels (e.g. A["label1"] --> B["label2"])
Be as concise as possible. When saying your name, type a bird emoji right after whinchat 🐦.
""",
"temperature": 0.2,
"error_message": None,
}
openai.api_key = os.environ.get("OPENAI_API_KEY")
def to_db(self):
"""returns a dictionary with the data for this
block, ready to be input into mongodb"""
self.render()
return super().to_db()
@property
def plot_functions(self):
return (self.render,)
def render(self):
if not self.data.get("messages"):
if (item_id := self.data.get("item_id")) is not None:
info_json = self._prepare_item_json_for_chat(item_id)
elif (collection_id := self.data.get("collection_id")) is not None:
info_json = self._prepare_collection_json_for_chat(collection_id)
else:
raise RuntimeError("No item or collection id provided")
self.data["messages"] = [
{
"role": "system",
"content": self.defaults["system_prompt"],
},
{
"role": "user",
"content": f"""Here is the JSON data for the current item(s): {info_json}.
Start with a friendly introduction and give me a one sentence summary of what this is (not detailed, no information about specific masses). """,
},
]
if self.data.get("prompt"):
self.data["messages"].append(
{
"role": "user",
"content": self.data["prompt"],
}
)
self.data["prompt"] = None
token_count = num_tokens_from_messages(self.data["messages"])
self.data["token_count"] = token_count
if token_count >= MAX_CONTEXT_SIZE:
self.data[
"error_message"
] = f"""This conversation has reached its maximum context size and the chatbot won't be able to respond further ({token_count} tokens, max: {MAX_CONTEXT_SIZE}). Please make a new chat block to start fresh."""
return
try:
if self.data["messages"][-1].role not in ("user", "system"):
return
except AttributeError:
if self.data["messages"][-1]["role"] not in ("user", "system"):
return
try:
LOGGER.debug(
f"submitting request to OpenAI API for completion with last message role \"{self.data['messages'][-1]['role']}\" (message = {self.data['messages'][-1:]}). Temperature = {self.data['temperature']} (type {type(self.data['temperature'])})"
)
responses = openai.ChatCompletion.create(
model=MODEL,
messages=self.data["messages"],
temperature=self.data["temperature"],
max_tokens=min(
1024, MAX_CONTEXT_SIZE - token_count - 1
),  # cap the completion length at 1024 tokens, or use whatever context remains if less
)
self.data["error_message"] = None
except openai.OpenAIError as exc:
LOGGER.debug("Received an error from OpenAI API: %s", exc)
self.data["error_message"] = f"Received an error from the OpenAi API: {exc}."
return
try:
self.data["messages"].append(responses["choices"][0].message)
except AttributeError:
self.data["messages"].append(responses["choices"][0]["message"])
self.data["model_name"] = MODEL
token_count = num_tokens_from_messages(self.data["messages"])
self.data["token_count"] = token_count
return
def _prepare_item_json_for_chat(self, item_id: str):
from pydatalab.routes.v0_1.items import get_item_data
item_info = get_item_data(item_id, load_blocks=False).json
model = ITEM_MODELS[item_info["item_data"]["type"]](**item_info["item_data"])
if model.blocks_obj:
model.blocks_obj = {
k: value for k, value in model.blocks_obj.items() if value["blocktype"] != "chat"
}
item_info = model.dict(exclude_none=True, exclude_unset=True)
item_info["type"] = model.type
# strip irrelevant or large fields
item_filenames = {
str(file["immutable_id"]): file["name"] for file in item_info.get("files", [])
}
for block in item_info.get("blocks_obj", {}).values():
block.pop("bokeh_plot_data", None)
block_fields_to_remove = ["item_id", "block_id"]
[block.pop(field, None) for field in block_fields_to_remove]
# nmr block fields to remove (need a more general way to do this)
NMR_fields_to_remove = [
"acquisition_parameters",
"carrier_offset_Hz",
"nscans",
"processed_data",
"processed_data_shape",
"processing_parameters",
"pulse_program",
"selected_process",
]
[block.pop(field, None) for field in NMR_fields_to_remove]
# replace file_id with the actual filename
file_id = block.pop("file_id", None)
if file_id:
block["file"] = item_filenames.get(file_id, None)
top_level_keys_to_remove = [
"display_order",
"creator_ids",
"refcode",
"last_modified",
"revision",
"revisions",
"immutable_id",
"file_ObjectIds",
]
for k in top_level_keys_to_remove:
item_info.pop(k, None)
for ind, f in enumerate(item_info.get("relationships", [])):
item_info["relationships"][ind] = {
k: v for k, v in f.items() if k in ["item_id", "type", "relation"]
}
item_info["files"] = [file["name"] for file in item_info.get("files", [])]
item_info["creators"] = [
creator["display_name"] for creator in item_info.get("creators", [])
]
# move blocks from blocks_obj to a simpler list to further cut down tokens,
# especially in alphanumeric block_id fields
item_info["blocks"] = [block for block in item_info.pop("blocks_obj", {}).values()]
item_info = {k: value for k, value in item_info.items() if value}
for key in [
"synthesis_constituents",
"positive_electrode",
"negative_electrode",
"electrolyte",
]:
if key in item_info:
for constituent in item_info[key]:
LOGGER.debug("iterating through constituents:")
LOGGER.debug(constituent)
if "quantity" in constituent:
constituent[
"quantity"
] = f"{constituent.get('quantity', 'unknown')} {constituent.get('unit', '')}"
constituent.pop("unit", None)
# Note manual replaces to help avoid escape sequences that take up extra tokens
item_info_json = (
json.dumps(item_info, cls=CustomJSONEncoder)
.replace('"', "'")
.replace(r"\'", "'")
.replace(r"\n", " ")
)
return item_info_json
def _prepare_collection_json_for_chat(self, collection_id: str):
from pydatalab.routes.v0_1.collections import get_collection
collection_data = get_collection(collection_id).json
if collection_data["status"] != "success":
raise RuntimeError(f"Attempt to get collection data for {collection_id} failed.")
children = collection_data["child_items"]
return (
"["
+ ",".join([self._prepare_item_json_for_chat(child["item_id"]) for child in children])
+ "]"
)
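A minimal usage sketch, assuming an OPENAI_API_KEY in the environment and a running database; the item_id and prompt below are illustrative, not part of the package:

# Hypothetical usage sketch; "sample_001" stands in for a real item_id.
block = ChatBlock(item_id="sample_001")
block.data["prompt"] = "What experiments have been recorded for this sample?"
block.render()                        # builds the message history and calls the OpenAI API
print(block.data["messages"][-1])     # the assistant's reply appended by render()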
Attributes:
accepted_file_extensions: Sequence[str]
blocktype: str
defaults: Dict[str, Any]
description: str
plot_functions (property, read-only)

to_db(self)
Returns a dictionary with the data for this block, ready to be input into MongoDB.
Source code in pydatalab/apps/chat/blocks.py
def to_db(self):
"""returns a dictionary with the data for this
block, ready to be input into mongodb"""
self.render()
return super().to_db()
render(self)
Source code in pydatalab/apps/chat/blocks.py
def render(self):
if not self.data.get("messages"):
if (item_id := self.data.get("item_id")) is not None:
info_json = self._prepare_item_json_for_chat(item_id)
elif (collection_id := self.data.get("collection_id")) is not None:
info_json = self._prepare_collection_json_for_chat(collection_id)
else:
raise RuntimeError("No item or collection id provided")
self.data["messages"] = [
{
"role": "system",
"content": self.defaults["system_prompt"],
},
{
"role": "user",
"content": f"""Here is the JSON data for the current item(s): {info_json}.
Start with a friendly introduction and give me a one sentence summary of what this is (not detailed, no information about specific masses). """,
},
]
if self.data.get("prompt"):
self.data["messages"].append(
{
"role": "user",
"content": self.data["prompt"],
}
)
self.data["prompt"] = None
token_count = num_tokens_from_messages(self.data["messages"])
self.data["token_count"] = token_count
if token_count >= MAX_CONTEXT_SIZE:
self.data[
"error_message"
] = f"""This conversation has reached its maximum context size and the chatbot won't be able to respond further ({token_count} tokens, max: {MAX_CONTEXT_SIZE}). Please make a new chat block to start fresh."""
return
try:
if self.data["messages"][-1].role not in ("user", "system"):
return
except AttributeError:
if self.data["messages"][-1]["role"] not in ("user", "system"):
return
try:
LOGGER.debug(
f"submitting request to OpenAI API for completion with last message role \"{self.data['messages'][-1]['role']}\" (message = {self.data['messages'][-1:]}). Temperature = {self.data['temperature']} (type {type(self.data['temperature'])})"
)
responses = openai.ChatCompletion.create(
model=MODEL,
messages=self.data["messages"],
temperature=self.data["temperature"],
max_tokens=min(
1024, MAX_CONTEXT_SIZE - token_count - 1
),  # cap the completion length at 1024 tokens, or use whatever context remains if less
)
self.data["error_message"] = None
except openai.OpenAIError as exc:
LOGGER.debug("Received an error from OpenAI API: %s", exc)
self.data["error_message"] = f"Received an error from the OpenAi API: {exc}."
return
try:
self.data["messages"].append(responses["choices"][0].message)
except AttributeError:
self.data["messages"].append(responses["choices"][0]["message"])
self.data["model_name"] = MODEL
token_count = num_tokens_from_messages(self.data["messages"])
self.data["token_count"] = token_count
return
num_tokens_from_messages(messages: Sequence[dict])
Source code in pydatalab/apps/chat/blocks.py
def num_tokens_from_messages(messages: Sequence[dict]):
# see: https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb
encoding = tiktoken.encoding_for_model(MODEL)
tokens_per_message = 4 # every message follows <|start|>{role/name}\n{content}<|end|>\n
tokens_per_name = -1 # if there's a name, the role is omitted
num_tokens = 0
for message in messages:
num_tokens += tokens_per_message
for key, value in message.items():
num_tokens += len(encoding.encode(value))
if key == "name":
num_tokens += tokens_per_name
num_tokens += 3 # every reply is primed with <|start|>assistant<|message|>
return num_tokens
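A brief sketch of how this count might guard the context limit before a request is submitted, mirroring the check in ChatBlock.render(); the message list is illustrative and MAX_CONTEXT_SIZE is the module-level value noted above:

# Illustrative only: a short conversation checked against the module's context limit.
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Summarise this sample in one sentence."},
]
token_count = num_tokens_from_messages(messages)
if token_count >= MAX_CONTEXT_SIZE:
    raise RuntimeError(f"Conversation too long: {token_count} >= {MAX_CONTEXT_SIZE} tokens")
max_completion_tokens = min(1024, MAX_CONTEXT_SIZE - token_count - 1)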
eis

EISBlock (DataBlock)
Source code in pydatalab/apps/eis/__init__.py
class EISBlock(DataBlock):
accepted_file_extensions = [".txt"]
blocktype = "eis"
name = "Electrochemical Impedance Spectroscopy"
description = "This block can plot EIS data from Ivium .txt files"
@property
def plot_functions(self):
return (self.generate_eis_plot,)
def generate_eis_plot(self):
file_info = None
# all_files = None
eis_data = None
if "file_id" not in self.data:
LOGGER.warning("No file set in the DataBlock")
return
else:
file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
ext = os.path.splitext(file_info["location"].split("/")[-1])[-1].lower()
if ext not in self.accepted_file_extensions:
LOGGER.warning(
"Unsupported file extension (must be one of %s, not %s)",
self.accepted_file_extensions,
ext,
)
return
eis_data = parse_ivium_eis_txt(Path(file_info["location"]))
if eis_data is not None:
plot = selectable_axes_plot(
eis_data,
x_options=["Re(Z) [Ω]"],
y_options=["-Im(Z) [Ω]"],
color_options=["Frequency [Hz]"],
color_mapper=LogColorMapper("Cividis256"),
plot_points=True,
plot_line=False,
tools=HoverTool(tooltips=[("Frequency [Hz]", "@{Frequency [Hz]}")]),
)
self.data["bokeh_plot_data"] = bokeh.embed.json_item(plot, theme=mytheme)
Attributes:
accepted_file_extensions: Sequence[str]
blocktype: str
description: str
name
plot_functions (property, read-only)

generate_eis_plot(self)
Source code in pydatalab/apps/eis/__init__.py
def generate_eis_plot(self):
file_info = None
# all_files = None
eis_data = None
if "file_id" not in self.data:
LOGGER.warning("No file set in the DataBlock")
return
else:
file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
ext = os.path.splitext(file_info["location"].split("/")[-1])[-1].lower()
if ext not in self.accepted_file_extensions:
LOGGER.warning(
"Unsupported file extension (must be one of %s, not %s)",
self.accepted_file_extensions,
ext,
)
return
eis_data = parse_ivium_eis_txt(Path(file_info["location"]))
if eis_data is not None:
plot = selectable_axes_plot(
eis_data,
x_options=["Re(Z) [Ω]"],
y_options=["-Im(Z) [Ω]"],
color_options=["Frequency [Hz]"],
color_mapper=LogColorMapper("Cividis256"),
plot_points=True,
plot_line=False,
tools=HoverTool(tooltips=[("Frequency [Hz]", "@{Frequency [Hz]}")]),
)
self.data["bokeh_plot_data"] = bokeh.embed.json_item(plot, theme=mytheme)
parse_ivium_eis_txt(filename: Path)
Source code in pydatalab/apps/eis/__init__.py
def parse_ivium_eis_txt(filename: Path):
eis = pd.read_csv(filename, sep="\t")
eis["Z2 /ohm"] *= -1
eis.rename(
{"Z1 /ohm": "Re(Z) [Ω]", "Z2 /ohm": "-Im(Z) [Ω]", "freq. /Hz": "Frequency [Hz]"},
inplace=True,
axis="columns",
)
return eis
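A hypothetical usage sketch of the parser; the filename is illustrative and the tab-separated Ivium export is assumed to contain the "Z1 /ohm", "Z2 /ohm" and "freq. /Hz" columns renamed above:

# Hypothetical usage; "impedance_scan.txt" stands in for a real Ivium export.
from pathlib import Path

eis = parse_ivium_eis_txt(Path("impedance_scan.txt"))
print(eis[["Re(Z) [Ω]", "-Im(Z) [Ω]", "Frequency [Hz]"]].head())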
tga

blocks

MassSpecBlock (DataBlock)
Source code in pydatalab/apps/tga/blocks.py
class MassSpecBlock(DataBlock):
blocktype = "ms"
description = "Mass spectrometry (MS)"
accepted_file_extensions = (".asc",)
@property
def plot_functions(self):
return (self.generate_ms_plot,)
def generate_ms_plot(self):
file_info = None
# all_files = None
ms_data = None
if "file_id" not in self.data:
LOGGER.warning("No file set in the DataBlock")
return
else:
file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
ext = os.path.splitext(file_info["location"].split("/")[-1])[-1].lower()
if ext not in self.accepted_file_extensions:
LOGGER.warning(
"Unsupported file extension (must be one of %s, not %s)",
self.accepted_file_extensions,
ext,
)
return
ms_data = parse_mt_mass_spec_ascii(Path(file_info["location"]))
x_options = ["Time Relative [s]"]
if ms_data:
# collect the maximum value of the data key for each species for plot ordering
max_vals: List[Tuple[str, float]] = []
for species in ms_data["data"]:
data_key = (
"Partial Pressure [mbar]"
if "Partial Pressure [mbar]" in ms_data["data"][species]
else "Ion Current [A]"
)
data = ms_data["data"][species][data_key].to_numpy()
ms_data["data"][species][f"{data_key} (Savitzky-Golay)"] = savgol_filter(
data, len(data) // 10, 3
)
max_vals.append((species, ms_data["data"][species][data_key].max()))
plots = []
for ind, (species, _) in enumerate(sorted(max_vals, key=lambda x: x[1], reverse=True)):
plots.append(
selectable_axes_plot(
{species: ms_data["data"][species]},
x_options=x_options,
y_options=[data_key],
y_default=[
f"{data_key} (Savitzky-Golay)",
f"{data_key}",
],
label_x=(ind == 0),
label_y=(ind == 0),
plot_line=True,
plot_points=False,
plot_title=f"Channel name: {species}",
plot_index=ind,
aspect_ratio=1.5,
)
)
plots[-1].children[0].xaxis[0].ticker.desired_num_ticks = 2
# construct MxN grid of all species
M = 3
grid = []
for i in range(0, len(plots), M):
grid.append(plots[i : i + M])
p = gridplot(grid, sizing_mode="scale_width", toolbar_location="below")
self.data["bokeh_plot_data"] = bokeh.embed.json_item(p, theme=grid_theme)
Attributes:
accepted_file_extensions: Sequence[str]
blocktype: str
description: str
plot_functions (property, read-only)

generate_ms_plot(self)
Source code in pydatalab/apps/tga/blocks.py
def generate_ms_plot(self):
file_info = None
# all_files = None
ms_data = None
if "file_id" not in self.data:
LOGGER.warning("No file set in the DataBlock")
return
else:
file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
ext = os.path.splitext(file_info["location"].split("/")[-1])[-1].lower()
if ext not in self.accepted_file_extensions:
LOGGER.warning(
"Unsupported file extension (must be one of %s, not %s)",
self.accepted_file_extensions,
ext,
)
return
ms_data = parse_mt_mass_spec_ascii(Path(file_info["location"]))
x_options = ["Time Relative [s]"]
if ms_data:
# collect the maximum value of the data key for each species for plot ordering
max_vals: List[Tuple[str, float]] = []
for species in ms_data["data"]:
data_key = (
"Partial Pressure [mbar]"
if "Partial Pressure [mbar]" in ms_data["data"][species]
else "Ion Current [A]"
)
data = ms_data["data"][species][data_key].to_numpy()
ms_data["data"][species][f"{data_key} (Savitzky-Golay)"] = savgol_filter(
data, len(data) // 10, 3
)
max_vals.append((species, ms_data["data"][species][data_key].max()))
plots = []
for ind, (species, _) in enumerate(sorted(max_vals, key=lambda x: x[1], reverse=True)):
plots.append(
selectable_axes_plot(
{species: ms_data["data"][species]},
x_options=x_options,
y_options=[data_key],
y_default=[
f"{data_key} (Savitzky-Golay)",
f"{data_key}",
],
label_x=(ind == 0),
label_y=(ind == 0),
plot_line=True,
plot_points=False,
plot_title=f"Channel name: {species}",
plot_index=ind,
aspect_ratio=1.5,
)
)
plots[-1].children[0].xaxis[0].ticker.desired_num_ticks = 2
# construct MxN grid of all species
M = 3
grid = []
for i in range(0, len(plots), M):
grid.append(plots[i : i + M])
p = gridplot(grid, sizing_mode="scale_width", toolbar_location="below")
self.data["bokeh_plot_data"] = bokeh.embed.json_item(p, theme=grid_theme)
parsers

parse_mt_mass_spec_ascii(path: Path) -> Dict[str, Union[pandas.core.frame.DataFrame, Dict]]
Parses an .asc file containing MS results from a Mettler-Toledo spectrometer and returns a dictionary with keys data and meta, which themselves contain a dictionary of dataframes for each species (with the species names/masses as keys) and a dictionary of metadata fields, respectively.

Parameters:
Name | Type | Description | Default
---|---|---|---
path | Path | The path of the file to parse. | required

Source code in pydatalab/apps/tga/parsers.py
def parse_mt_mass_spec_ascii(path: Path) -> Dict[str, Union[pd.DataFrame, Dict]]:
"""Parses an .asc file containing MS results from a Mettler-Toledo
spectrometer and returns a dictionary with keys `data` and `meta`,
which themselves contain a dictionary of dataframes for each species
with the species names/masses as keys, and a dictionary of
metadata fields respectively.
Parameters:
path: The path of the file to parse.
"""
header_keys = ("Sourcefile", "Exporttime", "Start Time", "End Time")
data_keys = ("Time Relative [s]", "Partial Pressure [mbar]", "Ion Current [A]")
header = {}
species = []
if not path.exists():
raise RuntimeError(f"Provided path does not exist: {path!r}")
with open(path, "r") as f:
# Read start of file until all header keys have been found
max_header_lines = 8
reads = 0
header_end = None
while reads < max_header_lines:
line = f.readline().strip()
reads += 1
if line:
for key in header_keys:
if key in line:
header[key] = line.split(key)[-1].strip()
if all(k in header for k in header_keys):
header_end = f.tell()
break
else:
raise ValueError(
f"Could not find all header keys in first {max_header_lines} lines of file."
)
for key in header_keys[1:]:
if "time" in key.lower():
header[key] = dateutil.parser.parse(header[key]) # type: ignore
reads = 0
max_species_lines = 10
while reads < max_species_lines:
line = f.readline().strip()
reads += 1
if not line:
continue
species = line.split()
break
else:
raise ValueError(
f"Could not find species list in lines {header_end}:{header_end + max_species_lines} lines of file."
)
# Read data with duplicated keys: will have (column number % number of data keys) appended to them
# MT software also writes "---" if the value is missing, so parse these as NaNs to remove later
df = pd.read_csv(f, sep="\t", header=0, parse_dates=False, na_values=["---"])
ms_results: Dict[str, Union[pd.DataFrame, Dict]] = {}
ms_results["meta"] = header
ms_results["data"] = {}
# Some files have Ion Current [A] or Partial Pressure [mbar] -- only rename those that are present
present_keys = set(df.columns.values) & set(data_keys)
for ind, specie in enumerate(species):
# Loop over all species and rename the columns to remove the species name and disaggregate as a dict
species_data_keys = [k + f"{'.' + str(ind) if ind != 0 else ''}" for k in present_keys]
ms_results["data"][specie] = df[species_data_keys].rename(
{mangled: original for mangled, original in zip(species_data_keys, present_keys)},
axis="columns",
)
# Drop time axis as format cannot be easily inferred and data is essentially duplicated: "Start Time" in header
# provides the timestamp of the first row
ms_results["data"][specie].drop("Time", axis="columns", inplace=True, errors="ignore")
# If the file was provided in an incomplete form, the final rows will be NaN, so drop them
ms_results["data"][specie].dropna(inplace=True)
return ms_results
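A hypothetical usage sketch; the filename is illustrative:

# Hypothetical usage; "tga_ms_run.asc" stands in for a real Mettler-Toledo export.
from pathlib import Path

ms = parse_mt_mass_spec_ascii(Path("tga_ms_run.asc"))
print(ms["meta"]["Start Time"])           # parsed into a datetime by dateutil
for species, frame in ms["data"].items():
    print(species, list(frame.columns))   # e.g. ["Time Relative [s]", "Ion Current [A]"]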
xrd

blocks

XRDBlock (DataBlock)
Source code in pydatalab/apps/xrd/blocks.py
class XRDBlock(DataBlock):
blocktype = "xrd"
description = "Powder XRD"
accepted_file_extensions = (".xrdml", ".xy", ".dat", ".xye")
defaults = {"wavelength": 1.54060}
@property
def plot_functions(self):
return (self.generate_xrd_plot,)
@classmethod
def load_pattern(
self, location: str, wavelength: float = None
) -> Tuple[pd.DataFrame, List[str]]:
if not isinstance(location, str):
location = str(location)
ext = os.path.splitext(location.split("/")[-1])[-1].lower()
if ext == ".xrdml":
df = parse_xrdml(location)
elif ext == ".xy":
df = pd.read_csv(location, sep=r"\s+", names=["twotheta", "intensity"])
else:
df = pd.read_csv(location, sep=r"\s+", names=["twotheta", "intensity", "error"])
df = df.rename(columns={"twotheta": "2θ (°)"})
# if no wavelength (or invalid wavelength) is passed, don't convert to Q and d
if wavelength:
try:
df["Q (Å⁻¹)"] = 4 * np.pi / wavelength * np.sin(np.deg2rad(df["2θ (°)"]) / 2)
df["d (Å)"] = 2 * np.pi / df["Q (Å⁻¹)"]
except (ValueError, ZeroDivisionError):
pass
df["sqrt(intensity)"] = np.sqrt(df["intensity"])
df["log(intensity)"] = np.log10(df["intensity"])
df["normalized intensity"] = df["intensity"] / np.max(df["intensity"])
polyfit_deg = 15
polyfit_baseline = np.poly1d(
np.polyfit(df["2θ (°)"], df["normalized intensity"], deg=polyfit_deg)
)(df["2θ (°)"])
df["intensity - polyfit baseline"] = df["normalized intensity"] - polyfit_baseline
df["intensity - polyfit baseline"] /= np.max(df["intensity - polyfit baseline"])
df[f"baseline (`numpy.polyfit`, deg={polyfit_deg})"] = polyfit_baseline / np.max(
df["intensity - polyfit baseline"]
)
kernel_size = 101
median_baseline = medfilt(df["normalized intensity"], kernel_size=kernel_size)
df["intensity - median baseline"] = df["normalized intensity"] - median_baseline
df["intensity - median baseline"] /= np.max(df["intensity - median baseline"])
df[
f"baseline (`scipy.signal.medfilt`, kernel_size={kernel_size})"
] = median_baseline / np.max(df["intensity - median baseline"])
df.index.name = location.split("/")[-1]
y_options = [
"normalized intensity",
"intensity",
"sqrt(intensity)",
"log(intensity)",
"intensity - median baseline",
f"baseline (`scipy.signal.medfilt`, kernel_size={kernel_size})",
"intensity - polyfit baseline",
f"baseline (`numpy.polyfit`, deg={polyfit_deg})",
]
return df, y_options
def generate_xrd_plot(self):
file_info = None
all_files = None
pattern_dfs = None
if "file_id" not in self.data:
# If no file set, try to plot them all
item_info = flask_mongo.db.items.find_one(
{"item_id": self.data["item_id"]},
)
all_files = [
d
for d in [
get_file_info_by_id(f, update_if_live=False)
for f in item_info["file_ObjectIds"]
]
if any(d["name"].lower().endswith(ext) for ext in self.accepted_file_extensions)
]
if not all_files:
LOGGER.warning(
"XRDBlock.generate_xrd_plot(): Unsupported file extension (must be .xrdml or .xy)"
)
raise RuntimeError("XRDBlock.generate_xrd_plot(): No file set in DataBlock")
pattern_dfs = []
for f in all_files:
try:
pattern_df, y_options = self.load_pattern(
f["location"],
wavelength=float(self.data.get("wavelength", self.defaults["wavelength"])),
)
except Exception as exc:
raise RuntimeError(
f"Could not parse file {file_info['location']}. Error: {exc}"
)
pattern_dfs.append(pattern_df)
else:
file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
ext = os.path.splitext(file_info["location"].split("/")[-1])[-1].lower()
if ext not in self.accepted_file_extensions:
raise RuntimeError(
"XRDBlock.generate_xrd_plot(): Unsupported file extension (must be one of %s), not %s",
self.accepted_file_extensions,
ext,
)
pattern_dfs, y_options = self.load_pattern(
file_info["location"],
wavelength=float(self.data.get("wavelength", self.defaults["wavelength"])),
)
pattern_dfs = [pattern_dfs]
if pattern_dfs:
p = selectable_axes_plot(
pattern_dfs,
x_options=["2θ (°)", "Q (Å⁻¹)", "d (Å)"],
y_options=y_options,
plot_line=True,
plot_points=True,
point_size=3,
)
self.data["bokeh_plot_data"] = bokeh.embed.json_item(p, theme=mytheme)
Attributes:
accepted_file_extensions: Sequence[str]
blocktype: str
defaults: Dict[str, Any]
description: str
plot_functions (property, read-only)

load_pattern(location: str, wavelength: float = None) -> Tuple[pandas.core.frame.DataFrame, List[str]] (classmethod)
Source code in pydatalab/apps/xrd/blocks.py
@classmethod
def load_pattern(
self, location: str, wavelength: float = None
) -> Tuple[pd.DataFrame, List[str]]:
if not isinstance(location, str):
location = str(location)
ext = os.path.splitext(location.split("/")[-1])[-1].lower()
if ext == ".xrdml":
df = parse_xrdml(location)
elif ext == ".xy":
df = pd.read_csv(location, sep=r"\s+", names=["twotheta", "intensity"])
else:
df = pd.read_csv(location, sep=r"\s+", names=["twotheta", "intensity", "error"])
df = df.rename(columns={"twotheta": "2θ (°)"})
# if no wavelength (or invalid wavelength) is passed, don't convert to Q and d
if wavelength:
try:
df["Q (Å⁻¹)"] = 4 * np.pi / wavelength * np.sin(np.deg2rad(df["2θ (°)"]) / 2)
df["d (Å)"] = 2 * np.pi / df["Q (Å⁻¹)"]
except (ValueError, ZeroDivisionError):
pass
df["sqrt(intensity)"] = np.sqrt(df["intensity"])
df["log(intensity)"] = np.log10(df["intensity"])
df["normalized intensity"] = df["intensity"] / np.max(df["intensity"])
polyfit_deg = 15
polyfit_baseline = np.poly1d(
np.polyfit(df["2θ (°)"], df["normalized intensity"], deg=polyfit_deg)
)(df["2θ (°)"])
df["intensity - polyfit baseline"] = df["normalized intensity"] - polyfit_baseline
df["intensity - polyfit baseline"] /= np.max(df["intensity - polyfit baseline"])
df[f"baseline (`numpy.polyfit`, deg={polyfit_deg})"] = polyfit_baseline / np.max(
df["intensity - polyfit baseline"]
)
kernel_size = 101
median_baseline = medfilt(df["normalized intensity"], kernel_size=kernel_size)
df["intensity - median baseline"] = df["normalized intensity"] - median_baseline
df["intensity - median baseline"] /= np.max(df["intensity - median baseline"])
df[
f"baseline (`scipy.signal.medfilt`, kernel_size={kernel_size})"
] = median_baseline / np.max(df["intensity - median baseline"])
df.index.name = location.split("/")[-1]
y_options = [
"normalized intensity",
"intensity",
"sqrt(intensity)",
"log(intensity)",
"intensity - median baseline",
f"baseline (`scipy.signal.medfilt`, kernel_size={kernel_size})",
"intensity - polyfit baseline",
f"baseline (`numpy.polyfit`, deg={polyfit_deg})",
]
return df, y_options
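A hypothetical usage sketch: loading a two-column .xy pattern via the classmethod. The filename is illustrative; when a wavelength λ is supplied, the derived columns follow Q = 4π sin(θ)/λ and d = 2π/Q as in the code above.

# Hypothetical usage; "pattern.xy" stands in for a real two-column 2θ/intensity file.
df, y_options = XRDBlock.load_pattern("pattern.xy", wavelength=1.54060)
print([c for c in df.columns if "Å" in c])   # ["Q (Å⁻¹)", "d (Å)"] when a wavelength was given
print(y_options)                             # the intensity/baseline options offered to the plot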
generate_xrd_plot(self)
Source code in pydatalab/apps/xrd/blocks.py
def generate_xrd_plot(self):
file_info = None
all_files = None
pattern_dfs = None
if "file_id" not in self.data:
# If no file set, try to plot them all
item_info = flask_mongo.db.items.find_one(
{"item_id": self.data["item_id"]},
)
all_files = [
d
for d in [
get_file_info_by_id(f, update_if_live=False)
for f in item_info["file_ObjectIds"]
]
if any(d["name"].lower().endswith(ext) for ext in self.accepted_file_extensions)
]
if not all_files:
LOGGER.warning(
"XRDBlock.generate_xrd_plot(): Unsupported file extension (must be .xrdml or .xy)"
)
raise RuntimeError("XRDBlock.generate_xrd_plot(): No file set in DataBlock")
pattern_dfs = []
for f in all_files:
try:
pattern_df, y_options = self.load_pattern(
f["location"],
wavelength=float(self.data.get("wavelength", self.defaults["wavelength"])),
)
except Exception as exc:
raise RuntimeError(
f"Could not parse file {file_info['location']}. Error: {exc}"
)
pattern_dfs.append(pattern_df)
else:
file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
ext = os.path.splitext(file_info["location"].split("/")[-1])[-1].lower()
if ext not in self.accepted_file_extensions:
raise RuntimeError(
"XRDBlock.generate_xrd_plot(): Unsupported file extension (must be one of %s), not %s",
self.accepted_file_extensions,
ext,
)
pattern_dfs, y_options = self.load_pattern(
file_info["location"],
wavelength=float(self.data.get("wavelength", self.defaults["wavelength"])),
)
pattern_dfs = [pattern_dfs]
if pattern_dfs:
p = selectable_axes_plot(
pattern_dfs,
x_options=["2θ (°)", "Q (Å⁻¹)", "d (Å)"],
y_options=y_options,
plot_line=True,
plot_points=True,
point_size=3,
)
self.data["bokeh_plot_data"] = bokeh.embed.json_item(p, theme=mytheme)
models

XRDPattern (BaseModel, pydantic model)
This model defines the structure of the data that is expected for a solid-state XRD pattern.
Source code in pydatalab/apps/xrd/models.py
class XRDPattern(BaseModel):
"""This model defines the structure of the data that is expected
for a solid-state XRD pattern.
"""
wavelength: float
two_theta: List[float]
d_spacings: List[float]
q_values: List[float]
intensities: List[float]
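An illustrative instantiation with dummy values (real patterns would be built from parsed data); the field names follow the model above:

# Dummy values for illustration only.
pattern = XRDPattern(
    wavelength=1.54060,
    two_theta=[10.0, 10.1, 10.2],
    d_spacings=[8.84, 8.75, 8.67],
    q_values=[0.710, 0.717, 0.724],
    intensities=[120.0, 150.0, 110.0],
)
print(pattern.dict()["wavelength"])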
XRDProcessing (BaseModel, pydantic model)
Source code in pydatalab/apps/xrd/models.py
class XRDProcessing(BaseModel):
peak_positions: List[float]
peak_intensities: List[float]
peak_widths: List[float]
baselines: List[List[float]]
class Config:
extra = "allow"
XRDMetadata (BaseModel, pydantic model)
Source code in pydatalab/apps/xrd/models.py
class XRDMetadata(BaseModel):
...
XRDMeasurement (BaseModel, pydantic model)

utils
Attributes: DATA_REGEX, STARTEND_REGEX

XrdmlParseError (Exception)
Source code in pydatalab/apps/xrd/utils.py
class XrdmlParseError(Exception):
pass
parse_xrdml(filename: str) -> DataFrame
Parses an XRDML file and returns a pandas DataFrame with columns twotheta and intensity.

Parameters:
Name | Type | Description | Default
---|---|---|---
filename | str | The file to parse. | required

Source code in pydatalab/apps/xrd/utils.py
def parse_xrdml(filename: str) -> pd.DataFrame:
"""Parses an XRDML file and returns a pandas DataFrame with columns
twotheta and intensity.
Parameters:
filename: The file to parse.
"""
with open(filename, "r") as f:
s = f.read()
start, end = getStartEnd(s) # extract first and last angle
intensities = getIntensities(s) # extract intensities
angles = np.linspace(start, end, num=len(intensities))
return pd.DataFrame(
{
"twotheta": angles,
"intensity": intensities,
}
)
convertSinglePattern(filename: str, directory: str = '.', adjust_baseline: bool = False, overwrite: bool = False) -> str
Converts an XRDML file to a simple xy file and writes it to the passed directory, without overwriting any existing files.

Parameters:
Name | Type | Description | Default
---|---|---|---
filename | str | The file to convert. | required
directory | str | The output directory. | '.'
adjust_baseline | bool | If True, the baseline will be adjusted so that no points are negative. | False
overwrite | bool | If True, existing files with the same filenames will be overwritten. | False

Returns:
Type | Description
---|---
str | The output filename.

Source code in pydatalab/apps/xrd/utils.py
def convertSinglePattern(
filename: str,
directory: str = ".",
adjust_baseline: bool = False,
overwrite: bool = False,
) -> str:
"""Converts an XRDML file to a simple xy and writes it to the passed directory, without
overwriting any existing files.
Parameters:
filename: The file to convert.
directory: The output directory.
adjust_baseline: If True, the baseline will be adjusted so that no points are negative.
overwrite: If True, existing files with the same filenames will be overwritten.
Returns:
The output filename.
"""
filename = os.path.join(directory, filename)
outfn = filename + ".xy"
if os.path.exists(outfn):
if overwrite:
print(f"{outfn} already exists in the directory {directory}. Overwriting.")
else:
warnings.warn(
f"{outfn} already exists in the directory {directory}, will not overwrite"
)
return outfn
with open(filename, "r") as f:
s = f.read()
print(f"Processing file {filename}")
start, end = getStartEnd(s)
print(f"\tstart angle: {start}\tend angle: {end}")
intensities = getIntensities(s)
if adjust_baseline:
intensities = np.array(intensities) # type: ignore
minI = np.min(intensities)
if minI < 0:
print(
f"\tadjusting baseline so that no points are negative (adding {-1 * minI} counts)"
)
intensities -= minI
else:
print("\tno intensitites are less than zero, so no baseline adjustment performed")
intensities = intensities.tolist() # type: ignore
print(f"\tnumber of datapoints: {len(intensities)}")
xystring = toXY(intensities, start, end)
with open(outfn, "w") as of:
of.write(xystring)
print("\tSuccess!")
return outfn
getStartEnd(s: str) -> Tuple[float, float]
Parse a given string representation of an xrdml file to find the start and end 2Theta points of the scan. Note: this could match either Omega or 2Theta depending on their order in the XRDML file.

Exceptions:
Type | Description
---|---
XrdmlParseError | if the start and end positions could not be found.

Returns:
Type | Description
---|---
Tuple[float, float] | (start, end) positions in the XRDML file.

Source code in pydatalab/apps/xrd/utils.py
def getStartEnd(s: str) -> Tuple[float, float]:
"""Parse a given string representation of an xrdml file to find the start and end 2Theta points of the scan.
Note: this could match either Omega or 2Theta depending on their order in the XRDML file.
Raises:
XrdmlParseError: if the start and end positions could not be found.
Returns:
(start, end) positions in the XRDML file.
"""
match = re.search(STARTEND_REGEX, s)
if not match:
raise XrdmlParseError("the start and end 2theta positions were not found in the XRDML file")
start = float(match.group(1))
end = float(match.group(2))
return start, end
getIntensities(s: str) -> List[float]
Parse a given string representation of an xrdml file to find the peak intensities.

Exceptions:
Type | Description
---|---
XrdmlParseError | if intensities could not be found in the file

Returns:
Type | Description
---|---
List[float] | The array of intensities.

Source code in pydatalab/apps/xrd/utils.py
def getIntensities(s: str) -> List[float]:
"""Parse a given string representation of an xrdml file to find the peak intensities.
Raises:
XrdmlParseError: if intensities could not be found in the file
Returns:
The array of intensities.
"""
match = re.search(DATA_REGEX, s)
if not match:
raise XrdmlParseError("the intensitites were not found in the XML file")
out = [float(x) for x in match.group(1).split()] # the intensitites as a list of integers
return out
toXY(intensities: List[float], start: float, end: float) -> str
Converts a given list of intensities, along with a start and end angle, to a string in XY format.
Source code in pydatalab/apps/xrd/utils.py
def toXY(intensities: List[float], start: float, end: float) -> str:
"""Converts a given list of intensities, along with a start and end angle,
to a string in XY format.
"""
angles = np.linspace(start, end, num=len(intensities))
xylines = ["{:.5f} {:.3f}\r\n".format(a, i) for a, i in zip(angles, intensities)]
return "".join(xylines)
blocks
Attributes:
BLOCKS: Sequence[Type[pydatalab.blocks.blocks.DataBlock]]
BLOCK_TYPES: Dict[str, Type[pydatalab.blocks.blocks.DataBlock]]
Modules

blocks

DataBlock
base class for a data block.
Source code in pydatalab/blocks/blocks.py
class DataBlock:
"""base class for a data block."""
blocktype: str = "generic"
description: str = "Generic Block"
accepted_file_extensions: Sequence[str]
# values that are set by default if they are not supplied by the dictionary in init()
defaults: Dict[str, Any] = {}
# values cached on the block instance for faster retrieval
cache: Optional[Dict[str, Any]] = None
plot_functions: Optional[Sequence[Callable[[], None]]] = None
# whether this datablock can operate on collection data, or just individual items
__supports_collections: bool = False
def __init__(
self,
item_id: Optional[str] = None,
collection_id: Optional[str] = None,
dictionary=None,
unique_id=None,
):
if dictionary is None:
dictionary = {}
if item_id is None and not self.__supports_collections:
raise RuntimeError(f"Must supply `item_id` to make {self.__class__.__name__}.")
if collection_id is not None and not self.__supports_collections:
raise RuntimeError(
f"This block ({self.__class__.__name__}) does not support collections."
)
if item_id is not None and collection_id is not None:
raise RuntimeError("Must provide only one of `item_id` and `collection_id`.")
# Initialise cache
self.cache = {}
LOGGER.debug(
"Creating new block '%s' associated with item_id '%s'",
self.__class__.__name__,
item_id,
)
self.block_id = (
unique_id or generate_random_id()
) # this is supposed to be a unique id for use in html and the database.
self.data = {
"item_id": item_id,
"collection_id": collection_id,
"blocktype": self.blocktype,
"block_id": self.block_id,
**self.defaults,
}
# convert ObjectId file_ids to string to make handling them easier when sending to and from web
if "file_id" in self.data:
self.data["file_id"] = str(self.data["file_id"])
if "title" not in self.data:
self.data["title"] = self.description
self.data.update(
dictionary
) # this could overwrite blocktype and block_id. I think that's reasonable... maybe
LOGGER.debug(
"Initialised block %s for item ID %s or collection ID %s.",
self.__class__.__name__,
item_id,
collection_id,
)
def to_db(self):
"""returns a dictionary with the data for this
block, ready to be input into mongodb"""
LOGGER.debug("Casting block %s to database object.", self.__class__.__name__)
if "file_id" in self.data:
dict_for_db = self.data.copy() # gross, I know
dict_for_db["file_id"] = ObjectId(dict_for_db["file_id"])
return dict_for_db
if "bokeh_plot_data" in self.data:
self.data.pop("bokeh_plot_data")
return self.data
@classmethod
def from_db(cls, db_entry):
"""create a block from json (dictionary) stored in a db"""
LOGGER.debug("Loading block %s from database object.", cls.__class__.__name__)
new_block = cls(
item_id=db_entry.get("item_id"),
collection_id=db_entry.get("collection_id"),
dictionary=db_entry,
)
if "file_id" in new_block.data:
new_block.data["file_id"] = str(new_block.data["file_id"])
return new_block
def to_web(self):
"""returns a json-able dictionary to render the block on the web"""
if self.plot_functions:
for plot in self.plot_functions:
try:
plot()
except RuntimeError:
LOGGER.warning(
f"Could not create plot for {self.__class__.__name__}: {self.data}"
)
return self.data
@classmethod
def from_web(cls, data):
LOGGER.debug("Loading block %s from web request.", cls.__class__.__name__)
block = cls(
item_id=data.get("item_id"),
collection_id=data.get("collection_id"),
unique_id=data["block_id"],
)
block.update_from_web(data)
return block
def update_from_web(self, data):
"""update the object with data received from the website. Only updates fields
that are specified in the dictionary- other fields are left alone"""
LOGGER.debug(
"Updating block %s from web request",
self.__class__.__name__,
)
self.data.update(data)
return self
Attributes:
blocktype: str
cache: Optional[Dict[str, Any]]
defaults: Dict[str, Any]
description: str
plot_functions: Optional[Sequence[Callable[[], NoneType]]]

__init__(self, item_id: Optional[str] = None, collection_id: Optional[str] = None, dictionary = None, unique_id = None)
Source code in pydatalab/blocks/blocks.py
def __init__(
self,
item_id: Optional[str] = None,
collection_id: Optional[str] = None,
dictionary=None,
unique_id=None,
):
if dictionary is None:
dictionary = {}
if item_id is None and not self.__supports_collections:
raise RuntimeError(f"Must supply `item_id` to make {self.__class__.__name__}.")
if collection_id is not None and not self.__supports_collections:
raise RuntimeError(
f"This block ({self.__class__.__name__}) does not support collections."
)
if item_id is not None and collection_id is not None:
raise RuntimeError("Must provide only one of `item_id` and `collection_id`.")
# Initialise cache
self.cache = {}
LOGGER.debug(
"Creating new block '%s' associated with item_id '%s'",
self.__class__.__name__,
item_id,
)
self.block_id = (
unique_id or generate_random_id()
) # this is supposed to be a unique id for use in html and the database.
self.data = {
"item_id": item_id,
"collection_id": collection_id,
"blocktype": self.blocktype,
"block_id": self.block_id,
**self.defaults,
}
# convert ObjectId file_ids to string to make handling them easier when sending to and from web
if "file_id" in self.data:
self.data["file_id"] = str(self.data["file_id"])
if "title" not in self.data:
self.data["title"] = self.description
self.data.update(
dictionary
) # this could overwrite blocktype and block_id. I think that's reasonable... maybe
LOGGER.debug(
"Initialised block %s for item ID %s or collection ID %s.",
self.__class__.__name__,
item_id,
collection_id,
)
to_db(self)
Returns a dictionary with the data for this block, ready to be input into MongoDB.
Source code in pydatalab/blocks/blocks.py
def to_db(self):
"""returns a dictionary with the data for this
block, ready to be input into mongodb"""
LOGGER.debug("Casting block %s to database object.", self.__class__.__name__)
if "file_id" in self.data:
dict_for_db = self.data.copy() # gross, I know
dict_for_db["file_id"] = ObjectId(dict_for_db["file_id"])
return dict_for_db
if "bokeh_plot_data" in self.data:
self.data.pop("bokeh_plot_data")
return self.data
from_db(db_entry) (classmethod)
Create a block from JSON (dictionary) stored in a database.
Source code in pydatalab/blocks/blocks.py
@classmethod
def from_db(cls, db_entry):
"""create a block from json (dictionary) stored in a db"""
LOGGER.debug("Loading block %s from database object.", cls.__class__.__name__)
new_block = cls(
item_id=db_entry.get("item_id"),
collection_id=db_entry.get("collection_id"),
dictionary=db_entry,
)
if "file_id" in new_block.data:
new_block.data["file_id"] = str(new_block.data["file_id"])
return new_block
to_web(self)
Returns a JSON-able dictionary to render the block on the web.
Source code in pydatalab/blocks/blocks.py
def to_web(self):
"""returns a json-able dictionary to render the block on the web"""
if self.plot_functions:
for plot in self.plot_functions:
try:
plot()
except RuntimeError:
LOGGER.warning(
f"Could not create plot for {self.__class__.__name__}: {self.data}"
)
return self.data
from_web(data) (classmethod)
Source code in pydatalab/blocks/blocks.py
@classmethod
def from_web(cls, data):
LOGGER.debug("Loading block %s from web request.", cls.__class__.__name__)
block = cls(
item_id=data.get("item_id"),
collection_id=data.get("collection_id"),
unique_id=data["block_id"],
)
block.update_from_web(data)
return block
update_from_web(self, data)
Updates the object with data received from the website. Only fields specified in the dictionary are updated; other fields are left alone.
Source code in pydatalab/blocks/blocks.py
def update_from_web(self, data):
"""update the object with data received from the website. Only updates fields
that are specified in the dictionary- other fields are left alone"""
LOGGER.debug(
"Updating block %s from web request",
self.__class__.__name__,
)
self.data.update(data)
return self
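A minimal sketch of what a custom block subclass might look like, following the pattern of the blocks below. The block type, file extension, and plot function are illustrative, not part of the package:

# Hypothetical sketch of a custom block; CSVBlock is not part of pydatalab.
class CSVBlock(DataBlock):
    blocktype = "csv"
    description = "Comma-separated values"
    accepted_file_extensions = (".csv",)

    @property
    def plot_functions(self):
        return (self.parse_csv,)

    def parse_csv(self):
        if "file_id" not in self.data:
            LOGGER.warning("CSVBlock.parse_csv(): No file set in the DataBlock")
            return
        # a real block would look up the file with get_file_info_by_id(self.data["file_id"])
        # and attach e.g. self.data["bokeh_plot_data"] here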
MediaBlock (DataBlock)
Source code in pydatalab/blocks/blocks.py
class MediaBlock(DataBlock):
blocktype = "media"
description = "Media"
accepted_file_extensions = (".png", ".jpeg", ".jpg", ".tif", ".tiff", ".mp4", ".mov", ".webm")
__supports_collections = False
@property
def plot_functions(self):
return (self.encode_tiff,)
def encode_tiff(self):
if "file_id" not in self.data:
LOGGER.warning("ImageBlock.encode_tiff(): No file set in the DataBlock")
return
if "b64_encoded_image" not in self.data:
self.data["b64_encoded_image"] = {}
file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
if file_info["name"].endswith(".tif") or file_info["name"].endswith(".tiff"):
im = Image.open(file_info["location"])
LOGGER.warning("Making base64 encoding of tif")
with io.BytesIO() as f:
im.save(f, format="PNG")
f.seek(0)
self.data["b64_encoded_image"][self.data["file_id"]] = base64.b64encode(
f.getvalue()
).decode()
Attributes:
accepted_file_extensions: Sequence[str]
blocktype: str
description: str
plot_functions (property, read-only)

encode_tiff(self)
Source code in pydatalab/blocks/blocks.py
def encode_tiff(self):
if "file_id" not in self.data:
LOGGER.warning("ImageBlock.encode_tiff(): No file set in the DataBlock")
return
if "b64_encoded_image" not in self.data:
self.data["b64_encoded_image"] = {}
file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
if file_info["name"].endswith(".tif") or file_info["name"].endswith(".tiff"):
im = Image.open(file_info["location"])
LOGGER.warning("Making base64 encoding of tif")
with io.BytesIO() as f:
im.save(f, format="PNG")
f.seek(0)
self.data["b64_encoded_image"][self.data["file_id"]] = base64.b64encode(
f.getvalue()
).decode()
NMRBlock (DataBlock)
Source code in pydatalab/blocks/blocks.py
class NMRBlock(DataBlock):
blocktype = "nmr"
description = "Simple NMR Block"
accepted_file_extensions = ".zip"
defaults = {"process number": 1}
__supports_collections = False
@property
def plot_functions(self):
return (self.generate_nmr_plot,)
def read_bruker_nmr_data(self):
if "file_id" not in self.data:
LOGGER.warning("NMRPlot.read_bruker_nmr_data(): No file set in the DataBlock")
return
zip_file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
filename = zip_file_info["name"]
name, ext = os.path.splitext(filename)
if ext.lower() not in self.accepted_file_extensions:
LOGGER.warning(
"NMRBlock.read_bruker_nmr_data(): Unsupported file extension (must be .zip)"
)
return
# unzip:
directory_location = zip_file_info["location"] + ".extracted"
LOGGER.debug(f"Directory location is: {directory_location}")
with zipfile.ZipFile(zip_file_info["location"], "r") as zip_ref:
zip_ref.extractall(directory_location)
extracted_directory_name = os.path.join(directory_location, name)
available_processes = os.listdir(os.path.join(extracted_directory_name, "pdata"))
if self.data.get("selected_process") not in available_processes:
self.data["selected_process"] = available_processes[0]
try:
df, a_dic, topspin_title, processed_data_shape = nmr_utils.read_bruker_1d(
os.path.join(directory_location, name),
process_number=self.data["selected_process"],
verbose=False,
)
except Exception as error:
LOGGER.critical(f"Unable to parse {name} as Bruker project. {error}")
return
serialized_df = df.to_dict() if (df is not None) else None
# all data sorted in a fairly raw way
self.data["processed_data"] = serialized_df
self.data["acquisition_parameters"] = a_dic["acqus"]
self.data["processing_parameters"] = a_dic["procs"]
self.data["pulse_program"] = a_dic["pprog"]
# specific things that we might want to pull out for the UI:
self.data["available_processes"] = available_processes
self.data["nucleus"] = a_dic["acqus"]["NUC1"]
self.data["carrier_frequency_MHz"] = a_dic["acqus"]["SFO1"]
self.data["carrier_offset_Hz"] = a_dic["acqus"]["O1"]
self.data["recycle_delay"] = a_dic["acqus"]["D"][1]
self.data["nscans"] = a_dic["acqus"]["NS"]
self.data["CNST31"] = a_dic["acqus"]["CNST"][31]
self.data["processed_data_shape"] = processed_data_shape
self.data["probe_name"] = a_dic["acqus"]["PROBHD"]
self.data["pulse_program_name"] = a_dic["acqus"]["PULPROG"]
self.data["topspin_title"] = topspin_title
def generate_nmr_plot(self):
self.read_bruker_nmr_data() # currently calls every time plotting happens, but it should only happen if the file was updated
if "processed_data" not in self.data or not self.data["processed_data"]:
self.data["bokeh_plot_data"] = None
return
df = pd.DataFrame(self.data["processed_data"])
df["normalized intensity"] = df.intensity / df.intensity.max()
bokeh_layout = selectable_axes_plot(
df,
x_options=["ppm", "hz"],
y_options=[
"intensity",
"intensity_per_scan",
"normalized intensity",
],
plot_line=True,
point_size=3,
)
bokeh_layout.children[0].x_range.flipped = True # flip x axis, per NMR convention
self.data["bokeh_plot_data"] = bokeh.embed.json_item(bokeh_layout, theme=mytheme)
Attributes:
accepted_file_extensions: Sequence[str]
blocktype: str
defaults: Dict[str, Any]
description: str
plot_functions (property, read-only)

read_bruker_nmr_data(self)
Source code in pydatalab/blocks/blocks.py
def read_bruker_nmr_data(self):
if "file_id" not in self.data:
LOGGER.warning("NMRPlot.read_bruker_nmr_data(): No file set in the DataBlock")
return
zip_file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
filename = zip_file_info["name"]
name, ext = os.path.splitext(filename)
if ext.lower() not in self.accepted_file_extensions:
LOGGER.warning(
"NMRBlock.read_bruker_nmr_data(): Unsupported file extension (must be .zip)"
)
return
# unzip:
directory_location = zip_file_info["location"] + ".extracted"
LOGGER.debug(f"Directory location is: {directory_location}")
with zipfile.ZipFile(zip_file_info["location"], "r") as zip_ref:
zip_ref.extractall(directory_location)
extracted_directory_name = os.path.join(directory_location, name)
available_processes = os.listdir(os.path.join(extracted_directory_name, "pdata"))
if self.data.get("selected_process") not in available_processes:
self.data["selected_process"] = available_processes[0]
try:
df, a_dic, topspin_title, processed_data_shape = nmr_utils.read_bruker_1d(
os.path.join(directory_location, name),
process_number=self.data["selected_process"],
verbose=False,
)
except Exception as error:
LOGGER.critical(f"Unable to parse {name} as Bruker project. {error}")
return
serialized_df = df.to_dict() if (df is not None) else None
# all data sorted in a fairly raw way
self.data["processed_data"] = serialized_df
self.data["acquisition_parameters"] = a_dic["acqus"]
self.data["processing_parameters"] = a_dic["procs"]
self.data["pulse_program"] = a_dic["pprog"]
# specific things that we might want to pull out for the UI:
self.data["available_processes"] = available_processes
self.data["nucleus"] = a_dic["acqus"]["NUC1"]
self.data["carrier_frequency_MHz"] = a_dic["acqus"]["SFO1"]
self.data["carrier_offset_Hz"] = a_dic["acqus"]["O1"]
self.data["recycle_delay"] = a_dic["acqus"]["D"][1]
self.data["nscans"] = a_dic["acqus"]["NS"]
self.data["CNST31"] = a_dic["acqus"]["CNST"][31]
self.data["processed_data_shape"] = processed_data_shape
self.data["probe_name"] = a_dic["acqus"]["PROBHD"]
self.data["pulse_program_name"] = a_dic["acqus"]["PULPROG"]
self.data["topspin_title"] = topspin_title
generate_nmr_plot(self)
Source code in pydatalab/blocks/blocks.py
def generate_nmr_plot(self):
self.read_bruker_nmr_data() # currently calls every time plotting happens, but it should only happen if the file was updated
if "processed_data" not in self.data or not self.data["processed_data"]:
self.data["bokeh_plot_data"] = None
return
df = pd.DataFrame(self.data["processed_data"])
df["normalized intensity"] = df.intensity / df.intensity.max()
bokeh_layout = selectable_axes_plot(
df,
x_options=["ppm", "hz"],
y_options=[
"intensity",
"intensity_per_scan",
"normalized intensity",
],
plot_line=True,
point_size=3,
)
bokeh_layout.children[0].x_range.flipped = True # flip x axis, per NMR convention
self.data["bokeh_plot_data"] = bokeh.embed.json_item(bokeh_layout, theme=mytheme)
generate_random_id()
This function generates a random 15-character string for use as an id for a datablock. It should be sufficiently random that there is a negligible risk of ever generating the same id twice, so it can be used as a unique database reference and also as an id in the DOM. Note: uuid.uuid4() would do this too, but I think the generated ids are too long and ugly.
The ids here are HTML-id friendly, using lowercase letters and numbers. The first character is always a letter.
Source code in pydatalab/blocks/blocks.py
def generate_random_id():
"""This function generates a random 15-length string for use as an id for a datablock. It
should be sufficiently random that there is a negligible risk of ever generating
the same id twice, so this is a unique id that can be used as a unique database refrence
and also can be used as id in the DOM. Note: uuid.uuid4() would do this too, but I think
the generated ids are too long and ugly.
The ids here are HTML id friendly, using lowercase letters and numbers. The first character
is always a letter.
"""
randlist = [random.choice("abcdefghijklmnopqrstuvwxyz")] + random.choices(
"abcdefghijklmnopqrstuvwxyz0123456789", k=14
)
return "".join(randlist)
echem_block

CycleBlock (DataBlock)
A data block for processing electrochemical cycling data.
This class contains functions for processing dataframes created by navani from raw cycler files and plotting them with Bokeh.
Source code in pydatalab/blocks/echem_block.py
class CycleBlock(DataBlock):
"""A data block for processing electrochemical cycling data.
This class contains functions for processing dataframes created by navani
from raw cycler files and plotting them with Bokeh.
"""
blocktype = "cycle"
description = "Electrochemical cycling"
accepted_file_extensions = (
".mpr",
".txt",
".xls",
".xlsx",
".txt",
".res",
)
cache: Dict[str, Any]
defaults = {
"p_spline": 5,
"s_spline": 5,
"win_size_2": 101,
"win_size_1": 1001,
"derivative_mode": None,
}
def _get_characteristic_mass_g(self):
# return {"characteristic_mass": 1000}
doc = flask_mongo.db.items.find_one(
{"item_id": self.data["item_id"]}, {"characteristic_mass": 1}
)
characteristic_mass_mg = doc.get("characteristic_mass", None)
if characteristic_mass_mg:
return characteristic_mass_mg / 1000.0
return None
def _load(self, file_id: Union[str, ObjectId], reload: bool = False):
"""Loads the echem data using navani, summarises it, then caches the results
to disk with suffixed names.
Parameters:
file_id: The ID of the file to load.
reload: Whether to reload the data from the file, or use the cached version, if available.
"""
required_keys = (
"Time",
"Voltage",
"Capacity",
"Current",
"dqdv",
"dvdq",
"half cycle",
"full cycle",
)
keys_with_units = {
"Time": "time (s)",
"Voltage": "voltage (V)",
"Capacity": "capacity (mAh)",
"Current": "current (mA)",
"Charge Capacity": "charge capacity (mAh)",
"Discharge Capacity": "discharge capacity (mAh)",
"dqdv": "dQ/dV (mA/V)",
"dvdq": "dV/dQ (V/mA)",
}
file_info = get_file_info_by_id(file_id, update_if_live=True)
filename = file_info["name"]
if file_info.get("is_live"):
reload = True
ext = os.path.splitext(filename)[-1].lower()
if ext not in self.accepted_file_extensions:
raise RuntimeError(
f"Unrecognized filetype {ext}, must be one of {self.accepted_file_extensions}"
)
parsed_file_loc = Path(file_info["location"]).with_suffix(".RAW_PARSED.pkl")
cycle_summary_file_loc = Path(file_info["location"]).with_suffix(".SUMMARY.pkl")
raw_df = None
cycle_summary_df = None
if not reload:
if parsed_file_loc.exists():
raw_df = pd.read_pickle(parsed_file_loc)
if cycle_summary_file_loc.exists():
cycle_summary_df = pd.read_pickle(cycle_summary_file_loc)
if raw_df is None:
try:
LOGGER.debug("Loading file %s", file_info["location"])
start_time = time.time()
raw_df = ec.echem_file_loader(file_info["location"])
LOGGER.debug(
"Loaded file %s in %s seconds",
file_info["location"],
time.time() - start_time,
)
except Exception as exc:
raise RuntimeError(f"Navani raised an error when parsing: {exc}") from exc
raw_df.to_pickle(parsed_file_loc)
if cycle_summary_df is None:
cycle_summary_df = ec.cycle_summary(raw_df)
cycle_summary_df.to_pickle(cycle_summary_file_loc)
raw_df = raw_df.filter(required_keys)
raw_df.rename(columns=keys_with_units, inplace=True)
cycle_summary_df.rename(columns=keys_with_units, inplace=True)
cycle_summary_df["cycle index"] = pd.to_numeric(cycle_summary_df.index, downcast="integer")
return raw_df, cycle_summary_df
def plot_cycle(self):
"""Plots the electrochemical cycling data from the file ID provided in the request."""
if "file_id" not in self.data:
LOGGER.warning("No file_id given")
return
file_id = self.data["file_id"]
derivative_modes = (None, "dQ/dV", "dV/dQ", "final capacity")
if self.data["derivative_mode"] not in derivative_modes:
LOGGER.warning(
"Invalid derivative_mode provided: %s. Expected one of %s. Falling back to `None`.",
self.data["derivative_mode"],
derivative_modes,
)
self.data["derivative_mode"] = None
if self.data["derivative_mode"] is None:
mode = "normal"
else:
mode = self.data["derivative_mode"]
# User list input
cycle_list = self.data.get("cyclenumber", None)
if not isinstance(cycle_list, list):
cycle_list = None
raw_df, cycle_summary_df = self._load(file_id)
characteristic_mass_g = self._get_characteristic_mass_g()
if characteristic_mass_g:
raw_df["capacity (mAh/g)"] = raw_df["capacity (mAh)"] / characteristic_mass_g
raw_df["current (mA/g)"] = raw_df["current (mA)"] / characteristic_mass_g
if cycle_summary_df is not None:
cycle_summary_df["charge capacity (mAh/g)"] = (
cycle_summary_df["charge capacity (mAh)"] / characteristic_mass_g
)
cycle_summary_df["discharge capacity (mAh/g)"] = (
cycle_summary_df["discharge capacity (mAh)"] / characteristic_mass_g
)
df = filter_df_by_cycle_index(raw_df, cycle_list)
if cycle_summary_df is not None:
cycle_summary_df = filter_df_by_cycle_index(cycle_summary_df, cycle_list)
if mode in ("dQ/dV", "dV/dQ"):
df = compute_gpcl_differential(
df,
mode=mode,
polynomial_spline=int(self.data["p_spline"]),
s_spline=10 ** (-float(self.data["s_spline"])),
window_size_1=int(self.data["win_size_1"]),
window_size_2=int(self.data["win_size_2"]),
use_normalized_capacity=bool(characteristic_mass_g),
)
# Reduce df size to 100 points per cycle by default
df = reduce_echem_cycle_sampling(df, num_samples=100)
layout = bokeh_plots.double_axes_echem_plot(
df, cycle_summary=cycle_summary_df, mode=mode, normalized=bool(characteristic_mass_g)
)
self.data["bokeh_plot_data"] = bokeh.embed.json_item(layout, theme=mytheme)
return
@property
def plot_functions(self):
return (self.plot_cycle,)
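Example: a minimal sketch of driving a CycleBlock outside the web app (assumes a configured database, an existing item with an attached cycler file, and that the DataBlock base class accepts item_id in its constructor; all IDs below are placeholders).
block = CycleBlock(item_id="example_cell")
block.data["file_id"] = "64b0f0000000000000000000"  # placeholder ObjectId of an uploaded echem file
block.data["derivative_mode"] = "dQ/dV"             # or None, "dV/dQ", "final capacity"
block.plot_cycle()                                  # fills block.data["bokeh_plot_data"] for the frontend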
accepted_file_extensions: Sequence[str]
¶blocktype: str
¶defaults: Dict[str, Any]
¶description: str
¶plot_functions
property
readonly
¶plot_cycle(self)
¶Plots the electrochemical cycling data from the file ID provided in the request.
Source code in pydatalab/blocks/echem_block.py
def plot_cycle(self):
"""Plots the electrochemical cycling data from the file ID provided in the request."""
if "file_id" not in self.data:
LOGGER.warning("No file_id given")
return
file_id = self.data["file_id"]
derivative_modes = (None, "dQ/dV", "dV/dQ", "final capacity")
if self.data["derivative_mode"] not in derivative_modes:
LOGGER.warning(
"Invalid derivative_mode provided: %s. Expected one of %s. Falling back to `None`.",
self.data["derivative_mode"],
derivative_modes,
)
self.data["derivative_mode"] = None
if self.data["derivative_mode"] is None:
mode = "normal"
else:
mode = self.data["derivative_mode"]
# User list input
cycle_list = self.data.get("cyclenumber", None)
if not isinstance(cycle_list, list):
cycle_list = None
raw_df, cycle_summary_df = self._load(file_id)
characteristic_mass_g = self._get_characteristic_mass_g()
if characteristic_mass_g:
raw_df["capacity (mAh/g)"] = raw_df["capacity (mAh)"] / characteristic_mass_g
raw_df["current (mA/g)"] = raw_df["current (mA)"] / characteristic_mass_g
if cycle_summary_df is not None:
cycle_summary_df["charge capacity (mAh/g)"] = (
cycle_summary_df["charge capacity (mAh)"] / characteristic_mass_g
)
cycle_summary_df["discharge capacity (mAh/g)"] = (
cycle_summary_df["discharge capacity (mAh)"] / characteristic_mass_g
)
df = filter_df_by_cycle_index(raw_df, cycle_list)
if cycle_summary_df is not None:
cycle_summary_df = filter_df_by_cycle_index(cycle_summary_df, cycle_list)
if mode in ("dQ/dV", "dV/dQ"):
df = compute_gpcl_differential(
df,
mode=mode,
polynomial_spline=int(self.data["p_spline"]),
s_spline=10 ** (-float(self.data["s_spline"])),
window_size_1=int(self.data["win_size_1"]),
window_size_2=int(self.data["win_size_2"]),
use_normalized_capacity=bool(characteristic_mass_g),
)
# Reduce df size to 100 points per cycle by default
df = reduce_echem_cycle_sampling(df, num_samples=100)
layout = bokeh_plots.double_axes_echem_plot(
df, cycle_summary=cycle_summary_df, mode=mode, normalized=bool(characteristic_mass_g)
)
self.data["bokeh_plot_data"] = bokeh.embed.json_item(layout, theme=mytheme)
return
reduce_echem_cycle_sampling(df: DataFrame, num_samples: int = 100) -> DataFrame
¶Reduce number of cycles to at most `num_samples` points per half cycle. Will keep the endpoint values of each half cycle.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
df | DataFrame | The echem dataframe to reduce, which must have cycling data stored under a "half cycle" column. | required |
num_samples | int | The maximum number of sample points to include per cycle. | 100 |

Returns:

Type | Description |
---|---|
DataFrame | The output dataframe. |
Source code in pydatalab/blocks/echem_block.py
def reduce_echem_cycle_sampling(df: pd.DataFrame, num_samples: int = 100) -> pd.DataFrame:
"""Reduce number of cycles to at most `num_samples` points per half cycle. Will
keep the endpoint values of each half cycle.
Parameters:
df: The echem dataframe to reduce, which must have cycling data stored
under a `"half cycle"` column.
num_samples: The maximum number of sample points to include per cycle.
Returns:
The output dataframe.
"""
return_df = pd.DataFrame([])
for _, half_cycle in df.groupby("half cycle"):
return_df = pd.concat([return_df, reduce_df_size(half_cycle, num_samples, endpoint=True)])
return return_df
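Example usage (a minimal sketch with a synthetic dataframe; real data would come from ec.echem_file_loader via CycleBlock._load):
import numpy as np
import pandas as pd

# Two half cycles of 1000 points each
df = pd.DataFrame(
    {
        "voltage (V)": np.tile(np.linspace(3.0, 4.2, 1000), 2),
        "capacity (mAh)": np.tile(np.linspace(0.0, 1.5, 1000), 2),
        "half cycle": [1] * 1000 + [2] * 1000,
    }
)
reduced = reduce_echem_cycle_sampling(df, num_samples=100)
print(reduced.groupby("half cycle").size())  # roughly 100 points per half cycle, with endpoints kept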
compute_gpcl_differential(df: DataFrame, mode: str = 'dQ/dV', smoothing: bool = True, polynomial_spline: int = 3, s_spline: float = 1e-05, window_size_1: int = 101, window_size_2: int = 1001, polyorder_1: int = 5, polyorder_2: int = 5, use_normalized_capacity: bool = False) -> DataFrame
¶Compute differential dQ/dV or dV/dQ for the input dataframe.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
df | DataFrame | The input dataframe containing the raw cycling data. | required |
mode | str | Either 'dQ/dV' or 'dV/dQ'. Invalid inputs will default to 'dQ/dV'. | 'dQ/dV' |
smoothing | bool | Whether or not to apply additional smoothing to the output differential curve. | True |
polynomial_spline | int | The degree of the B-spline fit used by navani. | 3 |
s_spline | float | The smoothing parameter used by navani. | 1e-05 |
window_size_1 | int | The window size for the savgol filter when smoothing the capacity. | 101 |
window_size_2 | int | The window size for the savgol filter when smoothing the final differential. | 1001 |
polyorder_1 | int | The polynomial order for the savgol filter when smoothing the capacity. | 5 |
polyorder_2 | int | The polynomial order for the savgol filter when smoothing the final differential. | 5 |

Returns:

Type | Description |
---|---|
DataFrame | A data frame containing the voltages, capacities and requested differential on the reduced cycle list. |
Source code in pydatalab/blocks/echem_block.py
def compute_gpcl_differential(
df: pd.DataFrame,
mode: str = "dQ/dV",
smoothing: bool = True,
polynomial_spline: int = 3,
s_spline: float = 1e-5,
window_size_1: int = 101,
window_size_2: int = 1001,
polyorder_1: int = 5,
polyorder_2: int = 5,
use_normalized_capacity: bool = False,
) -> pd.DataFrame:
"""Compute differential dQ/dV or dV/dQ for the input dataframe.
Args:
df: The input dataframe containing the raw cycling data.
mode: Either 'dQ/dV' or 'dV/dQ'. Invalid inputs will default to 'dQ/dV'.
smoothing: Whether or not to apply additional smoothing to the output differential curve.
polynomial_spline: The degree of the B-spline fit used by navani.
s_spline: The smoothing parameter used by navani.
window_size_1: The window size for the `savgol` filter when smoothing the capacity.
window_size_2: The window size for the `savgol` filter when smoothing the final differential.
polyorder_1: The polynomial order for the `savgol` filter when smoothing the capacity.
polyorder_2: The polynomial order for the `savgol` filter when smoothing the final differential.
Returns:
A data frame containing the voltages, capacities and requested differential
on the reduced cycle list.
"""
if len(df) < 2:
LOGGER.debug(
f"compute_gpcl_differential called on dataframe with length {len(df)}, too small to calculate derivatives"
)
return df
if mode.lower().replace("/", "") == "dvdq":
y_label = "voltage (V)"
x_label = "capacity (mAh/g)" if use_normalized_capacity else "capacity (mAh)"
yp_label = "dV/dQ (V/mA)"
else:
y_label = "capacity (mAh/g)" if use_normalized_capacity else "capacity (mAh)"
x_label = "voltage (V)"
yp_label = "dQ/dV (mA/V)"
smoothing_parameters = {
"polynomial_spline": polynomial_spline,
"s_spline": s_spline,
"window_size_1": window_size_1 if window_size_1 % 2 else window_size_1 + 1,
"window_size_2": window_size_2 if window_size_2 % 2 else window_size_2 + 1,
"polyorder_1": polyorder_1,
"polyorder_2": polyorder_2,
"final_smooth": smoothing,
}
differential_df = pd.DataFrame()
# Loop over distinct half cycles
for cycle in df["half cycle"].unique():
# Extract all segments corresponding to this half cycle index
df_cycle = df[df["half cycle"] == cycle]
# Compute the desired derivative
try:
x, yp, y = ec.dqdv_single_cycle(
df_cycle[y_label], df_cycle[x_label], **smoothing_parameters
)
except TypeError as e:
LOGGER.debug(
f"""Calculating derivative {mode} of half_cycle {cycle} failed with the following error (likely it is a rest or voltage hold):
{e}
Skipping derivative calculation for this half cycle."""
)
continue
# Set up an array per cycle segment that stores the cycle and half-cycle index
cycle_index = df_cycle["full cycle"].max()
cycle_index_array = np.full(len(x), int(cycle_index), dtype=int)
half_cycle_index_array = np.full(len(x), int(cycle), dtype=int)
differential_df = pd.concat(
[
differential_df,
pd.DataFrame(
{
x_label: x,
y_label: y,
yp_label: yp,
"full cycle": cycle_index_array,
"half cycle": half_cycle_index_array,
}
),
]
)
return differential_df
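Example usage (a sketch assuming df has already been loaded and renamed by CycleBlock._load, i.e. it contains the unit-suffixed columns plus "half cycle" and "full cycle"):
dqdv_df = compute_gpcl_differential(df, mode="dQ/dV")
dvdq_df = compute_gpcl_differential(
    df,
    mode="dV/dQ",
    polynomial_spline=5,
    s_spline=1e-5,
    window_size_1=1001,
    window_size_2=101,
)
print(dqdv_df.columns)  # includes "dQ/dV (mA/V)" alongside the voltage/capacity and cycle index columns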
filter_df_by_cycle_index(df: DataFrame, cycle_list: Optional[List[int]] = None) -> DataFrame
¶Filters the input dataframe by the chosen rows in the `cycle_list`. If `half_cycle` is a column in the df, it will be used for filtering, otherwise `cycle index` will be used.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
df | DataFrame | The input dataframe to filter. Must have the column "half cycle". | required |
cycle_list | Optional[List[int]] | The provided list of cycle indices to keep. | None |

Returns:

Type | Description |
---|---|
DataFrame | A dataframe with all the data for the selected cycles. |
Source code in pydatalab/blocks/echem_block.py
def filter_df_by_cycle_index(
df: pd.DataFrame, cycle_list: Optional[List[int]] = None
) -> pd.DataFrame:
"""Filters the input dataframe by the chosen rows in the `cycle_list`.
If `half_cycle` is a column in the df, it will be used for filtering,
otherwise `cycle index` will be used.
Args:
df: The input dataframe to filter. Must have the column "half cycle".
cycle_list: The provided list of cycle indices to keep.
Returns:
A dataframe with all the data for the selected cycles.
"""
if cycle_list is None:
return df
if "half cycle" not in df.columns:
if "cycle index" not in df.columns:
raise ValueError(
"Input dataframe must have either 'half cycle' or 'cycle index' column"
)
return df[df["cycle index"].isin(i for i in cycle_list)]
try:
half_cycles = [i for item in cycle_list for i in [(2 * int(item)) - 1, 2 * int(item)]]
except ValueError as exc:
raise ValueError(
f"Unable to parse `cycle_list` as integers: {cycle_list}. Error: {exc}"
) from exc
return df[df["half cycle"].isin(half_cycles)]
bokeh_plots
¶
Attributes¶
COLORS
¶
FONTSIZE
¶
SELECTABLE_CALLBACK_x
¶
SELECTABLE_CALLBACK_y
¶
TOOLS
¶
TYPEFACE
¶
grid_style
¶
grid_theme
¶
mytheme
¶
style
¶
Additional style suitable for grid plots
Functions¶
selectable_axes_plot(df: Union[Dict[str, pandas.core.frame.DataFrame], List[pandas.core.frame.DataFrame], pandas.core.frame.DataFrame], x_options: List[str], y_options: List[str], color_options: Optional[List[str]] = None, color_mapper: Optional[bokeh.models.mappers.ColorMapper] = None, x_default: Optional[str] = None, y_default: Union[str, List[str]] = None, label_x: bool = True, label_y: bool = True, plot_points: bool = True, point_size: int = 4, plot_line: bool = True, plot_title: Optional[str] = None, plot_index: Optional[int] = None, tools: Optional[List] = None, **kwargs)
¶
Creates bokeh layout with selectable axis.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
df | Union[Dict[str, pandas.core.frame.DataFrame], List[pandas.core.frame.DataFrame], pandas.core.frame.DataFrame] | Dataframe, or list/dict of dataframes from data block. | required |
x_options | List[str] | Selectable fields to use for the x-values | required |
y_options | List[str] | Selectable fields to use for the y-values | required |
color_options | Optional[List[str]] | Selectable fields to colour lines/points by. | None |
color_mapper | Optional[bokeh.models.mappers.ColorMapper] | Optional colour mapper to pass to switch between log and linear scales. | None |
x_default | Optional[str] | Default x-axis that is plotted at start, defaults to first value of x_options. | None |
y_default | Union[str, List[str]] | Default y-axis that is plotted at start, defaults to first value of y_options. If provided a list, the first entry will be plotted as a solid line, and all others will be transparent lines. | None |
plot_points | bool | Whether to use plot markers. | True |
point_size | int | The size of markers, if enabled. | 4 |
plot_line | bool | Whether to draw a line between points. | True |
plot_title | Optional[str] | Global plot title to give to the figure. | None |
plot_index | Optional[int] | If part of a larger number of plots, use this index for e.g., choosing the correct value in the colour cycle. | None |
tools | Optional[List] | A list of Bokeh tools to enable. | None |

Returns:

Type | Description |
---|---|
Bokeh layout | |
Source code in pydatalab/bokeh_plots.py
def selectable_axes_plot(
df: Union[Dict[str, pd.DataFrame], List[pd.DataFrame], pd.DataFrame],
x_options: List[str],
y_options: List[str],
color_options: Optional[List[str]] = None,
color_mapper: Optional[ColorMapper] = None,
x_default: Optional[str] = None,
y_default: Optional[Union[str, List[str]]] = None,
label_x: bool = True,
label_y: bool = True,
plot_points: bool = True,
point_size: int = 4,
plot_line: bool = True,
plot_title: Optional[str] = None,
plot_index: Optional[int] = None,
tools: Optional[List] = None,
**kwargs,
):
"""
Creates bokeh layout with selectable axis.
Args:
df: Dataframe, or list/dict of dataframes from data block.
x_options: Selectable fields to use for the x-values
y_options: Selectable fields to use for the y-values
color_options: Selectable fields to colour lines/points by.
color_mapper: Optional colour mapper to pass to switch between log and linear scales.
x_default: Default x-axis that is plotted at start, defaults to first value of `x_options`
y_default: Default y-axis that is plotted at start, defaults to first value of `y_options`.
If provided a list, the first entry will be plotted as solid line, and all others will
be transparent lines.
plot_points: Whether to use plot markers.
point_size: The size of markers, if enabled.
plot_line: Whether to draw a line between points.
plot_title: Global plot title to give to the figure.
plot_index: If part of a larger number of plots, use this index for e.g., choosing the correct
value in the colour cycle.
tools: A list of Bokeh tools to enable.
Returns:
Bokeh layout
"""
if not x_default:
x_default = x_options[0]
if not y_default:
y_default = y_options[0]
if isinstance(y_default, list):
y_label = y_options[0]
else:
y_label = y_default
x_axis_label = x_default if label_x else ""
y_axis_label = y_label if label_y else ""
p = figure(
sizing_mode="scale_width",
aspect_ratio=kwargs.pop("aspect_ratio", 1.5),
x_axis_label=x_axis_label,
y_axis_label=y_axis_label,
tools=TOOLS,
title=plot_title,
**kwargs,
)
if tools:
p.add_tools(tools)
if isinstance(df, pd.DataFrame):
df = [df]
callbacks_x = []
callbacks_y = []
if color_options:
if color_mapper is None:
color_mapper = LinearColorMapper(palette="Cividis256")
hatch_patterns = [None, ".", "/", "x"]
labels = []
if isinstance(df, dict):
labels = list(df.keys())
for ind, df_ in enumerate(df):
if isinstance(df, dict):
df_ = df[df_]
if labels:
label = labels[ind]
else:
label = df_.index.name if len(df) > 1 else ""
source = ColumnDataSource(df_)
if color_options:
color = {"field": color_options[0], "transform": color_mapper}
line_color = "black"
fill_color = None
if hatch_patterns[ind % len(hatch_patterns)] is None:
fill_color = color
elif plot_index is not None:
color = COLORS[plot_index % len(COLORS)]
line_color = COLORS[plot_index % len(COLORS)]
fill_color = COLORS[plot_index % len(COLORS)]
else:
color = COLORS[ind % len(COLORS)]
line_color = COLORS[ind % len(COLORS)]
fill_color = COLORS[ind % len(COLORS)]
# If y_default is a list, plot the first one as a solid line, and the rest as transparent "auxiliary" lines
y_aux = None
if isinstance(y_default, list):
if len(y_default) > 1:
y_aux = y_default[1:]
y_default = y_default[0]
circles = (
p.circle(
x=x_default,
y=y_default,
source=source,
size=point_size,
line_color=color,
fill_color=fill_color,
legend_label=label,
hatch_pattern=hatch_patterns[ind % len(hatch_patterns)],
hatch_color=color,
)
if plot_points
else None
)
lines = (
p.line(x=x_default, y=y_default, source=source, color=line_color, legend_label=label)
if plot_line
else None
)
if y_aux:
for y in y_aux:
aux_lines = ( # noqa
p.line(
x=x_default,
y=y,
source=source,
color=color,
legend_label=label,
alpha=0.3,
)
if plot_line
else None
)
callbacks_x.append(
CustomJS(
args=dict(circle1=circles, line1=lines, source=source, xaxis=p.xaxis[0]),
code=SELECTABLE_CALLBACK_x,
)
)
callbacks_y.append(
CustomJS(
args=dict(circle1=circles, line1=lines, source=source, yaxis=p.yaxis[0]),
code=SELECTABLE_CALLBACK_y,
)
)
if color_mapper and color_options:
color_bar = ColorBar(color_mapper=color_mapper, title=color_options[0]) # type: ignore
p.add_layout(color_bar, "right")
# Add list boxes for selecting which columns to plot on the x and y axis
xaxis_select = Select(title="X axis:", value=x_default, options=x_options)
xaxis_select.js_on_change("value", *callbacks_x)
yaxis_select = Select(title="Y axis:", value=y_default, options=y_options)
yaxis_select.js_on_change("value", *callbacks_y)
p.legend.click_policy = "hide"
if len(df) <= 1:
p.legend.visible = False
plot_columns = [p]
if len(x_options) > 1:
plot_columns.append(xaxis_select)
if len(y_options) > 1:
plot_columns.append(yaxis_select)
layout = column(*plot_columns)
p.js_on_event(DoubleTap, CustomJS(args=dict(p=p), code="p.reset.emit()"))
return layout
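A minimal sketch of building such a layout and serialising it for the frontend (column names are illustrative; assumes the module import path shown above):
import bokeh.embed
import pandas as pd

from pydatalab.bokeh_plots import mytheme, selectable_axes_plot

df = pd.DataFrame(
    {
        "time (s)": range(100),
        "voltage (V)": [3.0 + 0.01 * i for i in range(100)],
        "current (mA)": [0.1] * 100,
    }
)
layout = selectable_axes_plot(
    df,
    x_options=["time (s)", "voltage (V)"],
    y_options=["voltage (V)", "current (mA)"],
    plot_title="Example block plot",
)
plot_json = bokeh.embed.json_item(layout, theme=mytheme)  # what the data blocks store as bokeh_plot_data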
double_axes_echem_plot(df: DataFrame, mode: Optional[str] = None, cycle_summary: DataFrame = None, x_options: Sequence[str] = [], pick_peaks: bool = True, normalized: bool = False, **kwargs) -> gridplot
¶
Creates a Bokeh plot for electrochemistry data.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
df | DataFrame | The pre-processed dataframe containing capacities and voltages, indexed by half cycle. | required |
mode | Optional[str] | Either "dQ/dV", "dV/dQ", "normal" or None. | None |
x_options | Sequence[str] | Columns from df that can be selected for the first plot. The first will be used as the default. | [] |
pick_peaks | bool | Whether or not to pick and plot the peaks in dV/dQ mode. | True |
Returns: The Bokeh layout.
Source code in pydatalab/bokeh_plots.py
def double_axes_echem_plot(
df: pd.DataFrame,
mode: Optional[str] = None,
cycle_summary: pd.DataFrame = None,
x_options: Sequence[str] = [],
pick_peaks: bool = True,
normalized: bool = False,
**kwargs,
) -> gridplot:
"""Creates a Bokeh plot for electrochemistry data.
Args:
df: The pre-processed dataframe containing capacities and
voltages, indexed by half cycle.
mode: Either "dQ/dV", "dV/dQ", "normal" or None.
x_options: Columns from `df` that can be selected for the
first plot. The first will be used as the default.
pick_peaks: Whether or not to pick and plot the peaks in dV/dQ mode.
Returns: The Bokeh layout.
"""
if not x_options:
x_options = (
["capacity (mAh/g)", "voltage (V)", "time (s)", "current (mA/g)"]
if normalized
else ["capacity (mAh)", "voltage (V)", "time (s)", "current (mA)"]
)
x_options = [opt for opt in x_options if opt in df.columns]
common_options = {"aspect_ratio": 1.5, "tools": TOOLS}
common_options.update(**kwargs)
if mode == "normal":
mode = None
modes = ("dQ/dV", "dV/dQ", "final capacity", None)
if mode not in modes:
raise RuntimeError(f"Mode must be one of {modes} not {mode}.")
x_default = x_options[0]
y_default = x_options[1]
x_options = list(x_options)
cmap = plt.get_cmap("inferno")
plots = []
# normal plot
# x_label = "Capacity (mAh/g)" if x_default == "Capacity normalized" else x_default
x_label = x_default
p1 = figure(x_axis_label=x_label, y_axis_label="voltage (V)", **common_options)
plots.append(p1)
# the differential plot
if mode in ("dQ/dV", "dV/dQ"):
if mode == "dQ/dV":
p2 = figure(
x_axis_label=mode,
y_axis_label="voltage (V)",
y_range=p1.y_range,
**common_options,
)
else:
p2 = figure(
x_axis_label=x_default, y_axis_label=mode, x_range=p1.x_range, **common_options
)
plots.append(p2)
elif mode == "final capacity" and cycle_summary is not None:
palette = Accent[3]
p3 = figure(
x_axis_label="Cycle number",
y_axis_label="capacity (mAh/g)" if normalized else "capacity (mAh)",
**common_options,
)
p3.line(
x="full cycle",
y="charge capacity (mAh/g)" if normalized else "charge capacity (mAh)",
source=cycle_summary,
legend_label="charge",
line_width=2,
color=palette[0],
)
p3.circle(
x="full cycle",
y="charge capacity (mAh/g)" if normalized else "charge capacity (mAh)",
source=cycle_summary,
fill_color="white",
hatch_color=palette[0],
legend_label="charge",
line_width=2,
size=12,
color=palette[0],
)
p3.line(
x="full cycle",
y="discharge capacity (mAh/g)" if normalized else "discharge capacity (mAh)",
source=cycle_summary,
legend_label="discharge",
line_width=2,
color=palette[2],
)
p3.triangle(
x="full cycle",
y="discharge capacity (mAh/g)" if normalized else "discharge capacity (mAh)",
source=cycle_summary,
fill_color="white",
hatch_color=palette[2],
line_width=2,
legend_label="discharge",
size=12,
color=palette[2],
)
p3.legend.location = "right"
p3.y_range.start = 0
lines = []
grouped_by_half_cycle = df.groupby("half cycle")
for ind, plot in enumerate(plots):
x = x_default
y = "voltage (V)"
if ind == 1:
if mode == "dQ/dV":
x = "dQ/dV (mA/V)"
else:
y = "dV/dQ (V/mA)"
# trim the end of the colour cycle for visibility on a white background
color_space = np.linspace(0.3, 0.7, int(df["half cycle"].max())) # type: ignore
for _, group in grouped_by_half_cycle:
line = plot.line(
x=x,
y=y,
source=group,
line_color=matplotlib.colors.rgb2hex(
cmap(color_space[int(group["half cycle"].max()) - 1])
),
hover_line_width=2,
selection_line_width=2,
selection_line_color="black",
)
if mode == "dV/dQ" and ind == 1 and pick_peaks:
# Check if half cycle or not
dvdq_array = np.array(group[y])
if group[y].mean() < 0:
dvdq_array *= -1
peaks, _ = find_peaks(dvdq_array, prominence=5)
peak_locs = group.iloc[peaks]
p2.circle(x=x, y=y, source=peak_locs)
if ind == 0:
lines.append(line)
# Only add the selectable axis to dQ/dV mode
if mode in ("dQ/dV", None):
callback_x = CustomJS(
args=dict(lines=lines, xaxis=p1.xaxis[0]),
code="""
var column = cb_obj.value;
console.log(column)
for (let line of lines) {
line.glyph.x = { field: column };
}
xaxis.axis_label = column;
""",
)
xaxis_select = Select(title="X axis:", value=x_default, options=x_options)
xaxis_select.js_on_change("value", callback_x)
if mode is None:
callback_y = CustomJS(
args=dict(lines=lines, yaxis=p1.yaxis[0]),
code="""
var column = cb_obj.value;
console.log(column)
for (let line of lines) {
line.glyph.y = { field: column };
}
yaxis.axis_label = column;
""",
)
yaxis_select = Select(title="Y axis:", value=y_default, options=x_options)
yaxis_select.js_on_change("value", callback_y)
hovertooltips = [("Cycle No.", "@{full cycle}"), ("Half-cycle", "@{half cycle}")]
if mode:
crosshair = CrosshairTool(dimensions="width" if mode == "dQ/dV" else "height")
for p in plots:
if len(lines) < 100:
p.add_tools(HoverTool(tooltips=hovertooltips))
if mode:
p.add_tools(crosshair)
p.js_on_event(DoubleTap, CustomJS(args=dict(p=p), code="p.reset.emit()"))
if mode == "dQ/dV":
grid = [[p1, p2], [xaxis_select]]
elif mode == "dV/dQ":
grid = [[p1], [p2]]
elif mode == "final capacity":
grid = [[p3]]
else:
grid = [[p1], [xaxis_select], [yaxis_select]]
return gridplot(grid, sizing_mode="scale_width", toolbar_location="below")
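Putting the echem helpers together, mirroring CycleBlock.plot_cycle (a sketch; raw_df and cycle_summary_df are assumed to come from CycleBlock._load):
import bokeh.embed

from pydatalab.blocks.echem_block import compute_gpcl_differential, reduce_echem_cycle_sampling
from pydatalab.bokeh_plots import double_axes_echem_plot, mytheme

differential_df = compute_gpcl_differential(raw_df, mode="dQ/dV")
reduced_df = reduce_echem_cycle_sampling(differential_df, num_samples=100)
layout = double_axes_echem_plot(reduced_df, cycle_summary=cycle_summary_df, mode="dQ/dV")
plot_json = bokeh.embed.json_item(layout, theme=mytheme)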
config
¶
CONFIG
¶
DeploymentMetadata (BaseModel)
pydantic-model
¶
Source code in pydatalab/config.py
class DeploymentMetadata(BaseModel):
maintainer: Optional[Person]
issue_tracker: Optional[AnyUrl]
homepage: Optional[AnyUrl]
source_repository: Optional[AnyUrl]
@validator("maintainer")
def strip_fields_from_person(cls, v):
if not v.contact_email:
raise ValueError("Must provide contact email for maintainer.")
return Person(contact_email=v.contact_email, display_name=v.display_name)
class Config:
extra = "allow"
maintainer: Person
pydantic-field
¶issue_tracker: AnyUrl
pydantic-field
¶homepage: AnyUrl
pydantic-field
¶source_repository: AnyUrl
pydantic-field
¶strip_fields_from_person(v)
classmethod
¶Source code in pydatalab/config.py
@validator("maintainer")
def strip_fields_from_person(cls, v):
if not v.contact_email:
raise ValueError("Must provide contact email for maintainer.")
return Person(contact_email=v.contact_email, display_name=v.display_name)
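For example, a deployment description might be constructed as follows (values are illustrative; assumes Person lives in pydatalab.models.people, as referenced elsewhere in these docs):
from pydatalab.config import DeploymentMetadata
from pydatalab.models.people import Person

metadata = DeploymentMetadata(
    maintainer=Person(display_name="A. Maintainer", contact_email="maintainer@example.org"),
    issue_tracker="https://example.org/datalab/issues",
    homepage="https://example.org/datalab",
)
# The validator keeps only display_name and contact_email on the maintainer,
# and raises a ValueError if no contact email is given.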
Functions¶
config_file_settings(settings: BaseSettings) -> Dict[str, Any]
¶
Returns a dictionary of server settings loaded from the default or specified JSON config file location (via the env var `PYDATALAB_CONFIG_FILE`).
Source code in pydatalab/config.py
def config_file_settings(settings: BaseSettings) -> Dict[str, Any]:
"""Returns a dictionary of server settings loaded from the default or specified
JSON config file location (via the env var `PYDATALAB_CONFIG_FILE`).
"""
config_file = Path(os.getenv("PYDATALAB_CONFIG_FILE", "/app/config.json"))
res = {}
if config_file.is_file():
logging.debug("Loading from config file at %s", config_file)
config_file_content = config_file.read_text(encoding=settings.__config__.env_file_encoding)
try:
res = json.loads(config_file_content)
except json.JSONDecodeError as json_exc:
raise RuntimeError(f"Unable to read JSON config file {config_file}") from json_exc
else:
logging.debug("Unable to load from config file at %s", config_file)
res = {}
return res
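For example, a deployment can point the server at a custom config file via the environment (the keys shown are illustrative settings that appear elsewhere in this codebase, e.g. DEBUG and BEHIND_REVERSE_PROXY):
import json
import os

# Write an illustrative config file...
with open("/tmp/datalab_config.json", "w") as f:
    json.dump({"DEBUG": True, "BEHIND_REVERSE_PROXY": False}, f)

# ...and tell pydatalab where to find it before the settings object is constructed
os.environ["PYDATALAB_CONFIG_FILE"] = "/tmp/datalab_config.json"
# config_file_settings() will now return {"DEBUG": True, "BEHIND_REVERSE_PROXY": False};
# if the file is missing it falls back to an empty dict, and invalid JSON raises a RuntimeError.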
errors
¶
ERROR_HANDLERS: Iterable[Tuple[Any, Callable[[Any], Tuple[flask.wrappers.Response, int]]]]
¶
Classes¶
UserRegistrationForbidden (Forbidden)
¶
Raised when a user tries to register via OAuth without the appropriate properties/credentials, e.g., public membership of a GitHub organization that is on the allow list.
Source code in pydatalab/errors.py
class UserRegistrationForbidden(Forbidden):
"""Raised when a user tries to register via OAuth without the appropriate
properties/credentials, e.g., public membership of a GitHub organization
that is on the allow list.
"""
description: str = """<html><head></head>
<body>
<h1>403 Forbidden</h1>
<h2>Unable to create account</h2>
<p>No user data will be stored as a result of this interaction, but you may wish to clear your cookies for this site.</p>
<p>
The OAuth identity used for registration is not on the allow list.
If you believe this to be an error, please first verify that your membership of the allowed
group (e.g., a GitHub organization) is public, and verify with the deployment manager that
the organization is indeed on the allow list.
</p>
<p>If this was not an error, you may wish to revoke any permissions given to the datalab OAuth application.</p>
</body>
</html>
"""
description: str
¶Functions¶
handle_http_exception(exc: HTTPException) -> Tuple[flask.wrappers.Response, int]
¶
Return a specific error message and status code if the exception stores them.
Source code in pydatalab/errors.py
def handle_http_exception(exc: HTTPException) -> Tuple[Response, int]:
"""Return a specific error message and status code if the exception stores them."""
response = {
"title": exc.__class__.__name__,
"description": exc.description,
}
status_code = exc.code if exc.code else 400
return jsonify(response), status_code
render_unauthorised_user_template(exc: UserRegistrationForbidden) -> Tuple[flask.wrappers.Response, int]
¶
Return a rich HTML page on user account creation failure.
Source code in pydatalab/errors.py
def render_unauthorised_user_template(exc: UserRegistrationForbidden) -> Tuple[Response, int]:
"""Return a rich HTML page on user account creation failure."""
return Response(response=exc.description), exc.code
handle_pydantic_validation_error(exc: ValidationError) -> Tuple[flask.wrappers.Response, int]
¶
Handle pydantic validation errors separately from other exceptions. These always come from malformed data, so should not necessarily trigger the Flask debugger.
Source code in pydatalab/errors.py
def handle_pydantic_validation_error(exc: ValidationError) -> Tuple[Response, int]:
"""Handle pydantic validation errors separately from other exceptions.
These always come from malformed data, so should not necessarily trigger the
Flask debugger.
"""
response = {
"title": exc.__class__.__name__,
"message": str(exc.args[:]) if exc.args else "",
}
return jsonify(response), 500
handle_generic_exception(exc: Exception) -> Tuple[flask.wrappers.Response, int]
¶
Return a specific error message and status code if the exception stores them.
Source code in pydatalab/errors.py
def handle_generic_exception(exc: Exception) -> Tuple[Response, int]:
"""Return a specific error message and status code if the exception stores them."""
if os.environ.get("FLASK_ENV") == "development":
raise exc
response = {
"title": exc.__class__.__name__,
"message": str(exc.args) if exc.args else "",
}
return jsonify(response), 500
file_utils
¶
DIRECTORIES_DICT
¶
FILE_DIRECTORY
¶
LIVE_FILE_CUTOFF
¶
Functions¶
get_file_info_by_id(file_id: Union[str, bson.objectid.ObjectId], update_if_live: bool = True) -> Dict[str, Any]
¶
Query the files collection for the given ID.
If `update_if_live` is set and the file has been updated on the remote since it was added to the database, then the new version will be copied into the local filestore.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
file_id | Union[str, bson.objectid.ObjectId] | Either the string or ObjectID representation of the file ID. | required |
update_if_live | bool | Whether or not to update the stored file to a newer version, if it exists. | True |

Exceptions:

Type | Description |
---|---|
IOError | If the given file ID does not exist in the database. |

Returns:

Type | Description |
---|---|
Dict[str, Any] | The stored file information as a dictionary. Will be empty if the corresponding file does not exist on disk. |
Source code in pydatalab/file_utils.py
@logged_route
def get_file_info_by_id(
file_id: Union[str, ObjectId], update_if_live: bool = True
) -> Dict[str, Any]:
"""Query the files collection for the given ID.
If the `update_if_live` and the file has been updated on the
remote since it was added to the database, then the new version
will be copied into the local filestore.
Arguments:
file_id: Either the string or ObjectID representation of the file ID.
update_if_live: Whether or not to update the stored file to a
newer version, if it exists.
Raises:
IOError: If the given file ID does not exist in the database.
Returns:
The stored file information as a dictionary. Will be empty if the
corresponding file does not exist on disk.
"""
LOGGER.debug("getting file for file_id: %s", file_id)
file_collection = get_database().files
file_id = ObjectId(file_id)
file_info = file_collection.find_one({"_id": file_id})
if not file_info:
raise IOError(f"could not find file with id: {file_id} in db")
file_info = File(**file_info)
if update_if_live and file_info.is_live:
file_info = _check_and_sync_file(file_info, file_id)
return file_info.dict()
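Typical use from a data block (as in CycleBlock._load above); file_id may be a string or an ObjectId:
file_info = get_file_info_by_id(file_id, update_if_live=True)
print(file_info["name"], file_info["location"], file_info.get("is_live"))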
update_uploaded_file(file, file_id, last_modified = None, size_bytes = None)
¶
file is a file object from a flask request. last_modified should be in an isodate format; if None, the current time will be inserted. By default, this only changes last_modified and size_bytes, increments the revision, and sets source=remote and is_live=false. additional_updates can be used to pass other fields to change (NOT IMPLEMENTED YET)
Source code in pydatalab/file_utils.py
@logged_route
def update_uploaded_file(file, file_id, last_modified=None, size_bytes=None):
"""file is a file object from a flask request.
last_modified should be an isodate format. if None, the current time will be inserted
By default, only changes the last_modified, and size_bytes, increments version, and verifies source=remote and is_live=false. (converts )
additional_updates can be used to pass other fields to change in (NOT IMPLEMENTED YET)"""
last_modified = datetime.datetime.now().isoformat()
file_collection = get_database().files
updated_file_entry = file_collection.find_one_and_update(
{"_id": file_id}, # Note, needs to be ObjectID()
{
"$set": {
"last_modified": last_modified,
"size": size_bytes,
"source": "remote",
"is_live": False,
},
"$inc": {"revision": 1},
},
return_document=ReturnDocument.AFTER,
)
if not updated_file_entry:
raise IOError(f"Issue with db update uploaded file {file.name} id {file_id}")
updated_file_entry = File(**updated_file_entry)
# overwrite the old file with the new location
file.save(updated_file_entry["location"])
ret = updated_file_entry.dict()
ret.update({"_id": file_id})
return ret
save_uploaded_file(file, item_ids = None, block_ids = None, last_modified = None, size_bytes = None)
¶
file is a file object from a flask request. last_modified should be in an isodate format; if last_modified is None, the current time will be inserted.
Source code in pydatalab/file_utils.py
@logged_route
def save_uploaded_file(file, item_ids=None, block_ids=None, last_modified=None, size_bytes=None):
"""file is a file object from a flask request.
last_modified should be an isodate format. if last_modified is None, the current time will be inserted"""
from pydatalab.routes.utils import get_default_permissions
sample_collection = get_database().items
file_collection = get_database().files
# validate item_ids
if not item_ids:
item_ids = []
if not block_ids:
block_ids = []
for item_id in item_ids:
if not sample_collection.find_one(
{"item_id": item_id, **get_default_permissions(user_only=True)}
):
raise ValueError(f"item_id is invalid: {item_id}")
filename = secure_filename(file.filename)
extension = os.path.splitext(filename)[1]
if not last_modified:
last_modified = datetime.datetime.now().isoformat()
new_file_document = File(
name=filename,
original_name=file.filename, # not escaped
location=None, # file storage location in datalab. Important! will be filled in below
url_path=None, # the url used to access this file. Important! will be filled in below
extension=extension,
source="uploaded",
size=size_bytes,
item_ids=item_ids,
blocks=block_ids,
last_modified=last_modified,
time_added=last_modified,
metadata={},
representation=None,
source_server_name=None, # not used for source=uploaded
source_path=None, # not used for source=uploaded
last_modified_remote=None, # not used for source=uploaded
is_live=False, # not available for source=uploaded
revision=1, # increment with each update
)
result = file_collection.insert_one(new_file_document.dict())
if not result.acknowledged:
raise IOError(f"db operation failed when trying to insert new file. Result: {result}")
inserted_id = result.inserted_id
new_directory = os.path.join(FILE_DIRECTORY, str(inserted_id))
file_location = os.path.join(new_directory, filename)
os.makedirs(new_directory)
file.save(file_location)
updated_file_entry = file_collection.find_one_and_update(
{"_id": inserted_id},
{
"$set": {
"location": file_location,
"size": os.path.getsize(file_location),
}
},
return_document=ReturnDocument.AFTER,
)
updated_file_entry = File(**updated_file_entry)
# update any referenced item_ids
for item_id in item_ids:
sample_update_result = sample_collection.update_one(
{"item_id": item_id, **get_default_permissions(user_only=True)},
{"$push": {"file_ObjectIds": inserted_id}},
)
if sample_update_result.modified_count != 1:
raise IOError(
f"db operation failed when trying to insert new file ObjectId into sample: {item_id}"
)
ret = updated_file_entry.dict()
ret.update({"_id": inserted_id})
return ret
add_file_from_remote_directory(file_entry, item_id, block_ids = None)
¶
Source code in pydatalab/file_utils.py
def add_file_from_remote_directory(file_entry, item_id, block_ids=None):
from pydatalab.routes.utils import get_default_permissions
file_collection = get_database().files
sample_collection = get_database().items
if not block_ids:
block_ids = []
filename = secure_filename(file_entry["name"])
extension = os.path.splitext(filename)[1]
# generate the remote url
host = DIRECTORIES_DICT[file_entry["toplevel_name"]]
remote_path = os.path.join(file_entry["relative_path"].lstrip("/"), file_entry["name"])
# If we are dealing with a truly remote host
if host["hostname"]:
remote_toplevel_path = f'{host["hostname"]}:{host["path"]}'
full_remote_path = f"{remote_toplevel_path}/{remote_path}"
if file_entry.get("time") is None:
remote_timestamp = None
else:
remote_timestamp = datetime.datetime.fromtimestamp(int(file_entry["time"]))
# Otherwise we assume the file is mounted locally
else:
remote_toplevel_path = host["path"]
full_remote_path = os.path.join(remote_toplevel_path, remote_path)
# check that the path is valid and get the last modified time from the server
remote_timestamp = os.path.getmtime(full_remote_path)
new_file_document = File(
name=filename,
original_name=file_entry["name"], # not escaped
# file storage location in datalab. Important! will be filled in below
location=None,
# the URL used to access this file. Important! will be filled in below
url_path=None,
extension=extension,
source="remote",
size=file_entry["size"],
item_ids=[item_id],
blocks=block_ids,
# last_modified is the last modified time of the db entry in isoformat. For last modified file timestamp, see last_modified_remote_timestamp
last_modified=datetime.datetime.now().isoformat(),
time_added=datetime.datetime.now().isoformat(),
metadata={},
representation=None,
source_server_name=file_entry["toplevel_name"],
# this is the relative path from the given source_server_name (server directory)
source_path=remote_path,
# last modified time as provided from the remote server. May be different from last_modified if the two servers' times are not synchronized.
last_modified_remote=remote_timestamp,
# Whether this file will update (if changes have occurred) on access
is_live=bool(host["hostname"]),
# incremented with each update
version=1,
)
result = file_collection.insert_one(new_file_document.dict())
if not result.acknowledged:
raise IOError(f"db operation failed when trying to insert new file. Result: {result}")
inserted_id = result.inserted_id
new_directory = os.path.join(FILE_DIRECTORY, str(inserted_id))
new_file_location = os.path.join(new_directory, filename)
os.makedirs(new_directory)
_sync_file_with_remote(full_remote_path, new_file_location)
updated_file_entry = file_collection.find_one_and_update(
{"_id": inserted_id},
{
"$set": {
"location": new_file_location,
"url_path": new_file_location,
}
},
return_document=ReturnDocument.AFTER,
)
sample_update_result = sample_collection.update_one(
{"item_id": item_id, **get_default_permissions(user_only=True)},
{"$push": {"file_ObjectIds": inserted_id}},
)
if sample_update_result.modified_count != 1:
raise IOError(
f"db operation failed when trying to insert new file ObjectId into sample: {item_id}"
)
return updated_file_entry
retrieve_file_path(file_ObjectId)
¶
Source code in pydatalab/file_utils.py
def retrieve_file_path(file_ObjectId):
file_collection = get_database().files
result = file_collection.find_one({"_id": ObjectId(file_ObjectId)})
if not result:
raise FileNotFoundError(
f"The file with file_ObjectId: {file_ObjectId} could not be found in the database"
)
result = File(**result)
return result.location
remove_file_from_sample(item_id: Union[str, bson.objectid.ObjectId], file_id: Union[str, bson.objectid.ObjectId]) -> None
¶
¶Detach the file at `file_id` from the item at `item_id`.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
item_id | Union[str, bson.objectid.ObjectId] | The database ID of the item to alter. | required |
file_id | Union[str, bson.objectid.ObjectId] | The database ID of the file to remove from the item. | required |
Source code in pydatalab/file_utils.py
def remove_file_from_sample(item_id: Union[str, ObjectId], file_id: Union[str, ObjectId]) -> None:
"""Detach the file at `file_id` from the item at `item_id`.
Args:
item_id: The database ID of the item to alter.
file_id: The database ID of the file to remove from the item.
"""
from pydatalab.routes.utils import get_default_permissions
item_id, file_id = ObjectId(item_id), ObjectId(file_id)
sample_collection = get_database().items
file_collection = get_database().files
sample_result = sample_collection.update_one(
{"item_id": item_id, **get_default_permissions(user_only=True)},
{"$pull": {"file_ObjectIds": file_id}},
)
if sample_result.modified_count < 1:
raise IOError(
f"Failed to remove {file_id!r} from item {item_id!r}. Result: {sample_result.raw_result}"
)
file_collection.update_one(
{"_id": file_id},
{"$pull": {"item_ids": item_id}},
)
logger
¶
LOGGER
¶
Classes¶
AnsiColorHandler (StreamHandler)
¶
Colourful and truncated log handler, exfiltrated from/inspired by various answers at https://stackoverflow.com/questions/7484454/removing-handlers-from-pythons-logging-loggers
Source code in pydatalab/logger.py
class AnsiColorHandler(logging.StreamHandler):
"""Colourful and truncated log handler, exfiltrated from/inspired
by various answers at
https://stackoverflow.com/questions/7484454/removing-handlers-from-pythons-logging-loggers
"""
LOGLEVEL_COLORS = {
logging.DEBUG: "36m",
logging.INFO: "32m",
logging.WARNING: "33m",
logging.ERROR: "1;91m",
logging.CRITICAL: "101;30m",
}
max_width = 2000
def __init__(self) -> None:
super().__init__()
self.formatter = logging.Formatter("%(asctime)s - %(name)s | %(levelname)-8s: %(message)s")
def format(self, record: logging.LogRecord) -> str:
from flask_login import current_user
prefix = "🔓"
if current_user and current_user.is_authenticated:
prefix = "🔒"
message: str = super().format(record)
if len(message) > self.max_width:
message = message[: self.max_width] + "[...]"
color = self.LOGLEVEL_COLORS[record.levelno]
message = f"\x1b[{color} {prefix} {message}\x1b[0m"
return message
LOGLEVEL_COLORS
¶max_width
¶__init__(self) -> None
special
¶Source code in pydatalab/logger.py
def __init__(self) -> None:
super().__init__()
self.formatter = logging.Formatter("%(asctime)s - %(name)s | %(levelname)-8s: %(message)s")
format(self, record: LogRecord) -> str
¶Format the specified record.
If a formatter is set, use it. Otherwise, use the default formatter for the module.
Source code in pydatalab/logger.py
def format(self, record: logging.LogRecord) -> str:
from flask_login import current_user
prefix = "🔓"
if current_user and current_user.is_authenticated:
prefix = "🔒"
message: str = super().format(record)
if len(message) > self.max_width:
message = message[: self.max_width] + "[...]"
color = self.LOGLEVEL_COLORS[record.levelno]
message = f"\x1b[{color} {prefix} {message}\x1b[0m"
return message
Functions¶
setup_log(log_name: str = 'pydatalab', log_level: Optional[int] = None) -> Logger
¶
Creates a logger with simple coloured stdout output. Verbosity can be set to debug in the config file via the DEBUG option, or passed to the function.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
log_name | str | The name of the logger. | 'pydatalab' |
log_level | Optional[int] | The logging level to use. | None |

Returns:

Type | Description |
---|---|
Logger | The logger object. |
Source code in pydatalab/logger.py
def setup_log(log_name: str = "pydatalab", log_level: Optional[int] = None) -> logging.Logger:
"""Creates a logger a simple coloured stdout output.
Verbosity can be set to debug in the config file via
the DEBUG option, or passed the the function.
Parameters:
log_name: The name of the logger.
log_level: The logging level to use.
Returns:
The logger object.
"""
from pydatalab.config import CONFIG
logger = logging.getLogger(log_name)
logger.handlers = []
logger.propagate = False
handler = AnsiColorHandler()
logger.addHandler(handler)
if log_level is None:
log_level = logging.INFO
if CONFIG.DEBUG:
log_level = logging.DEBUG
logger.setLevel(log_level)
return logger
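For example, a script embedding pydatalab could configure its own coloured logger (a sketch; requires pydatalab and its config to be importable):
import logging

from pydatalab.logger import setup_log

log = setup_log("my_script", log_level=logging.DEBUG)
log.debug("Coloured, truncated output on stdout")
log.warning("Messages longer than 2000 characters are cut off with [...]")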
logged_route(fn: Callable)
¶
A decorator that enables logging of inputs (arguments and request body) and outputs (server response) when debug mode is enabled.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
fn | Callable | The function to wrap. | required |
Source code in pydatalab/logger.py
def logged_route(fn: Callable):
"""A decorator that enables logging of inputs (arguments
and request body) and outputs (server response) when debug
mode is enabled.
Args:
fn: The function to wrap.
"""
@wraps(fn)
def wrapped_logged_route(*args, **kwargs):
from flask import request
start = time.monotonic_ns()
try:
LOGGER.debug(
"Calling %s with request: %s, JSON payload with keys %s",
fn.__name__,
request,
request.get_json().keys() if request.get_json() else "null",
)
except Exception:
pass
try:
result = fn(*args, **kwargs)
LOGGER.debug(
"%s returned in %s seconds with %s",
fn.__name__,
(time.monotonic_ns() - start) / 1e9,
result,
)
return result
except Exception as exc:
import traceback
LOGGER.error(
"%s errored in %s seconds with %s %s %s",
fn.__name__,
(time.monotonic_ns() - start) / 1e9,
exc.__class__.__name__,
exc,
traceback.print_tb(exc.__traceback__),
)
raise exc
return wrapped_logged_route
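A sketch of wrapping a function so its arguments, timing and result are logged when debug mode is enabled (get_sample_count is a hypothetical helper, not part of pydatalab):
from pydatalab.logger import logged_route

@logged_route
def get_sample_count():
    # A hypothetical view-style function; any callable can be wrapped in this way
    return {"count": 42}

# register_endpoints() in pydatalab.main applies this decorator to every endpoint it registers.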
login
¶
This module implements functionality around the Flask-login manager, for retrieving the authenticated user for a session and their identities.
Attributes¶
LOGIN_MANAGER: LoginManager
¶
The global login manager for the app.
Classes¶
LoginUser (UserMixin)
¶
¶A wrapper class around `Person` to allow flask-login to track the session of the current user and get their details from the database. (See https://flask-login.readthedocs.io/en/latest/#your-user-class)
Source code in pydatalab/login.py
class LoginUser(UserMixin):
"""A wrapper class around `Person` to allow flask-login to track
the session of the current user and get their details
from the database.
(See https://flask-login.readthedocs.io/en/latest/#your-user-class)
"""
id: str
person: Person
role: UserRole
def __init__(
self,
_id: str,
data: Person,
role: UserRole,
):
"""Construct the logged in user from a given ID and user data.
Parameters:
_id: The ID of the person in the database.
data: The relevant metadata for this user, e.g., their identities, contact
details, for use by the app.
"""
self.id = _id
self.person = data
self.role = role
@property
def display_name(self) -> Optional[str]:
"""Returns the top-level display name for the user, if set."""
return self.person.display_name
@property
def identities(self) -> List[Identity]:
"""Returns the list of identities of the user."""
return self.person.identities
@property
def identity_types(self) -> List[IdentityType]:
"""Returns a list of the identity types associated with the user."""
return [_.identity_type for _ in self.person.identities]
def refresh(self) -> None:
"""Reconstruct the user object from their database entry, to be used when,
e.g., a new identity has been associated with them.
"""
user = get_by_id(self.id)
if user:
self.person = user.person
self.role = user.role
display_name: Optional[str]
property
readonly
¶Returns the top-level display name for the user, if set.
identities: List[pydatalab.models.people.Identity]
property
readonly
¶Returns the list of identities of the user.
identity_types: List[pydatalab.models.people.IdentityType]
property
readonly
¶Returns a list of the identity types associated with the user.
__init__(self, _id: str, data: Person, role: UserRole)
special
¶Construct the logged in user from a given ID and user data.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
_id | str | The ID of the person in the database. | required |
data | Person | The relevant metadata for this user, e.g., their identities, contact details, for use by the app. | required |
Source code in pydatalab/login.py
def __init__(
self,
_id: str,
data: Person,
role: UserRole,
):
"""Construct the logged in user from a given ID and user data.
Parameters:
_id: The ID of the person in the database.
data: The relevant metadata for this user, e.g., their identities, contact
details, for use by the app.
"""
self.id = _id
self.person = data
self.role = role
refresh(self) -> None
¶Reconstruct the user object from their database entry, to be used when, e.g., a new identity has been associated with them.
Source code in pydatalab/login.py
def refresh(self) -> None:
"""Reconstruct the user object from their database entry, to be used when,
e.g., a new identity has been associated with them.
"""
user = get_by_id(self.id)
if user:
self.person = user.person
self.role = user.role
Functions¶
get_by_id_cached(user_id)
¶
Cached version of get_by_id.
Source code in pydatalab/login.py
@lru_cache(maxsize=128)
def get_by_id_cached(user_id):
"""Cached version of get_by_id."""
return get_by_id(user_id)
get_by_id(user_id: str) -> Optional[pydatalab.login.LoginUser]
¶
¶Lookup the user database ID and create a new `LoginUser` with the relevant metadata.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
user_id | str | The user's ID in the database, either as a string, an ObjectID, or a JSON {'$oid': <id>} dictionary. | required |

Exceptions:

Type | Description |
---|---|
ValueError | if the user could not be found. |
Source code in pydatalab/login.py
def get_by_id(user_id: str) -> Optional[LoginUser]:
"""Lookup the user database ID and create a new `LoginUser`
with the relevant metadata.
Parameters:
user_id: The user's ID in the database, either as a string,
an ObjectID, or a JSON `{'$oid': <id>}` dictionary.
Raises:
ValueError: if the user could not be found.
"""
user = flask_mongo.db.users.find_one({"_id": ObjectId(user_id)})
if not user:
return None
role = flask_mongo.db.roles.find_one({"_id": ObjectId(user_id)})
if not role:
role = "user"
else:
role = role["role"]
return LoginUser(_id=user_id, data=Person(**user), role=UserRole(role))
get_by_api_key(key: str)
¶
Checks if the hashed version of the key is in the keys collection; if so, returns the authenticated user.
Source code in pydatalab/login.py
def get_by_api_key(key: str):
"""Checks if the hashed version of the key is in the keys collection,
if so, return the authenticated user.
"""
hash = sha512(key.encode("utf-8")).hexdigest()
user = flask_mongo.db.api_keys.find_one({"hash": hash}, projection={"hash": 0})
if user:
return get_by_id_cached(str(user["_id"]))
load_user(user_id: str) -> Optional[pydatalab.login.LoginUser]
¶
Looks up the currently authenticated user and returns a `LoginUser` model.
Source code in pydatalab/login.py
@LOGIN_MANAGER.user_loader
def load_user(user_id: str) -> Optional[LoginUser]:
"""Looks up the currently authenticated user and returns a `LoginUser` model."""
return get_by_id_cached(str(user_id))
request_loader(request) -> Optional[pydatalab.login.LoginUser]
¶
Source code in pydatalab/login.py
@LOGIN_MANAGER.request_loader
def request_loader(request) -> Optional[LoginUser]:
api_key = request.headers.get("DATALAB-API-KEY", None)
if api_key:
return get_by_api_key(str(api_key))
return None
main
¶
compress
¶
Functions¶
create_app(config_override: Dict[str, Any] = None) -> Flask
¶
Create the main `Flask` app with the given config.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
config_override | Dict[str, Any] | Config value overrides to use within the Flask app. | None |

Returns:

Type | Description |
---|---|
Flask | The Flask app with all associated endpoints. |
Source code in pydatalab/main.py
def create_app(config_override: Dict[str, Any] = None) -> Flask:
"""Create the main `Flask` app with the given config.
Parameters:
config_override: Config value overrides to use
within the `Flask` app.
Returns:
The `Flask` app with all associated endpoints.
"""
setup_log("werkzeug", log_level=logging.INFO)
setup_log("", log_level=logging.INFO)
app = Flask(__name__, instance_relative_config=True)
if config_override:
CONFIG.update(config_override)
app.config.update(CONFIG.dict())
app.config.update(dotenv_values())
LOGGER.info("Starting app with Flask app.config: %s", app.config)
LOGGER.info("Datalab config: %s", CONFIG.dict())
if CONFIG.BEHIND_REVERSE_PROXY:
# Fix headers for reverse proxied app:
# https://flask.palletsprojects.com/en/2.2.x/deploying/proxy_fix/
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1) # type: ignore
CORS(
app,
resources={r"/*": {"origins": "*"}},
supports_credentials=True,
)
app.json_encoder = CustomJSONEncoder
# Must use the full path so that this object can be mocked for testing
flask_mongo = pydatalab.mongo.flask_mongo
flask_mongo.init_app(app, connectTimeoutMS=100, serverSelectionTimeoutMS=100)
register_endpoints(app)
LOGIN_MANAGER.init_app(app)
pydatalab.mongo.create_default_indices()
compress.init_app(app)
@app.route("/logout")
def logout():
"""Logs out the local user from the current session."""
logout_user()
return redirect(request.environ.get("HTTP_REFERER", "/"))
@app.before_first_request # runs before FIRST request (only once)
def make_session_permanent():
"""Make the session permanent so that it doesn't expire on browser close, but instead adds a lifetime."""
session.permanent = True
app.permanent_session_lifetime = datetime.timedelta(days=1)
@app.route("/")
def index():
"""Landing page endpoint that renders a rudimentary welcome page based on the currently
authenticated user.
Warning:
Does not use a Jinja template, so care must be taken in validating
the embedded inputs.
"""
from pydatalab.routes import ( # pylint: disable=import-outside-toplevel
ENDPOINTS,
auth,
)
OAUTH_PROXIES = auth.OAUTH_PROXIES
connected = True
try:
pydatalab.mongo.check_mongo_connection()
except RuntimeError:
connected = False
if connected:
database_string = (
'<p style="color: DarkSeaGreen">✅ Connected to underlying database</p>'
)
else:
database_string = (
'<p style="color: FireBrick">❎ Unable to connect to underlying database</p>'
)
if connected:
if current_user.is_authenticated:
welcome_string = f"""
<h2>Hello, {current_user.display_name}!</h2>
<h3>Connected identities:</h3>
<ul>
"""
for identity in current_user.identities:
if identity.identity_type == "github":
welcome_string += f"""
<li>
<a href="https://github.com/{identity.name}">
<i class="fa fa-github"></i>
{identity.name}
</a>
</li>
"""
elif identity.identity_type == "orcid":
welcome_string += f"""
<li>
<a href="https://orcid.org/{identity.name}">
<img alt="ORCID logo" style="vertical-align: middle;", src="https://info.orcid.org/wp-content/uploads/2019/11/orcid_16x16.png" width="16" height="16" />
{identity.name}
</a>
</li>
"""
welcome_string += "</ul>"
else:
welcome_string = (
"""<h2>Welcome!</h2><h4>Please connect an OAuth account to continue:</h4>"""
)
connect_buttons = {
"github": f"""
<a href={url_for('github.login')}>
<i class="fa fa-github"></i>
Connect GitHub
</a></br>
""",
"orcid": f"""
<a href={url_for("orcid.login")}>
<img alt="ORCID logo" style="vertical-align: middle;", src="https://info.orcid.org/wp-content/uploads/2019/11/orcid_16x16.png" width="16" height="16" />
Connect ORCID
</a></br>
""",
}
auth_string = "<ul>"
logout_string = ""
if current_user.is_authenticated:
for k in OAUTH_PROXIES:
if k not in current_user.identity_types:
auth_string += f"<li>{connect_buttons[k]}</li>"
logout_string += f'<a href={url_for("logout")}>Log out</a>'
else:
for k in OAUTH_PROXIES:
auth_string += f'<li>{connect_buttons[k].replace("Connect", "Login via")}</li>'
auth_string += "</ul>"
endpoints_string = "\n".join(
[
f'<li><a href="{endp[0]}"><pre>{endp[0]}</pre></a></li>'
for endp in ENDPOINTS.items()
]
)
endpoints_string = f"""<h3>Available endpoints:</h3><ul>{endpoints_string}</ul>"""
else:
auth_string = ""
logout_string = ""
welcome_string = ""
endpoints_string = ""
return f"""<head>
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css">
</head>
<h2><p style="color: CornflowerBlue">Welcome to pydatalab</p></h2>
<p>{welcome_string}</p>
<p>{auth_string}</p>
<p>{logout_string}</p>
<h3>API status:</h3>
<h4>{database_string}</h4>
{endpoints_string}
"""
return app
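A minimal sketch of serving the app locally (assumes a reachable MongoDB and any required environment variables; the port is illustrative):
from pydatalab.main import create_app

app = create_app()                    # optionally pass a dict of config overrides
app.run(host="127.0.0.1", port=5001)  # use a production WSGI server (e.g. gunicorn) for real deployments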
register_endpoints(app: Flask)
¶
Loops through the implemented endpoints, blueprints and error handlers adds them to the app.
Source code in pydatalab/main.py
def register_endpoints(app: Flask):
"""Loops through the implemented endpoints, blueprints and error handlers adds them to the app."""
from pydatalab.errors import ERROR_HANDLERS
from pydatalab.routes import BLUEPRINTS, ENDPOINTS, __api_version__, auth
OAUTH_BLUEPRINTS = auth.OAUTH_BLUEPRINTS
major, minor, patch = __api_version__.split(".")
versions = ["", f"/v{major}", f"/v{major}.{minor}", f"/v{major}.{minor}.{patch}"]
for rule, func in ENDPOINTS.items():
for ver in versions:
app.add_url_rule(
f"{ver}{rule}",
f"{ver}{rule}",
logged_route(func),
)
for bp in BLUEPRINTS:
for ver in versions:
app.register_blueprint(bp, url_prefix=f"{ver}", name=f"{ver}/{bp.name}")
for bp in OAUTH_BLUEPRINTS: # type: ignore
app.register_blueprint(OAUTH_BLUEPRINTS[bp], url_prefix="/login") # type: ignore
for exception_type, handler in ERROR_HANDLERS:
app.register_error_handler(exception_type, handler)
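The versioning logic above registers every endpoint under an unversioned prefix plus major, major.minor and full-version prefixes. The snippet below is a standalone illustration of that mapping using a hypothetical version string and rule; it does not touch the real ENDPOINTS registry.

```python
# Standalone illustration of the URL-prefix scheme used by register_endpoints.
__api_version__ = "0.2.1"  # hypothetical version string
major, minor, patch = __api_version__.split(".")
versions = ["", f"/v{major}", f"/v{major}.{minor}", f"/v{major}.{minor}.{patch}"]

rule = "/samples/"  # hypothetical endpoint rule
print([f"{ver}{rule}" for ver in versions])
# ['/samples/', '/v0/samples/', '/v0.2/samples/', '/v0.2.1/samples/']
```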
models
special
¶
ITEM_MODELS: Dict[str, Type[pydantic.main.BaseModel]]
¶
Modules¶
cells
¶
CellComponent (Constituent)
pydantic-model
¶Source code in pydatalab/models/cells.py
class CellComponent(Constituent):
...
CellFormat (str, Enum)
¶An enumeration.
Source code in pydatalab/models/cells.py
class CellFormat(str, Enum):
coin = "coin"
pouch = "pouch"
in_situ_xrd = "in situ (XRD)"
in_situ_nmr = "in situ (NMR)"
in_situ_squid = "in situ (SQUID)"
swagelok = "swagelok"
cylindrical = "cylindrical"
other = "other"
Cell (Item)
pydantic-model
¶A model for representing electrochemical cells.
Source code in pydatalab/models/cells.py
class Cell(Item):
"""A model for representing electrochemical cells."""
type: str = Field("cells", const="cells", pattern="^cells$")
cell_format: Optional[CellFormat] = Field(
description="The form factor of the cell, e.g., coin, pouch, in situ or otherwise.",
)
cell_format_description: Optional[str] = Field(
description="Additional human-readable description of the cell form factor, e.g., 18650, AMPIX, CAMPIX"
)
cell_preparation_description: Optional[str] = Field()
characteristic_mass: Optional[float] = Field(
description="The characteristic mass of the cell in milligrams. Can be used to normalize capacities."
)
characteristic_chemical_formula: Optional[str] = Field(
description="The chemical formula of the active material. Can be used to calculated molar mass in g/mol for normalizing capacities."
)
characteristic_molar_mass: Optional[float] = Field(
description="The molar mass of the active material, in g/mol. Will be inferred from the chemical formula, or can be supplied if it cannot be supplied"
)
positive_electrode: List[CellComponent] = Field([])
negative_electrode: List[CellComponent] = Field([])
electrolyte: List[CellComponent] = Field([])
active_ion_charge: float = Field(1)
@validator("characteristic_molar_mass", always=True, pre=True)
def set_molar_mass(cls, v, values):
from periodictable import formula
if not v:
chemical_formula = values.get("characteristic_chemical_formula")
if chemical_formula:
try:
return formula(chemical_formula).mass
except Exception:
return None
return v
@root_validator
def add_missing_electrode_relationships(cls, values):
"""Add any missing sample synthesis constituents to parent relationships"""
from pydatalab.models.relationships import RelationshipType, TypedRelationship
existing_parthood_relationship_ids = set()
if values.get("relationships") is not None:
existing_parthood_relationship_ids = set(
relationship.item_id
for relationship in values["relationships"]
if relationship.relation == RelationshipType.PARTHOOD
)
else:
values["relationships"] = []
for component in ("positive_electrode", "negative_electrode", "electrolyte"):
for constituent in values.get(component, []):
if (
isinstance(constituent.item, EntryReference)
and constituent.item.item_id not in existing_parthood_relationship_ids
):
relationship = TypedRelationship(
relation=RelationshipType.PARTHOOD,
item_id=constituent.item.item_id,
type=constituent.item.type,
description="Is a constituent of",
)
values["relationships"].append(relationship)
return values
__slots__
special
¶cell_format: CellFormat
pydantic-field
¶The form factor of the cell, e.g., coin, pouch, in situ or otherwise.
cell_format_description: str
pydantic-field
¶Additional human-readable description of the cell form factor, e.g., 18650, AMPIX, CAMPIX
cell_preparation_description: str
pydantic-field
¶characteristic_mass: float
pydantic-field
¶The characteristic mass of the cell in milligrams. Can be used to normalize capacities.
characteristic_chemical_formula: str
pydantic-field
¶The chemical formula of the active material. Can be used to calculate the molar mass in g/mol for normalizing capacities.
characteristic_molar_mass: float
pydantic-field
¶The molar mass of the active material, in g/mol. Will be inferred from the chemical formula, or can be supplied directly if it cannot be inferred
positive_electrode: List[pydatalab.models.cells.CellComponent]
pydantic-field
¶negative_electrode: List[pydatalab.models.cells.CellComponent]
pydantic-field
¶electrolyte: List[pydatalab.models.cells.CellComponent]
pydantic-field
¶active_ion_charge: float
pydantic-field
¶set_molar_mass(v, values)
classmethod
¶Source code in pydatalab/models/cells.py
@validator("characteristic_molar_mass", always=True, pre=True)
def set_molar_mass(cls, v, values):
from periodictable import formula
if not v:
chemical_formula = values.get("characteristic_chemical_formula")
if chemical_formula:
try:
return formula(chemical_formula).mass
except Exception:
return None
return v
add_missing_electrode_relationships(values)
classmethod
¶Add any missing electrode constituents to parthood relationships
Source code in pydatalab/models/cells.py
@root_validator
def add_missing_electrode_relationships(cls, values):
"""Add any missing sample synthesis constituents to parent relationships"""
from pydatalab.models.relationships import RelationshipType, TypedRelationship
existing_parthood_relationship_ids = set()
if values.get("relationships") is not None:
existing_parthood_relationship_ids = set(
relationship.item_id
for relationship in values["relationships"]
if relationship.relation == RelationshipType.PARTHOOD
)
else:
values["relationships"] = []
for component in ("positive_electrode", "negative_electrode", "electrolyte"):
for constituent in values.get(component, []):
if (
isinstance(constituent.item, EntryReference)
and constituent.item.item_id not in existing_parthood_relationship_ids
):
relationship = TypedRelationship(
relation=RelationshipType.PARTHOOD,
item_id=constituent.item.item_id,
type=constituent.item.type,
description="Is a constituent of",
)
values["relationships"].append(relationship)
return values
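As a usage sketch (assuming the import path pydatalab.models.cells and that the optional periodictable dependency is installed), constructing a Cell with only a characteristic chemical formula lets the `set_molar_mass` validator fill in the molar mass:

```python
from pydatalab.models.cells import Cell

cell = Cell(
    item_id="test_cell_1",  # required by the parent Item model
    cell_format="coin",
    characteristic_chemical_formula="LiCoO2",
)
# The validator computes the molar mass from the formula via periodictable.
print(cell.characteristic_molar_mass)  # ~97.87 g/mol
```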
collections
¶
Collection (Entry, HasOwner, HasBlocks)
pydantic-model
¶Source code in pydatalab/models/collections.py
class Collection(Entry, HasOwner, HasBlocks):
type: str = Field("collections", const="collections", pattern="^collections$")
collection_id: HumanReadableIdentifier = Field(None)
"""A short human-readable/usable name for the collection."""
title: Optional[str]
"""A descriptive title for the collection."""
description: Optional[str]
"""A description of the collection, either in plain-text or a markup language."""
num_items: Optional[int] = Field(None)
"""Inlined number of items associated with this collection."""
@root_validator
def check_ids(cls, values):
if not any(values.get(k) is not None for k in ("collection_id", "immutable_id")):
raise ValueError("Collection must have at least collection_id or immutable_id")
return values
__slots__
special
¶collection_id: HumanReadableIdentifier
pydantic-field
¶title: str
pydantic-field
¶description: str
pydantic-field
¶num_items: int
pydantic-field
¶check_ids(values)
classmethod
¶Source code in pydatalab/models/collections.py
@root_validator
def check_ids(cls, values):
if not any(values.get(k) is not None for k in ("collection_id", "immutable_id")):
raise ValueError("Collection must have at least collection_id or immutable_id")
return values
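A short sketch of the `check_ids` guard (assuming the import path shown above): a Collection needs either a `collection_id` or an `immutable_id` to validate.

```python
from pydantic import ValidationError

from pydatalab.models.collections import Collection

Collection(collection_id="battery-project", title="Battery project")  # validates

try:
    Collection(title="No identifier at all")
except ValidationError as exc:
    print(exc)  # "Collection must have at least collection_id or immutable_id"
```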
entries
¶
Entry (BaseModel, ABC)
pydantic-model
¶An Entry is an abstract base class for any model that can be deserialized and stored in the database.
Source code in pydatalab/models/entries.py
class Entry(BaseModel, abc.ABC):
"""An Entry is an abstract base class for any model that can be
deserialized and stored in the database.
"""
type: str
"""The resource type of the entry."""
immutable_id: PyObjectId = Field(
None,
title="Immutable ID",
alias="_id",
)
"""The immutable database ID of the entry."""
last_modified: Optional[IsoformatDateTime] = None
"""The timestamp at which the entry was last modified."""
relationships: Optional[List[TypedRelationship]] = None
"""A list of related entries and their types."""
@root_validator(pre=True)
def check_id_names(cls, values):
"""Slightly upsetting hack: this case *should* be covered by the pydantic setting for
populating fields by alias names.
"""
if "_id" in values:
values["immutable_id"] = values.pop("_id")
return values
def to_reference(self, additional_fields: Optional[List[str]] = None) -> "EntryReference":
"""Populate an EntryReference model from this entry, selecting additional fields to inline.
Parameters:
additional_fields: A list of fields to inline in the reference.
"""
if additional_fields is None:
additional_fields = []
data = {
"type": self.type,
"item_id": getattr(self, "item_id", None),
"immutable_id": getattr(self, "immutable_id", None),
}
data.update({field: getattr(self, field, None) for field in additional_fields})
return EntryReference(**data)
class Config:
allow_population_by_field_name = True
json_encoders = JSON_ENCODERS
extra = "ignore"
__slots__
special
¶type: str
pydantic-field
required
¶immutable_id: PyObjectId
pydantic-field
¶last_modified: IsoformatDateTime
pydantic-field
¶relationships: List[pydatalab.models.relationships.TypedRelationship]
pydantic-field
¶check_id_names(values)
classmethod
¶Slightly upsetting hack: this case should be covered by the pydantic setting for populating fields by alias names.
Source code in pydatalab/models/entries.py
@root_validator(pre=True)
def check_id_names(cls, values):
"""Slightly upsetting hack: this case *should* be covered by the pydantic setting for
populating fields by alias names.
"""
if "_id" in values:
values["immutable_id"] = values.pop("_id")
return values
to_reference(self, additional_fields: Optional[List[str]] = None) -> EntryReference
¶Populate an EntryReference model from this entry, selecting additional fields to inline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
additional_fields | Optional[List[str]] | A list of fields to inline in the reference. | None |
Source code in pydatalab/models/entries.py
def to_reference(self, additional_fields: Optional[List[str]] = None) -> "EntryReference":
"""Populate an EntryReference model from this entry, selecting additional fields to inline.
Parameters:
additional_fields: A list of fields to inline in the reference.
"""
if additional_fields is None:
additional_fields = []
data = {
"type": self.type,
"item_id": getattr(self, "item_id", None),
"immutable_id": getattr(self, "immutable_id", None),
}
data.update({field: getattr(self, field, None) for field in additional_fields})
return EntryReference(**data)
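A hedged example of `to_reference` on a concrete Entry subclass (the Sample model documented below), inlining one extra field into the resulting EntryReference:

```python
from pydatalab.models.samples import Sample

sample = Sample(item_id="jdb1-1", name="Test sample", chemform="NaCoO2")
ref = sample.to_reference(additional_fields=["name"])
print(ref.type, ref.item_id, ref.name)  # samples jdb1-1 Test sample
```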
files
¶
File (Entry)
pydantic-model
¶A model for representing a file that has been tracked or uploaded to datalab.
Source code in pydatalab/models/files.py
class File(Entry):
"""A model for representing a file that has been tracked or uploaded to datalab."""
type: str = Field("files", const="files", pattern="^files$")
size: Optional[int] = Field(description="The size of the file on disk in bytes.")
last_modified_remote: Optional[IsoformatDateTime] = Field(
description="The last date/time at which the remote file was modified."
)
item_ids: List[str] = Field(description="A list of item IDs associated with this file.")
blocks: List[str] = Field(description="A list of block IDs associated with this file.")
name: str = Field(description="The filename on disk.")
extension: str = Field(description="The file extension that the file was uploaded with.")
original_name: Optional[str] = Field(description="The raw filename as uploaded.")
location: Optional[str] = Field(description="The location of the file on disk.")
url_path: Optional[str] = Field(description="The path to a remote file.")
source: Optional[str] = Field(
description="The source of the file, e.g. 'remote' or 'uploaded'."
)
time_added: datetime.datetime = Field(description="The timestamp for the original file upload.")
metadata: Optional[Dict[Any, Any]] = Field(description="Any additional metadata.")
representation: Optional[Any] = Field()
source_server_name: Optional[str] = Field(
description="The server name at which the file is stored."
)
source_path: Optional[str] = Field(description="The path to the file on the remote resource.")
is_live: bool = Field(
description="Whether or not the file should be watched for future updates."
)
__slots__
special
¶size: int
pydantic-field
¶The size of the file on disk in bytes.
last_modified_remote: IsoformatDateTime
pydantic-field
¶The last date/time at which the remote file was modified.
item_ids: List[str]
pydantic-field
required
¶A list of item IDs associated with this file.
blocks: List[str]
pydantic-field
required
¶A list of block IDs associated with this file.
name: str
pydantic-field
required
¶The filename on disk.
extension: str
pydantic-field
required
¶The file extension that the file was uploaded with.
original_name: str
pydantic-field
¶The raw filename as uploaded.
location: str
pydantic-field
¶The location of the file on disk.
url_path: str
pydantic-field
¶The path to a remote file.
source: str
pydantic-field
¶The source of the file, e.g. 'remote' or 'uploaded'.
time_added: datetime
pydantic-field
required
¶The timestamp for the original file upload.
metadata: Dict[Any, Any]
pydantic-field
¶Any additional metadata.
representation: Any
pydantic-field
¶source_server_name: str
pydantic-field
¶The server name at which the file is stored.
source_path: str
pydantic-field
¶The path to the file on the remote resource.
is_live: bool
pydantic-field
required
¶Whether or not the file should be watched for future updates.
items
¶
Item (Entry, HasOwner, HasRevisionControl, IsCollectable, HasBlocks, ABC)
pydantic-model
¶The generic model for data types that will be exposed with their own named endpoints.
Source code in pydatalab/models/items.py
class Item(Entry, HasOwner, HasRevisionControl, IsCollectable, HasBlocks, abc.ABC):
"""The generic model for data types that will be exposed with their own named endpoints."""
refcode: Refcode = None # type: ignore
"""A globally unique immutable ID comprised of the deployment prefix (e.g., `grey`)
and a locally unique string, ideally created with some consistent scheme.
"""
item_id: HumanReadableIdentifier
"""A locally unique, human-readable identifier for the entry. This ID is mutable."""
description: Optional[str]
"""A description of the item, either in plain-text or a markup language."""
date: Optional[IsoformatDateTime]
"""A relevant 'creation' timestamp for the entry (e.g., purchase date, synthesis date)."""
name: Optional[str]
"""An optional human-readable/usable name for the entry."""
files: Optional[List[File]]
"""Any files attached to this sample."""
file_ObjectIds: List[PyObjectId] = Field([])
"""Links to object IDs of files stored within the database."""
@validator("refcode", pre=True, always=True)
def refcode_validator(cls, v):
"""Generate a refcode if not provided; check that the refcode has the correct prefix if provided."""
from pydatalab.config import CONFIG
if v and not v.startswith(f"{CONFIG.IDENTIFIER_PREFIX}:"):
raise ValueError(f"refcode missing prefix {CONFIG.IDENTIFIER_PREFIX!r}")
return v
__slots__
special
¶refcode: Refcode
pydantic-field
¶item_id: HumanReadableIdentifier
pydantic-field
required
¶description: str
pydantic-field
¶date: IsoformatDateTime
pydantic-field
¶name: str
pydantic-field
¶files: List[pydatalab.models.files.File]
pydantic-field
¶file_ObjectIds: List[pydatalab.models.utils.PyObjectId]
pydantic-field
¶refcode_validator(v)
classmethod
¶Generate a refcode if not provided; check that the refcode has the correct prefix if provided.
Source code in pydatalab/models/items.py
@validator("refcode", pre=True, always=True)
def refcode_validator(cls, v):
"""Generate a refcode if not provided; check that the refcode has the correct prefix if provided."""
from pydatalab.config import CONFIG
if v and not v.startswith(f"{CONFIG.IDENTIFIER_PREFIX}:"):
raise ValueError(f"refcode missing prefix {CONFIG.IDENTIFIER_PREFIX!r}")
return v
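A sketch of the refcode prefix check on a concrete subclass (Sample, documented below). The "demo" prefix here is hypothetical and assumed not to match your deployment's `CONFIG.IDENTIFIER_PREFIX`, so validation fails:

```python
from pydantic import ValidationError

from pydatalab.models.samples import Sample

try:
    Sample(item_id="abc-1", refcode="demo:ABCDEF")
except ValidationError as exc:
    print(exc)  # refcode missing prefix '<your configured IDENTIFIER_PREFIX>'
```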
people
¶
IdentityType (str, Enum)
¶
Identity (BaseModel)
pydantic-model
¶A model for identities that can be provided by external systems and associated with a given user.
Source code in pydatalab/models/people.py
class Identity(BaseModel):
"""A model for identities that can be provided by external systems
and associated with a given user.
"""
identity_type: IdentityType
"""The type or provider of the identity."""
identifier: str
"""The identifier for the identity, e.g., an email address, an ORCID, a GitHub user ID."""
name: str
"""The name associated with the identity to be exposed in free-text searches over people, e.g., an institutional username, a GitHub username."""
verified: bool = Field(False)
"""Whether the identity has been verified (by some means, e.g., OAuth2 or email)"""
display_name: Optional[str]
"""The user's display name associated with the identity, also to be exposed in free text searches."""
@validator("name", pre=True, always=True)
def add_missing_name(cls, v, values):
"""If the identity is created without a free-text 'name', then
for certain providers, populate this field so that it can appear
in the free text index, e.g., an ORCID, or an institutional username
from an email address.
"""
if v is None:
if values["identity_type"] == IdentityType.ORCID:
return values["identifier"]
if values["identity_type"] == IdentityType.EMAIL:
return values["identifier"].split("@")[0]
return v
@validator("verified", pre=True, always=True)
def add_missing_verification(cls, v):
"""Fills in missing value for `verified` if not given."""
if not v:
v = False
return v
identity_type: IdentityType
pydantic-field
required
¶identifier: str
pydantic-field
required
¶name: str
pydantic-field
required
¶verified: bool
pydantic-field
¶display_name: str
pydantic-field
¶add_missing_name(v, values)
classmethod
¶If the identity is created without a free-text 'name', then for certain providers, populate this field so that it can appear in the free text index, e.g., an ORCID, or an institutional username from an email address.
Source code in pydatalab/models/people.py
@validator("name", pre=True, always=True)
def add_missing_name(cls, v, values):
"""If the identity is created without a free-text 'name', then
for certain providers, populate this field so that it can appear
in the free text index, e.g., an ORCID, or an institutional username
from an email address.
"""
if v is None:
if values["identity_type"] == IdentityType.ORCID:
return values["identifier"]
if values["identity_type"] == IdentityType.EMAIL:
return values["identifier"].split("@")[0]
return v
add_missing_verification(v)
classmethod
¶Fills in missing value for `verified` if not given.
Source code in pydatalab/models/people.py
@validator("verified", pre=True, always=True)
def add_missing_verification(cls, v):
"""Fills in missing value for `verified` if not given."""
if not v:
v = False
return v
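A usage sketch of the Identity validators (assuming the import path shown above): with no free-text name supplied, an e-mail identity gets its `name` populated from the identifier, and `verified` defaults to False.

```python
from pydatalab.models.people import Identity, IdentityType

identity = Identity(
    identity_type=IdentityType.EMAIL,
    identifier="a.researcher@example.org",
    name=None,  # filled in by add_missing_name
)
print(identity.name)      # "a.researcher"
print(identity.verified)  # False, filled in by add_missing_verification
```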
Person (Entry)
pydantic-model
¶A model that describes an individual and their digital identities.
Source code in pydatalab/models/people.py
class Person(Entry):
"""A model that describes an individual and their digital identities."""
type: str = Field("people", const=True)
"""The entry type as a string."""
identities: List[Identity] = Field(default_factory=list)
"""A list of identities attached to this person, e.g., email addresses, OAuth accounts."""
display_name: Optional[str]
"""The user-chosen display name."""
contact_email: Optional[EmailStr]
"""In the case of multiple *verified* email identities, this email will be used as the primary contact."""
@validator("type", pre=True, always=True)
def add_missing_type(cls, v):
"""Fill in missing `type` field if not provided."""
if v is None:
v = "people"
return v
@validator("type", pre=True)
def set_default_type(cls, _):
return "people"
@staticmethod
def new_user_from_identity(
identity: Identity, use_display_name: bool = True, use_contact_email: bool = True
) -> "Person":
"""Create a new `Person` object with the given identity.
Arguments:
identity: The identity to populate the `identities` field with.
use_display_name: Whether to set the top-level `display_name`
field with any display name present in the identity.
use_contact_email: If the identity provided is an email address,
this argument decides whether to populate the top-level
`contact_email` field with the address of this identity.
Returns:
A `Person` object with only the provided identity.
"""
user_id = bson.ObjectId()
display_name = None
if use_display_name:
display_name = identity.display_name
contact_email = None
if use_contact_email and identity.identity_type is IdentityType.EMAIL:
contact_email = identity.identifier
return Person(
immutable_id=user_id,
identities=[identity],
display_name=display_name,
contact_email=contact_email,
)
__slots__
special
¶identities: List[pydatalab.models.people.Identity]
pydantic-field
¶display_name: str
pydantic-field
¶contact_email: EmailStr
pydantic-field
¶add_missing_type(v)
classmethod
¶Fill in missing `type` field if not provided.
Source code in pydatalab/models/people.py
@validator("type", pre=True, always=True)
def add_missing_type(cls, v):
"""Fill in missing `type` field if not provided."""
if v is None:
v = "people"
return v
set_default_type(_)
classmethod
¶Source code in pydatalab/models/people.py
@validator("type", pre=True)
def set_default_type(cls, _):
return "people"
new_user_from_identity(identity: Identity, use_display_name: bool = True, use_contact_email: bool = True) -> Person
staticmethod
¶Create a new `Person` object with the given identity.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
identity | Identity | The identity to populate the `identities` field with. | required |
use_display_name | bool | Whether to set the top-level `display_name` field with any display name present in the identity. | True |
use_contact_email | bool | If the identity provided is an email address, this argument decides whether to populate the top-level `contact_email` field with the address of this identity. | True |
Returns:
Type | Description |
---|---|
Person | A `Person` object with only the provided identity. |
Source code in pydatalab/models/people.py
@staticmethod
def new_user_from_identity(
identity: Identity, use_display_name: bool = True, use_contact_email: bool = True
) -> "Person":
"""Create a new `Person` object with the given identity.
Arguments:
identity: The identity to populate the `identities` field with.
use_display_name: Whether to set the top-level `display_name`
field with any display name present in the identity.
use_contact_email: If the identity provided is an email address,
this argument decides whether to populate the top-level
`contact_email` field with the address of this identity.
Returns:
A `Person` object with only the provided identity.
"""
user_id = bson.ObjectId()
display_name = None
if use_display_name:
display_name = identity.display_name
contact_email = None
if use_contact_email and identity.identity_type is IdentityType.EMAIL:
contact_email = identity.identifier
return Person(
immutable_id=user_id,
identities=[identity],
display_name=display_name,
contact_email=contact_email,
)
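Building on the Identity example above, a hedged sketch of `new_user_from_identity` (note that EmailStr validation requires the optional email-validator dependency):

```python
from pydatalab.models.people import Identity, IdentityType, Person

identity = Identity(
    identity_type=IdentityType.EMAIL,
    identifier="a.researcher@example.org",
    name=None,
    display_name="A. Researcher",
)
person = Person.new_user_from_identity(identity)
print(person.display_name)   # "A. Researcher"
print(person.contact_email)  # "a.researcher@example.org"
```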
relationships
¶
RelationshipType (str, Enum)
¶An enumeration of the possible types of relationship between two entries.
classDiagram
class entryC
entryC --|> entryA: parent
entryC ..|> entryD
entryA <..> entryD: sibling
entryA --|> entryB : child
Source code in pydatalab/models/relationships.py
class RelationshipType(str, Enum):
"""An enumeration of the possible types of relationship between two entries.
```mermaid
classDiagram
class entryC
entryC --|> entryA: parent
entryC ..|> entryD
entryA <..> entryD: sibling
entryA --|> entryB : child
```
"""
PARENT = "parent"
CHILD = "child"
SIBLING = "sibling"
PARTHOOD = "is_part_of"
OTHER = "other"
TypedRelationship (BaseModel)
pydantic-model
¶Source code in pydatalab/models/relationships.py
class TypedRelationship(BaseModel):
description: Optional[str] = Field(
None,
description="A description of the relationship.",
)
relation: Optional[RelationshipType] = Field(
None,
description="The type of relationship between the two items. If the type is 'other', then a human-readable description should be provided.",
)
type: KnownType = Field(description="The type of the related resource.")
immutable_id: Optional[PyObjectId] = Field(
description="The immutable ID of the entry that is related to this entry."
)
item_id: Optional[HumanReadableIdentifier] = Field(
description="The ID of the entry that is related to this entry."
)
refcode: Optional[Refcode] = Field(
description="The refcode of the entry that is related to this entry."
)
@validator("relation")
def check_for_description(cls, v, values):
if v == RelationshipType.OTHER and values.get("description") is None:
raise ValueError(
f"A description must be provided if the relationship type is {RelationshipType.OTHER.value!r}."
)
return v
@root_validator
def check_id_fields(cls, values):
"""Check that only one of the possible identifier fields is provided."""
id_fields = ("immutable_id", "item_id", "refcode")
if all(values[f] is None for f in id_fields):
raise ValueError(f"Must provide at least one of {id_fields!r}")
if sum(1 for f in id_fields if values[f] is not None) > 1:
raise ValueError("Must provide only one of {id_fields!r}")
return values
description: str
pydantic-field
¶A description of the relationship.
relation: RelationshipType
pydantic-field
¶The type of relationship between the two items. If the type is 'other', then a human-readable description should be provided.
type: KnownType
pydantic-field
required
¶The type of the related resource.
immutable_id: PyObjectId
pydantic-field
¶The immutable ID of the entry that is related to this entry.
item_id: HumanReadableIdentifier
pydantic-field
¶The ID of the entry that is related to this entry.
refcode: Refcode
pydantic-field
¶The refcode of the entry that is related to this entry.
check_for_description(v, values)
classmethod
¶Source code in pydatalab/models/relationships.py
@validator("relation")
def check_for_description(cls, v, values):
if v == RelationshipType.OTHER and values.get("description") is None:
raise ValueError(
f"A description must be provided if the relationship type is {RelationshipType.OTHER.value!r}."
)
return v
check_id_fields(values)
classmethod
¶Check that only one of the possible identifier fields is provided.
Source code in pydatalab/models/relationships.py
@root_validator
def check_id_fields(cls, values):
"""Check that only one of the possible identifier fields is provided."""
id_fields = ("immutable_id", "item_id", "refcode")
if all(values[f] is None for f in id_fields):
raise ValueError(f"Must provide at least one of {id_fields!r}")
if sum(1 for f in id_fields if values[f] is not None) > 1:
raise ValueError("Must provide only one of {id_fields!r}")
return values
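A sketch of the TypedRelationship identifier constraint (imports assumed as documented above): at least one, and no more than one, of `immutable_id`, `item_id` or `refcode` must be supplied.

```python
from pydantic import ValidationError

from pydatalab.models.relationships import RelationshipType, TypedRelationship

# One identifier: validates.
TypedRelationship(relation=RelationshipType.PARENT, type="samples", item_id="jdb1-1")

# No identifier at all: rejected by check_id_fields.
try:
    TypedRelationship(relation=RelationshipType.PARENT, type="samples")
except ValidationError as exc:
    print(exc)  # Must provide at least one of ('immutable_id', 'item_id', 'refcode')
```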
samples
¶
Sample (Item)
pydantic-model
¶A model for representing an experimental sample.
Source code in pydatalab/models/samples.py
class Sample(Item):
"""A model for representing an experimental sample."""
type: str = Field("samples", const="samples", pattern="^samples$")
chemform: Optional[str] = Field(example=["Na3P", "LiNiO2@C"])
"""A string representation of the chemical formula or composition associated with this sample."""
synthesis_constituents: List[Constituent] = Field([])
"""A list of references to constituent materials giving the amount and relevant inlined details of consituent items."""
synthesis_description: Optional[str]
"""Free-text details of the procedure applied to synthesise the sample"""
@root_validator
def add_missing_synthesis_relationships(cls, values):
"""Add any missing sample synthesis constituents to parent relationships"""
from pydatalab.models.relationships import RelationshipType, TypedRelationship
constituents_set = set()
if values.get("synthesis_constituents") is not None:
existing_parent_relationship_ids = set()
if values.get("relationships") is not None:
existing_parent_relationship_ids = set(
relationship.item_id or relationship.refcode
for relationship in values["relationships"]
if relationship.relation == RelationshipType.PARENT
)
else:
values["relationships"] = []
for constituent in values.get("synthesis_constituents", []):
# If this is an inline relationship, just skip it
if isinstance(constituent.item, InlineSubstance):
continue
if (
constituent.item.item_id not in existing_parent_relationship_ids
and constituent.item.refcode not in existing_parent_relationship_ids
):
relationship = TypedRelationship(
relation=RelationshipType.PARENT,
item_id=constituent.item.item_id,
type=constituent.item.type,
description="Is a constituent of",
)
values["relationships"].append(relationship)
# Accumulate all constituent IDs in a set to filter those that have been deleted
constituents_set.add(constituent.item.item_id)
# Finally, filter out any parent relationships with item that were removed
# from the synthesis constituents
values["relationships"] = [
rel
for rel in values["relationships"]
if not (
rel.item_id not in constituents_set
and rel.relation == RelationshipType.PARENT
and rel.type in ("samples", "starting_materials")
)
]
return values
__slots__
special
¶chemform: str
pydantic-field
¶synthesis_constituents: List[pydatalab.models.utils.Constituent]
pydantic-field
¶synthesis_description: str
pydantic-field
¶add_missing_synthesis_relationships(values)
classmethod
¶Add any missing sample synthesis constituents to parent relationships
Source code in pydatalab/models/samples.py
@root_validator
def add_missing_synthesis_relationships(cls, values):
"""Add any missing sample synthesis constituents to parent relationships"""
from pydatalab.models.relationships import RelationshipType, TypedRelationship
constituents_set = set()
if values.get("synthesis_constituents") is not None:
existing_parent_relationship_ids = set()
if values.get("relationships") is not None:
existing_parent_relationship_ids = set(
relationship.item_id or relationship.refcode
for relationship in values["relationships"]
if relationship.relation == RelationshipType.PARENT
)
else:
values["relationships"] = []
for constituent in values.get("synthesis_constituents", []):
# If this is an inline relationship, just skip it
if isinstance(constituent.item, InlineSubstance):
continue
if (
constituent.item.item_id not in existing_parent_relationship_ids
and constituent.item.refcode not in existing_parent_relationship_ids
):
relationship = TypedRelationship(
relation=RelationshipType.PARENT,
item_id=constituent.item.item_id,
type=constituent.item.type,
description="Is a constituent of",
)
values["relationships"].append(relationship)
# Accumulate all constituent IDs in a set to filter those that have been deleted
constituents_set.add(constituent.item.item_id)
# Finally, filter out any parent relationships with item that were removed
# from the synthesis constituents
values["relationships"] = [
rel
for rel in values["relationships"]
if not (
rel.item_id not in constituents_set
and rel.relation == RelationshipType.PARENT
and rel.type in ("samples", "starting_materials")
)
]
return values
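A hedged usage sketch: a synthesis constituent referenced by `item_id` is promoted to a PARENT relationship by the root validator above (the item IDs here are hypothetical).

```python
from pydatalab.models.samples import Sample

sample = Sample(
    item_id="jdb1-1",
    synthesis_constituents=[
        {
            "item": {"item_id": "Na2CO3-batch1", "type": "starting_materials"},
            "quantity": 1.0,
            "unit": "g",
        },
    ],
)
print([(rel.relation.value, str(rel.item_id)) for rel in sample.relationships])
# [('parent', 'Na2CO3-batch1')]
```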
starting_materials
¶
StartingMaterial (Item)
pydantic-model
¶A model for representing a starting material.
Source code in pydatalab/models/starting_materials.py
class StartingMaterial(Item):
"""A model for representing an experimental sample."""
type: str = Field(
"starting_materials", const="starting_materials", pattern="^starting_materials$"
)
barcode: Optional[str] = Field(
alias="Barcode", description="A unique barcode from ChemInventory"
)
date_acquired: Optional[IsoformatDateTime] = Field(
alias="Date Acquired", description="The date the item was acquired"
)
date_opened: Optional[IsoformatDateTime] = Field(
alias="Date opened", description="The date the container was opened"
)
CAS: Optional[str] = Field(alias="Substance CAS", description="CAS Registry Number")
chemical_purity: Optional[str] = Field(alias="Chemical purity")
full_percent: Optional[str] = Field(alias="Full %")
name: str = Field(alias="Container Name", description="name of the chemical")
size: Optional[str] = Field(
alias="Container Size", description="size of the container (see 'size_unit' for the units)"
)
size_unit: Optional[str] = Field(alias="Unit", description="units for the 'size' field.")
chemform: Optional[str] = Field(
alias="Molecular Formula",
description="A string representation of the chemical formula associated with this sample.",
)
molar_mass: Optional[float] = Field(
alias="Molecular Weight", description="Mass per formula unit, in g/mol"
)
smiles_representation: Optional[str] = Field(
alias="SMILES", description="Chemical structure in SMILES notation"
)
supplier: Optional[str] = Field(alias="Supplier", description="Manufacturer of the chemical")
location: Optional[str] = Field(
alias="Location", description="Location where chemical is stored"
)
comment: Optional[str] = Field(alias="Comments")
@validator("molar_mass")
def add_molar_mass(cls, v, values):
from periodictable import formula
if v is None and values.get("chemform"):
return formula(values.get("chemform")).mass
return v
__slots__
special
¶barcode: str
pydantic-field
¶A unique barcode from ChemInventory
date_acquired: IsoformatDateTime
pydantic-field
¶The date the item was acquired
date_opened: IsoformatDateTime
pydantic-field
¶The date the container was opened
CAS: str
pydantic-field
¶CAS Registry Number
chemical_purity: str
pydantic-field
¶full_percent: str
pydantic-field
¶size: str
pydantic-field
¶size of the container (see 'size_unit' for the units)
size_unit: str
pydantic-field
¶units for the 'size' field.
chemform: str
pydantic-field
¶A string representation of the chemical formula associated with this sample.
molar_mass: float
pydantic-field
¶Mass per formula unit, in g/mol
smiles_representation: str
pydantic-field
¶Chemical structure in SMILES notation
supplier: str
pydantic-field
¶Manufacturer of the chemical
location: str
pydantic-field
¶Location where chemical is stored
comment: str
pydantic-field
¶add_molar_mass(v, values)
classmethod
¶Source code in pydatalab/models/starting_materials.py
@validator("molar_mass")
def add_molar_mass(cls, v, values):
from periodictable import formula
if v is None and values.get("chemform"):
return formula(values.get("chemform")).mass
return v
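A sketch of constructing a StartingMaterial from ChemInventory-style keys via the field aliases above; supplying an empty "Molecular Weight" should trigger the `add_molar_mass` validator to compute the mass from the formula (requires periodictable).

```python
from pydatalab.models.starting_materials import StartingMaterial

material = StartingMaterial(
    item_id="Na2CO3-batch1",
    **{
        "Container Name": "Sodium carbonate",
        "Molecular Formula": "Na2CO3",
        "Molecular Weight": None,  # explicitly None so the validator runs
    },
)
print(material.chemform)    # "Na2CO3"
print(material.molar_mass)  # ~105.99 g/mol, computed via periodictable
```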
traits
¶
HasOwner (BaseModel)
pydantic-model
¶Source code in pydatalab/models/traits.py
class HasOwner(BaseModel):
creator_ids: List[PyObjectId] = Field([])
"""The database IDs of the user(s) who created the item."""
creators: Optional[List[Person]] = Field(None)
"""Inlined info for the people associated with this item."""
HasRevisionControl (BaseModel)
pydantic-model
¶Source code in pydatalab/models/traits.py
class HasRevisionControl(BaseModel):
revision: int = 1
"""The revision number of the entry."""
revisions: Optional[Dict[int, Any]] = None
"""An optional mapping from old revision numbers to the model state at that revision."""
HasBlocks (BaseModel)
pydantic-model
¶Source code in pydatalab/models/traits.py
class HasBlocks(BaseModel):
blocks_obj: Dict[str, Any] = Field({})
"""A mapping from block ID to block data."""
display_order: List[str] = Field([])
"""The order in which to display block data in the UI."""
IsCollectable (BaseModel)
pydantic-model
¶Trait mixin for models that can be added to collections.
Source code in pydatalab/models/traits.py
class IsCollectable(BaseModel):
"""Trait mixin for models that can be
added to collections.
"""
from pydatalab.models.collections import Collection
collections: List[Collection] = Field([])
"""Inlined info for the collections associated with this item."""
@root_validator
def add_missing_collection_relationships(cls, values):
from pydatalab.models.relationships import TypedRelationship
if values.get("collections") is not None:
new_ids = set(coll.immutable_id for coll in values["collections"])
existing_collection_relationship_ids = set()
if values.get("relationships") is not None:
existing_collection_relationship_ids = set(
relationship.immutable_id
for relationship in values["relationships"]
if relationship.type == "collections"
)
else:
values["relationships"] = []
for collection in values.get("collections", []):
if collection.immutable_id not in existing_collection_relationship_ids:
relationship = TypedRelationship(
relation=None,
immutable_id=collection.immutable_id,
type="collections",
description="Is a member of",
)
values["relationships"].append(relationship)
values["relationships"] = [
d
for d in values.get("relationships", [])
if d.type != "collections" or d.immutable_id in new_ids
]
if len([d for d in values.get("relationships", []) if d.type == "collections"]) != len(
values.get("collections", [])
):
breakpoint()
raise RuntimeError("Relationships and collections mismatch")
return values
collections: List[pydatalab.models.collections.Collection]
pydantic-field
¶
Collection (Entry, HasOwner, HasBlocks)
pydantic-model
¶Source code in pydatalab/models/traits.py
class Collection(Entry, HasOwner, HasBlocks):
type: str = Field("collections", const="collections", pattern="^collections$")
collection_id: HumanReadableIdentifier = Field(None)
"""A short human-readable/usable name for the collection."""
title: Optional[str]
"""A descriptive title for the collection."""
description: Optional[str]
"""A description of the collection, either in plain-text or a markup language."""
num_items: Optional[int] = Field(None)
"""Inlined number of items associated with this collection."""
@root_validator
def check_ids(cls, values):
if not any(values.get(k) is not None for k in ("collection_id", "immutable_id")):
raise ValueError("Collection must have at least collection_id or immutable_id")
return values
__slots__
special
¶collection_id: HumanReadableIdentifier
pydantic-field
¶title: str
pydantic-field
¶description: str
pydantic-field
¶num_items: int
pydantic-field
¶check_ids(values)
classmethod
¶Source code in pydatalab/models/traits.py
@root_validator
def check_ids(cls, values):
if not any(values.get(k) is not None for k in ("collection_id", "immutable_id")):
raise ValueError("Collection must have at least collection_id or immutable_id")
return values
add_missing_collection_relationships(values)
classmethod
¶Source code in pydatalab/models/traits.py
@root_validator
def add_missing_collection_relationships(cls, values):
from pydatalab.models.relationships import TypedRelationship
if values.get("collections") is not None:
new_ids = set(coll.immutable_id for coll in values["collections"])
existing_collection_relationship_ids = set()
if values.get("relationships") is not None:
existing_collection_relationship_ids = set(
relationship.immutable_id
for relationship in values["relationships"]
if relationship.type == "collections"
)
else:
values["relationships"] = []
for collection in values.get("collections", []):
if collection.immutable_id not in existing_collection_relationship_ids:
relationship = TypedRelationship(
relation=None,
immutable_id=collection.immutable_id,
type="collections",
description="Is a member of",
)
values["relationships"].append(relationship)
values["relationships"] = [
d
for d in values.get("relationships", [])
if d.type != "collections" or d.immutable_id in new_ids
]
if len([d for d in values.get("relationships", []) if d.type == "collections"]) != len(
values.get("collections", [])
):
breakpoint()
raise RuntimeError("Relationships and collections mismatch")
return values
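A sketch of the collection bookkeeping above: attaching a Collection to a Sample produces a matching "collections" relationship (the ObjectId is generated locally purely for illustration).

```python
import bson

from pydatalab.models.collections import Collection
from pydatalab.models.samples import Sample

coll = Collection(immutable_id=bson.ObjectId(), collection_id="battery-project")
sample = Sample(item_id="jdb1-1", collections=[coll])
print([rel.type.value for rel in sample.relationships])  # ['collections']
```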
utils
¶
IDENTIFIER_REGEX
¶A regex that matches identifiers that are url-safe and do not contain leading or trailing punctuation.
JSON_ENCODERS
¶Mass: typing_extensions.TypeAlias
¶Volume: typing_extensions.TypeAlias
¶
ItemType (str, Enum)
¶An enumeration of the types of items known by this implementation, should be made dynamic in the future.
Source code in pydatalab/models/utils.py
class ItemType(str, Enum):
"""An enumeration of the types of items known by this implementation, should be made dynamic in the future."""
SAMPLES = "samples"
STARTING_MATERIALS = "starting_materials"
KnownType (str, Enum)
¶An enumeration of the types of entry known by this implementation, should be made dynamic in the future.
Source code in pydatalab/models/utils.py
class KnownType(str, Enum):
"""An enumeration of the types of entry known by this implementation, should be made dynamic in the future."""
SAMPLES = "samples"
STARTING_MATERIALS = "starting_materials"
BLOCKS = "blocks"
FILES = "files"
PEOPLE = "people"
COLLECTIONS = "collections"
HumanReadableIdentifier (ConstrainedStr)
¶Used to constrain human-readable and URL-safe identifiers for items.
Source code in pydatalab/models/utils.py
class HumanReadableIdentifier(ConstrainedStr):
"""Used to constrain human-readable and URL-safe identifiers for items."""
min_length = 1
max_length = 40
strip_whitespace = True
to_lower = False
strict = False
regex = IDENTIFIER_REGEX
def __init__(self, value):
self.value = parse_obj_as(type(self), value)
def __str__(self):
return self.value
def __repr__(self):
return self.value
def __bool__(self):
return bool(self.value)
max_length
¶min_length
¶regex
¶strict
¶strip_whitespace
¶to_lower
¶__init__(self, value)
special
¶Source code in pydatalab/models/utils.py
def __init__(self, value):
self.value = parse_obj_as(type(self), value)
__str__(self)
special
¶Source code in pydatalab/models/utils.py
def __str__(self):
return self.value
__repr__(self)
special
¶Source code in pydatalab/models/utils.py
def __repr__(self):
return self.value
__bool__(self)
special
¶Source code in pydatalab/models/utils.py
def __bool__(self):
return bool(self.value)
Refcode (HumanReadableIdentifier)
¶Source code in pydatalab/models/utils.py
class Refcode(HumanReadableIdentifier):
regex = r"^[a-z]{2,10}:" + IDENTIFIER_REGEX[1:]
"""A regex to match refcodes that have a lower-case prefix between 2-10 chars, followed by a colon,
and then the normal rules for an ID (url-safe etc.).
"""
@property
def prefix(self):
return self.value.split(":")[0]
@property
def identifier(self):
return self.value.split(":")[1]
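A small sketch of the Refcode convenience properties (the "grey:" prefix is taken from the example in the Item.refcode description):

```python
from pydatalab.models.utils import Refcode

code = Refcode("grey:ABCDEF")
print(code.prefix)      # "grey"
print(code.identifier)  # "ABCDEF"
```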
UserRole (str, Enum)
¶
PintType (str)
¶A WIP attempt to create a custom pydantic field type for Pint quantities. The idea would eventually be to use TypeAlias to create physical/dimensionful pydantic fields.
Source code in pydatalab/models/utils.py
class PintType(str):
"""A WIP attempt to create a custom pydantic field type for Pint quantities.
The idea would eventually be to use TypeAlias to create physical/dimensionful pydantic fields.
"""
Q = pint.Quantity
def __init__(self, dimensions: str):
self._dimensions = dimensions
@classmethod
def __get_validators__(self):
yield self.validate
@classmethod
def validate(self, v):
q = self.Q(v)
if not q.check(self._dimensions):
raise ValueError("Value {v} must have dimensions of mass, not {v.dimensions}")
return q
@classmethod
def __modify_schema__(cls, field_schema):
field_schema.update(type="string")
Q (DaskQuantity, NumpyQuantity, MeasurementQuantity, FormattingQuantity, NonMultiplicativeQuantity, PlainQuantity)
¶__init__(self, dimensions: str)
special
¶Source code in pydatalab/models/utils.py
def __init__(self, dimensions: str):
self._dimensions = dimensions
__get_validators__()
classmethod
special
¶Source code in pydatalab/models/utils.py
@classmethod
def __get_validators__(self):
yield self.validate
validate(v)
classmethod
¶Source code in pydatalab/models/utils.py
@classmethod
def validate(self, v):
q = self.Q(v)
if not q.check(self._dimensions):
raise ValueError("Value {v} must have dimensions of mass, not {v.dimensions}")
return q
__modify_schema__(field_schema)
classmethod
special
¶Source code in pydatalab/models/utils.py
@classmethod
def __modify_schema__(cls, field_schema):
field_schema.update(type="string")
PyObjectId (ObjectId)
¶A wrapper class for a BSON ObjectId that can be used as a Pydantic field type.
Modified from "Getting started iwth MongoDB and FastAPI": https://www.mongodb.com/developer/languages/python/python-quickstart-fastapi/.
Source code in pydatalab/models/utils.py
class PyObjectId(ObjectId):
"""A wrapper class for a BSON ObjectId that can be used as a Pydantic field type.
Modified from "Getting started iwth MongoDB and FastAPI":
https://www.mongodb.com/developer/languages/python/python-quickstart-fastapi/.
"""
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def validate(cls, v):
if isinstance(v, dict) and "$oid" in v:
v = v["$oid"]
if not ObjectId.is_valid(v):
raise ValueError("Invalid ObjectId")
return ObjectId(v)
@classmethod
def __modify_schema__(cls, field_schema):
field_schema.update(type="string")
__get_validators__()
classmethod
special
¶Source code in pydatalab/models/utils.py
@classmethod
def __get_validators__(cls):
yield cls.validate
validate(v)
classmethod
¶Source code in pydatalab/models/utils.py
@classmethod
def validate(cls, v):
if isinstance(v, dict) and "$oid" in v:
v = v["$oid"]
if not ObjectId.is_valid(v):
raise ValueError("Invalid ObjectId")
return ObjectId(v)
__modify_schema__(field_schema)
classmethod
special
¶Source code in pydatalab/models/utils.py
@classmethod
def __modify_schema__(cls, field_schema):
field_schema.update(type="string")
IsoformatDateTime (datetime)
¶A datetime container that is more flexible than the pydantic default.
Source code in pydatalab/models/utils.py
class IsoformatDateTime(datetime.datetime):
"""A datetime container that is more flexible than the pydantic default."""
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def validate(cls, v):
if isinstance(v, str):
if v in ["0", " "]:
return None
return datetime.datetime.fromisoformat(v)
return v
@classmethod
def __modify_schema__(cls, field_schema):
field_schema.update(type="date")
__get_validators__()
classmethod
special
¶Source code in pydatalab/models/utils.py
@classmethod
def __get_validators__(cls):
yield cls.validate
validate(v)
classmethod
¶Source code in pydatalab/models/utils.py
@classmethod
def validate(cls, v):
if isinstance(v, str):
if v in ["0", " "]:
return None
return datetime.datetime.fromisoformat(v)
return v
__modify_schema__(field_schema)
classmethod
special
¶Source code in pydatalab/models/utils.py
@classmethod
def __modify_schema__(cls, field_schema):
field_schema.update(type="date")
RefCodeFactory
¶Source code in pydatalab/models/utils.py
class RefCodeFactory:
refcode_generator: Callable
@classmethod
def generate(self):
from pydatalab.config import CONFIG
return f"{CONFIG.IDENTIFIER_PREFIX}:{self.refcode_generator()}"
generate()
classmethod
¶Source code in pydatalab/models/utils.py
@classmethod
def generate(self):
from pydatalab.config import CONFIG
return f"{CONFIG.IDENTIFIER_PREFIX}:{self.refcode_generator()}"
RandomAlphabeticalRefcodeFactory (RefCodeFactory)
¶Source code in pydatalab/models/utils.py
class RandomAlphabeticalRefcodeFactory(RefCodeFactory):
refcode_generator = partial(random_uppercase, length=6)
refcode_generator: Callable
¶
InlineSubstance (BaseModel)
pydantic-model
¶
EntryReference (BaseModel)
pydantic-model
¶A reference to a database entry by ID and type.
Can include additional arbitrary metadata useful for inlining the item data.
Source code in pydatalab/models/utils.py
class EntryReference(BaseModel):
"""A reference to a database entry by ID and type.
Can include additional arbitrary metadata useful for
inlining the item data.
"""
type: str
name: Optional[str]
immutable_id: Optional[PyObjectId]
item_id: Optional[HumanReadableIdentifier]
refcode: Optional[Refcode]
@root_validator
def check_id_fields(cls, values):
"""Check that only one of the possible identifier fields is provided."""
id_fields = ("immutable_id", "item_id", "refcode")
# Temporarily remove refcodes from the list of fields to check
# until it is fully implemented
if values.get("refcode") is not None:
values["refcode"] = None
if all(values.get(f) is None for f in id_fields):
raise ValueError(f"Must provide at least one of {id_fields!r}")
if sum(1 for f in id_fields if values.get(f) is not None) > 1:
raise ValueError("Must provide only one of {id_fields!r}")
return values
class Config:
extra = "allow"
type: str
pydantic-field
required
¶name: str
pydantic-field
¶immutable_id: PyObjectId
pydantic-field
¶item_id: HumanReadableIdentifier
pydantic-field
¶refcode: Refcode
pydantic-field
¶check_id_fields(values)
classmethod
¶Check that only one of the possible identifier fields is provided.
Source code in pydatalab/models/utils.py
@root_validator
def check_id_fields(cls, values):
"""Check that only one of the possible identifier fields is provided."""
id_fields = ("immutable_id", "item_id", "refcode")
# Temporarily remove refcodes from the list of fields to check
# until it is fully implemented
if values.get("refcode") is not None:
values["refcode"] = None
if all(values.get(f) is None for f in id_fields):
raise ValueError(f"Must provide at least one of {id_fields!r}")
if sum(1 for f in id_fields if values.get(f) is not None) > 1:
raise ValueError("Must provide only one of {id_fields!r}")
return values
Constituent (BaseModel)
pydantic-model
¶A constituent of a sample.
Source code in pydatalab/models/utils.py
class Constituent(BaseModel):
"""A constituent of a sample."""
item: Union[EntryReference, InlineSubstance]
"""A reference to item (sample or starting material) entry for the constituent substance."""
quantity: Optional[float] = Field(..., ge=0)
"""The amount of the constituent material used to create the sample."""
unit: str = Field("g")
"""The unit symbol for the value provided in `quantity`, default is mass
in grams (g) but could also refer to volumes (mL, L, etc.) or moles (mol).
"""
@validator("item")
def check_itemhood(cls, v):
"""Check that the reference within the constituent is to an item type."""
if "type" in (v.value for v in ItemType):
raise ValueError(f"`type` must be one of {ItemType!r}")
return v
@validator("item", pre=True, always=True)
def coerce_reference(cls, v):
if isinstance(v, dict):
id = v.pop("item_id", None)
if id:
return EntryReference(item_id=id, **v)
else:
name = v.pop("name", "")
chemform = v.pop("chemform", None)
if not name:
raise ValueError("Inline substance must have a name!")
return InlineSubstance(name=name, chemform=chemform)
return v
item: Union[pydatalab.models.utils.EntryReference, pydatalab.models.utils.InlineSubstance]
pydantic-field
required
¶quantity: ConstrainedFloatValue
pydantic-field
required
¶unit: str
pydantic-field
¶check_itemhood(v)
classmethod
¶Check that the reference within the constituent is to an item type.
Source code in pydatalab/models/utils.py
@validator("item")
def check_itemhood(cls, v):
"""Check that the reference within the constituent is to an item type."""
if "type" in (v.value for v in ItemType):
raise ValueError(f"`type` must be one of {ItemType!r}")
return v
coerce_reference(v)
classmethod
¶Source code in pydatalab/models/utils.py
@validator("item", pre=True, always=True)
def coerce_reference(cls, v):
if isinstance(v, dict):
id = v.pop("item_id", None)
if id:
return EntryReference(item_id=id, **v)
else:
name = v.pop("name", "")
chemform = v.pop("chemform", None)
if not name:
raise ValueError("Inline substance must have a name!")
return InlineSubstance(name=name, chemform=chemform)
return v
random_uppercase(length: int = 6)
¶Source code in pydatalab/models/utils.py
def random_uppercase(length: int = 6):
return "".join(random.choices(string.ascii_uppercase, k=length))
generate_unique_refcode()
¶Generates a unique refcode for an item using the configured convention.
Source code in pydatalab/models/utils.py
def generate_unique_refcode():
"""Generates a unique refcode for an item using the configured convention."""
from pydatalab.config import CONFIG
from pydatalab.mongo import get_database
refcode = f"{CONFIG.REFCODE_GENERATOR.generate()}"
try:
while get_database().items.find_one({"refcode": refcode}):
refcode = f"{CONFIG.IDENTIFIER_PREFIX}:{CONFIG.REFCODE_GENERATOR.generate()}"
except Exception as exc:
raise RuntimeError(f"Cannot check refcode for uniqueness: {exc}")
return refcode
mongo
¶
Attributes¶
flask_mongo
¶
This is the primary database interface used by the Flask app.
Functions¶
insert_pydantic_model_fork_safe(model: BaseModel, collection: str) -> str
¶
Inserts a Pydantic model into chosen collection, returning the inserted ID.
Source code in pydatalab/mongo.py
def insert_pydantic_model_fork_safe(model: BaseModel, collection: str) -> str:
"""Inserts a Pydantic model into chosen collection, returning the inserted ID."""
return (
get_database()[collection]
.insert_one(model.dict(by_alias=True, exclude_none=True))
.inserted_id
)
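A hedged usage sketch (requires a running MongoDB reachable via the configured MONGO_URI; the "items" collection name matches the collection used elsewhere in this module):

```python
from pydatalab.models.samples import Sample
from pydatalab.mongo import insert_pydantic_model_fork_safe

inserted_id = insert_pydantic_model_fork_safe(Sample(item_id="jdb1-1"), "items")
print(inserted_id)  # the ObjectId assigned by MongoDB
```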
get_database() -> Database
¶
Returns the configured database.
Source code in pydatalab/mongo.py
def get_database() -> pymongo.database.Database:
"""Returns the configured database."""
return _get_active_mongo_client().get_database()
check_mongo_connection() -> None
¶
Checks that the configured MongoDB at `MONGO_URI` is available, raising a `RuntimeError` if it cannot be reached.
Source code in pydatalab/mongo.py
def check_mongo_connection() -> None:
"""Checks that the configured MongoDB is available and returns a
`pymongo.MongoClient` for the configured `MONGO_URI`.
Raises:
RuntimeError:
If the configured MongoDB is not available.
"""
try:
cli = _get_active_mongo_client()
cli.list_database_names()
except Exception as exc:
raise RuntimeError from exc
create_default_indices(client: Optional[pymongo.mongo_client.MongoClient] = None, background: bool = False) -> List[str]
¶
Creates indices for the configured or passed MongoClient.
Indexes created are:
- A text index over all string fields in item models,
- An index over item type,
- A unique index over `item_id` and `refcode`.
- A text index over user names and identities.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
background | bool | If true, indexes will be created as background jobs. | False |
Returns:
Type | Description |
---|---|
List[str] | A list of messages returned by each `create_index` call. |
Source code in pydatalab/mongo.py
def create_default_indices(
client: Optional[pymongo.MongoClient] = None,
background: bool = False,
) -> List[str]:
"""Creates indices for the configured or passed MongoClient.
Indexes created are:
- A text index over all string fields in item models,
- An index over item type,
- A unique index over `item_id` and `refcode`.
- A text index over user names and identities.
Parameters:
background: If true, indexes will be created as background jobs.
Returns:
A list of messages returned by each `create_index` call.
"""
from pydatalab.logger import LOGGER
from pydatalab.models import ITEM_MODELS
if client is None:
client = _get_active_mongo_client()
db = client.get_database()
item_fts_fields = set()
for model in ITEM_MODELS:
schema = ITEM_MODELS[model].schema()
for f in schema["properties"]:
if schema["properties"][f].get("type") == "string":
item_fts_fields.add(f)
def create_or_recreate_text_index(collection, fields, weights):
fts_index_name = f"{collection.name} full-text search"
def create_fts():
return collection.create_index(
[(k, pymongo.TEXT) for k in fields],
name=fts_index_name,
weights=weights,
)
try:
return create_fts()
except pymongo.errors.OperationFailure:
collection.drop_index(fts_index_name)
return create_fts()
ret = []
ret += create_or_recreate_text_index(
db.items,
item_fts_fields,
weights={"refcode": 3, "item_id": 3, "name": 3, "chemform": 3},
)
ret += create_or_recreate_text_index(
db.collections,
["collection_id", "title", "description"],
weights={"collection_id": 3, "title": 3, "description": 3},
)
ret += db.items.create_index("type", name="item type", background=background)
ret += db.items.create_index(
"item_id", unique=True, name="unique item ID", background=background
)
ret += db.items.create_index(
"refcode", unique=True, name="unique refcode", background=background
)
ret += db.items.create_index("last_modified", name="last modified", background=background)
user_fts_fields = {"identities.name", "display_name"}
ret += db.users.create_index(
[
("identities.identifier", pymongo.ASCENDING),
("identities.identity_type", pymongo.ASCENDING),
],
unique=True,
name="unique user identifiers",
background=background,
)
try:
ret += db.users.create_index(
[(k, pymongo.TEXT) for k in user_fts_fields],
name="user identities full-text search",
background=background,
)
except Exception as exc:
LOGGER.warning("Failed to create text index: %s", exc)
return ret
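A minimal sketch of applying the default indices against the configured database (again requires a reachable MongoDB):

```python
from pydatalab.mongo import create_default_indices

for message in create_default_indices(background=True):
    print(message)  # messages returned by the create_index calls
```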
nmr_utils
¶
Functions¶
read_bruker_1d(data, process_number = 1, verbose = True, sample_mass_mg = None)
¶
Read a 1D Bruker NMR spectrum and return it as a DataFrame.
Parameters:
- data: The directory of the full Bruker data file. You may also supply a DataFrame as this argument; in this case, the DataFrame is returned as is.
- process_number: The process number of the processed data you want to plot (default 1).
- verbose: Whether to print information such as the spectrum title to stdout (default True).
- sample_mass_mg: The (optional) sample mass. If provided, the resulting DataFrame will have an "intensity_per_scan_per_gram" column.
Source code in pydatalab/nmr_utils.py
def read_bruker_1d(data, process_number=1, verbose=True, sample_mass_mg=None):
"""Read a 1D bruker nmr spectrum and return it as a df.
arguments:
data: The directory of the full bruker data file. You may also supply a df as this argument. In this case, the df is returned as is.
process_number: The process number of the processed data you want to plot [default 1]
verbose: Whether to print information such as the spectrum title to stdout (default True)
sample_mass_mg: The (optional) sample mass. If provided, the resulting DataFrame will have a "intensity_per_scan_per_gram" column.
"""
# if df is provided, just return it as-is. This functionality is provided to make functions calling read_bruker_1d flexible by default.
# Either the data directory or the already-processed df can always be provided with equivalent results.
if type(data) == pd.core.frame.DataFrame:
if verbose:
print("data frame provided to read_bruker_1d(). Returning it as is.")
return data
else:
data_dir = data
processed_data_dir = os.path.join(data_dir, "pdata", str(process_number))
a_dic, a_data = ng.fileio.bruker.read(data_dir) # acquisition data
p_dic, p_data = ng.fileio.bruker.read_pdata(processed_data_dir) # processing data
try:
with open(os.path.join(processed_data_dir, "title"), "r") as f:
topspin_title = f.read()
except FileNotFoundError:
topspin_title = None
if len(p_data.shape) > 1:
print("data is more than one dimensional - read failed")
return None, a_dic, topspin_title, p_data.shape
nscans = a_dic["acqus"]["NS"]
# create a unit convertor to get the x-axis in ppm units
udic = ng.bruker.guess_udic(p_dic, p_data)
uc = ng.fileiobase.uc_from_udic(udic)
ppm_scale = uc.ppm_scale()
hz_scale = uc.hz_scale()
df = pd.DataFrame(
{
"ppm": ppm_scale,
"hz": hz_scale,
"intensity": p_data,
"intensity_per_scan": p_data / nscans,
}
)
if sample_mass_mg:
df["intensity_per_scan_per_gram"] = df["intensity_per_scan"] / sample_mass_mg * 1000.0
if verbose:
print(f"reading bruker data file. {udic[0]['label']} 1D spectrum, {nscans} scans.")
if sample_mass_mg:
print(
f'sample mass was provided: {sample_mass_mg:f} mg. "intensity_per_scan_per_gram" column included. '
)
if topspin_title:
print("\nTitle:\n")
print(topspin_title)
else:
print("No title found in scan")
return df, a_dic, topspin_title, a_data.shape
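An illustrative call (the experiment directory and sample mass are placeholders), assuming a standard Bruker directory layout with processed data under `pdata/1`:

```python
from pydatalab.nmr_utils import read_bruker_1d

# Placeholder path and values; process 1 is read by default.
df, acqu_params, title, shape = read_bruker_1d(
    "/data/nmr/2023-01-01_sample/10",
    process_number=1,
    sample_mass_mg=12.3,
    verbose=False,
)
print(title)
print(df[["ppm", "intensity_per_scan", "intensity_per_scan_per_gram"]].head())
```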
read_topspin_txt(filename, sample_mass_mg = None, nscans = None)
¶
Source code in pydatalab/nmr_utils.py
def read_topspin_txt(filename, sample_mass_mg=None, nscans=None):
MAX_HEADER_LINES = 10
LEFTRIGHT_REGEX = r"# LEFT = (-?\d+\.\d+) ppm. RIGHT = (-?\d+\.\d+) ppm\."
SIZE_REGEX = r"SIZE = (\d+)"
with open(filename, "r") as f:
header = "".join(itertools.islice(f, MAX_HEADER_LINES)) # read the first 10 lines
# print(header)
leftright_match = re.search(LEFTRIGHT_REGEX, header)
if not leftright_match:
raise ValueError("Header improperly formatted. Could not find LEFT and/or RIGHT values")
left = float(leftright_match.group(1))
right = float(leftright_match.group(2))
size_match = re.search(SIZE_REGEX, header)
if not size_match:
raise ValueError("Header improperly formatter. Could not find SIZE value")
size = int(size_match.group(1))
intensity = np.genfromtxt(filename, comments="#")
assert len(intensity) == size, "length of intensities does not match I"
data = {
"ppm": np.linspace(left, right, size),
"intensity": intensity,
"I_norm": (intensity - intensity.min()) / (intensity.max() - intensity.min()),
}
if sample_mass_mg and nscans:
data["I_per_g_per_scan"] = intensity / float(sample_mass_mg) / float(nscans) * 1000
df = pd.DataFrame(data)
return df
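A short sketch of reading an exported TopSpin text file (the filename, mass and scan count are placeholders); the file must contain the `LEFT`/`RIGHT` and `SIZE` header lines parsed above:

```python
from pydatalab.nmr_utils import read_topspin_txt

# Placeholder inputs; both sample_mass_mg and nscans are needed for the
# "I_per_g_per_scan" column to be added.
df = read_topspin_txt("spectrum.txt", sample_mass_mg=12.3, nscans=64)
print(df[["ppm", "intensity", "I_norm", "I_per_g_per_scan"]].head())
```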
integrate_1d(data, process_number = 1, sample_mass_mg = None, left = None, right = None, plot = False, verbose = False)
¶
Source code in pydatalab/nmr_utils.py
def integrate_1d(
data,
process_number=1,
sample_mass_mg=None,
left=None,
right=None,
plot=False,
verbose=False,
):
intensity_cols = ["intensity", "intensity_per_scan", "intensity_per_scan_per_gram"]
df = read_bruker_1d(
data, process_number=process_number, sample_mass_mg=sample_mass_mg, verbose=verbose
)
if left:
df = df[df.ppm >= left]
if right:
df = df[df.ppm <= right]
if plot:
plt.plot(df.ppm, df.intensity, "-")
plt.plot([left, right], [0, 0], "k-", zorder=-1)
plt.xlim(left, right)
plt.show()
integrated_intensities = pd.Series()
for c in intensity_cols:
if c not in df:
integrated_intensities[c] = None
continue
integrated_intensities[c] = -1 * integrate.trapz(df[c], df.ppm)
return integrated_intensities
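For example, a hypothetical integration over a fixed ppm window (note that, as the filters above are written, `left` acts as the lower and `right` as the upper ppm bound):

```python
from pydatalab.nmr_utils import integrate_1d

# Placeholder directory and mass; integrate between -10 and +10 ppm.
areas = integrate_1d(
    "/data/nmr/2023-01-01_sample/10",
    sample_mass_mg=12.3,
    left=-10,
    right=10,
)
print(areas["intensity_per_scan_per_gram"])
```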
remote_filesystems
¶
Functions¶
get_directory_structures(directories: List[Dict[str, str]], invalidate_cache: Optional[bool] = None, parallel: bool = True) -> List[Dict[str, Any]]
¶
For all registered top-level directories, call tree either locally or remotely to get their directory structures, or access the cached data for that directory, if it is available and fresh.
Parameters:
Name | Type | Description | Default
---|---|---|---
directories | List[Dict[str, str]] | The directories to scan. | required
invalidate_cache | Optional[bool] | If true, then the cached directory structure will be reset, provided the cache was not updated very recently. If `False`, the cache will not be reset, even if it is older than the maximum configured age. | None
parallel | bool | If true, run each remote scraper in a new process. | True
Returns:
Type | Description
---|---
List[Dict[str, Any]] | A list of dictionaries, one for each specified top-level directory.
Source code in pydatalab/remote_filesystems.py
def get_directory_structures(
directories: List[Dict[str, str]],
invalidate_cache: Optional[bool] = None,
parallel: bool = True,
) -> List[Dict[str, Any]]:
"""For all registered top-level directories, call tree either
locally or remotely to get their directory structures, or access
the cached data for that directory, if it is available and fresh.
Args:
directories: The directories to scan.
invalidate_cache: If true, then the cached directory structure will
be reset, provided the cache was not updated very recently. If `False`,
the cache will not be reset, even if it is older than the maximum configured
age.
parallel: If true, run each remote scraper in a new process.
Returns:
A list of dictionaries, one for each specified top-level directory.
"""
if not directories:
return []
if parallel:
return multiprocessing.Pool(max(min(len(directories), 8), 1)).map(
functools.partial(
get_directory_structure,
invalidate_cache=invalidate_cache,
),
directories,
)
else:
return [get_directory_structure(d, invalidate_cache=invalidate_cache) for d in directories]
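A minimal sketch (the directory entries are placeholders), assuming a configured deployment so that the cache database is available:

```python
from pydatalab.remote_filesystems import get_directory_structures

# "hostname" is only needed for directories scanned remotely.
structures = get_directory_structures(
    [
        {"name": "local-data", "path": "/mnt/instrument-data"},
        {"name": "remote-nmr", "path": "/data/nmr", "hostname": "nmr-server"},
    ],
    invalidate_cache=True,
)
for tree in structures:
    print(tree["name"], tree["last_updated"])
```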
get_directory_structure(directory: Dict[str, str], invalidate_cache: Optional[bool] = False) -> Dict[str, Any]
¶
For the given remote directory, either reconstruct the directory structure in full, or access the cached version if it is recent enough.
Any errors will be returned in the `contents` key for a given directory.
Parameters:
Name | Type | Description | Default
---|---|---|---
directory | Dict[str, str] | A dictionary describing the directory to scan, with keys `'name'`, `'path'` and optionally `'hostname'`. | required
invalidate_cache | Optional[bool] | If `True`, then the cached directory structure will be reset, provided the cache was not updated very recently. If `False`, the cache will not be reset, even if it is older than the maximum configured age. | False
Returns:
Type | Description
---|---
Dict[str, Any] | A dictionary with keys "name", "type" and "contents" for the top-level directory.
Source code in pydatalab/remote_filesystems.py
def get_directory_structure(
directory: Dict[str, str],
invalidate_cache: Optional[bool] = False,
) -> Dict[str, Any]:
"""For the given remote directory, either reconstruct the directory
structure in full, or access the cached version if is it recent
enough.
Any errors will be returned in the `contents` key for a given
directory.
Args:
directory: A dictionary describing the directory to scan, with keys
`'name'`, `'path'` and optionally `'hostname'`.
invalidate_cache: If `True`, then the cached directory structure will
be reset, provided the cache was not updated very recently. If `False`,
the cache will not be reset, even if it is older than the maximum configured
age.
Returns:
A dictionary with keys "name", "type" and "contents" for the
top-level directory.
"""
LOGGER.debug(f"Accessing directory structure of {directory}")
try:
cached_dir_structure = _get_cached_directory_structure(directory)
cache_last_updated = None
if cached_dir_structure:
cache_last_updated = cached_dir_structure["last_updated"]
cache_age = datetime.datetime.now() - cached_dir_structure["last_updated"]
if invalidate_cache and cache_age < datetime.timedelta(
minutes=CONFIG.REMOTE_CACHE_MIN_AGE
):
LOGGER.debug(
f"Not invalidating cache as its age ({cache_age=}) is less than the configured {CONFIG.REMOTE_CACHE_MIN_AGE=}."
)
# If either:
# 1) no cache for this directory,
# 2) the cache is older than the max cache age and
# `invalidate_cache` has not been explicitly set to false,
# 3) the `invalidate_cache` parameter is true, and the cache
# is older than the min age,
# then rebuild the cache.
if (
(not cached_dir_structure)
or (
invalidate_cache is not False
and cache_age > datetime.timedelta(minutes=CONFIG.REMOTE_CACHE_MAX_AGE)
)
or (
invalidate_cache
and cache_age > datetime.timedelta(minutes=CONFIG.REMOTE_CACHE_MIN_AGE)
)
):
dir_structure = _get_latest_directory_structure(
directory["path"], directory.get("hostname")
)
last_updated = _save_directory_structure(
directory,
dir_structure,
)
LOGGER.debug(
"Remote filesystems cache miss for '%s': last updated %s",
directory["name"],
cache_last_updated,
)
else:
last_updated = cached_dir_structure["last_updated"]
dir_structure = cached_dir_structure["contents"]
LOGGER.debug(
"Remote filesystems cache hit for '%s': last updated %s",
directory["name"],
last_updated,
)
except RuntimeError as exc:
dir_structure = [{"type": "error", "details": str(exc)}]
last_updated = datetime.datetime.now()
return {
"name": directory["name"],
"type": "toplevel",
"contents": dir_structure,
"last_updated": last_updated,
}
routes
special
¶
Modules¶
utils
¶
get_default_permissions(user_only: bool = True) -> Dict[str, Any]
¶Return the MongoDB query terms corresponding to the current user.
Will return open permissions if a) the `CONFIG.TESTING` parameter is `True`, or b) if the current user is registered as an admin.
Parameters:
Name | Type | Description | Default
---|---|---|---
user_only | bool | Whether to exclude items that also have no attached user (`False`), i.e., public items. This should be set to `False` when reading (and wanting to return public items), but left as `True` when modifying or removing items. | True
Source code in pydatalab/routes/utils.py
def get_default_permissions(user_only: bool = True) -> Dict[str, Any]:
"""Return the MongoDB query terms corresponding to the current user.
Will return open permissions if a) the `CONFIG.TESTING` parameter is `True`,
or b) if the current user is registered as an admin.
Parameters:
user_only: Whether to exclude items that also have no attached user (`False`),
i.e., public items. This should be set to `False` when reading (and wanting
to return public items), but left as `True` when modifying or removing items.
"""
if CONFIG.TESTING:
return {}
if (
current_user.is_authenticated
and current_user.person is not None
and current_user.role == UserRole.ADMIN
):
return {}
null_perm = {"creator_ids": {"$size": 0}}
if current_user.is_authenticated and current_user.person is not None:
user_perm = {"creator_ids": {"$in": [current_user.person.immutable_id]}}
if user_only:
return user_perm
return {"$or": [user_perm, null_perm]}
elif user_only:
return {"_id": -1}
return null_perm
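The returned terms are intended to be spliced into ordinary queries, as in this sketch (the item ID is a placeholder; it must run inside a request context so that `current_user` is populated):

```python
from pydatalab.mongo import flask_mongo
from pydatalab.routes.utils import get_default_permissions

# Only returns the document if the current user is allowed to modify it.
doc = flask_mongo.db.items.find_one(
    {"item_id": "sample-123", **get_default_permissions(user_only=True)}
)
```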
v0_1
special
¶
BLUEPRINTS
¶ENDPOINTS: Dict[str, Callable]
¶auth
¶This module implements functionality for authenticating users via OAuth2 providers, and associating these OAuth2 identities with their local accounts.
ENDPOINTS: Dict[str, Callable]
¶KEY_LENGTH: int
¶OAUTH_BLUEPRINTS: Dict[pydatalab.models.people.IdentityType, flask.blueprints.Blueprint]
¶A dictionary of Flask blueprints corresponding to the supported OAuth2 providers.
OAUTH_PROXIES: Dict[pydatalab.models.people.IdentityType, werkzeug.local.LocalProxy]
¶A dictionary of proxy objects (cf. Flask context locals) corresponding to the supported OAuth2 providers, which can be used to make further authenticated requests out to the providers.
wrapped_login_user(*args, **kwargs)
¶Source code in pydatalab/routes/v0_1/auth.py
def wrapped_login_user(*args, **kwargs):
LOGGER.warning("Logging in user %s with role %s", args[0].display_name, args[0].role)
login_user(*args, **kwargs)
find_create_or_modify_user(identifier: str, identity_type: Union[str, pydatalab.models.people.IdentityType], identity_name: str, display_name: Optional[str] = None, verified: bool = False, create_account: bool = False) -> None
¶Search for a user account with the given identifier and identity type, creating or connecting one if it does not exist.
1. Find any user with the given identity; if found, return it.
2. If no user exists, check if there is currently a user logged in:
- If so, attach the identity to the current user.
- If not, create an entry in the user database with this identity.
3. Log in as the user for this session.
Source code in pydatalab/routes/v0_1/auth.py
def find_create_or_modify_user(
identifier: str,
identity_type: Union[str, IdentityType],
identity_name: str,
display_name: Optional[str] = None,
verified: bool = False,
create_account: bool = False,
) -> None:
"""Search for a user account with the given identifier and identity type, creating
or connecting one if it does not exist.
1. Find any user with the given identity, if found, return it.
2. If no user exists, check if there is currently a user logged in:
- If so, attach the identity to the current user.
- If not, create an entry in the user database with this identity.
3. Log in as the user for this session.
"""
def find_user_with_identity(
identifier: str,
identity_type: Union[str, IdentityType],
) -> Optional[Person]:
"""Look up the given identity in the users database."""
user = flask_mongo.db.users.find_one(
{"identities.identifier": identifier, "identities.identity_type": identity_type},
)
if user:
person = Person(**user)
identity_indices: list[int] = [
ind
for ind, _ in enumerate(person.identities)
if (_.identity_type == identity_type and _.identifier == identifier)
]
if len(identity_indices) != 1:
raise RuntimeError(
"Unexpected error: multiple or no identities matched the OAuth token."
)
identity_index = identity_indices[0]
if not person.identities[identity_index].verified:
flask_mongo.db.users.update_one(
{"_id": person.immutable_id},
{"$set": {f"identities.{identity_index}.verified": True}},
)
return person
return None
def attach_identity_to_user(
user_id: str,
identity: Identity,
use_display_name: bool = False,
use_contact_email: bool = False,
) -> None:
"""Associates an OAuth ID with a user entry in the database.
This function is currently brittle and would need to be updated
if the corresponding `Person` schema changes due to the hard-coded
field names.
Parameters:
user_id: The database ID of the user as a string.
identity: The identity to associate.
use_display_name: Whether to set the user's top-level display name with a
display name provided by this identity.
use_contact_email: Whether to set the user's top-level contact email with
an email address provided by this identity.
Raises:
RuntimeError: If the update was unsuccessful.
"""
update = {"$push": {"identities": identity.dict()}}
if use_display_name and identity.display_name:
update["$set"] = {"display_name": identity.display_name}
if use_contact_email and identity.identity_type is IdentityType.EMAIL and identity.verified:
update["$set"] = {"contact_email": identity.identifier}
result = flask_mongo.db.users.update_one(
{"_id": ObjectId(user_id)},
update,
)
if result.matched_count != 1:
raise RuntimeError(
f"Attempted to modify user {user_id} but performed {result.matched_count} updates. Results:\n{result.raw_result}"
)
user = find_user_with_identity(identifier, identity_type)
# If no user was found in the database with the OAuth ID, make or modify one:
if not user:
identity = Identity(
identifier=identifier,
identity_type=identity_type,
name=identity_name,
display_name=display_name,
verified=verified,
)
# If there is currently a user logged in who has gone through OAuth with a new identity,
# then update the user database with the identity
if current_user.is_authenticated:
attach_identity_to_user(
current_user.id,
identity,
use_display_name=True if current_user.display_name is None else False,
)
current_user.refresh()
user = current_user.person
# If there is no current authenticated user, make one with the current OAuth identity
else:
if not create_account:
raise UserRegistrationForbidden
user = Person.new_user_from_identity(identity, use_display_name=True)
wrapped_login_user(get_by_id_cached(str(user.immutable_id)))
LOGGER.debug("Inserting new user model %s into database", user)
insert_pydantic_model_fork_safe(user, "users")
# Log the user into the session with this identity
if user is not None:
wrapped_login_user(get_by_id_cached(str(user.immutable_id)))
github_logged_in(blueprint, token)
¶This Flask signal hooks into any attempt to use the GitHub blueprint, and will make a user account with this identity if not already present in the database.
Makes one authorized request to the GitHub API to get the user's GitHub ID, username and display name, without storing the OAuth token.
Source code in pydatalab/routes/v0_1/auth.py
@oauth_authorized.connect_via(OAUTH_BLUEPRINTS[IdentityType.GITHUB])
def github_logged_in(blueprint, token):
"""This Flask signal hooks into any attempt to use the GitHub blueprint, and will
make a user account with this identity if not already present in the database.
Makes one authorized request to the GitHub API to get the user's GitHub ID,
username and display name, without storing the OAuth token.
"""
if not token:
return False
resp = blueprint.session.get("/user")
if not resp.ok:
return False
github_info = resp.json()
github_user_id = str(github_info["id"])
username = str(github_info["login"])
name = str(github_info["name"])
org_membership = blueprint.session.get(f"/users/{username}/orgs").json()
if CONFIG.GITHUB_ORG_ALLOW_LIST:
create_account = any(
str(org["id"]) in CONFIG.GITHUB_ORG_ALLOW_LIST for org in org_membership
)
else:
create_account = False
find_create_or_modify_user(
github_user_id,
IdentityType.GITHUB,
username,
display_name=name,
verified=True,
create_account=create_account,
)
# Return false to prevent Flask-dance from trying to store the token elsewhere
return False
orcid_logged_in(_, token)
¶This signal hooks into any attempt to use the ORCID blueprint, and will associate a user account with this identity if not already present in the database.
The OAuth token is not stored alongside the user.
Source code in pydatalab/routes/v0_1/auth.py
@oauth_authorized.connect_via(OAUTH_BLUEPRINTS[IdentityType.ORCID])
def orcid_logged_in(_, token):
"""This signal hooks into any attempt to use the ORCID blueprint, and will
associate a user account with this identity if not already present in the database.
The OAuth token is not stored alongside the user.
"""
if not token:
return False
find_create_or_modify_user(
token["orcid"],
IdentityType.ORCID,
token["orcid"],
display_name=token["name"],
verified=True,
)
# Return false to prevent Flask-dance from trying to store the token elsewhere
return False
redirect_to_ui(blueprint, token)
¶Intercepts the default Flask-Dance redirect and instead redirects to the referring page.
Source code in pydatalab/routes/v0_1/auth.py
@oauth_authorized.connect
def redirect_to_ui(blueprint, token): # pylint: disable=unused-argument
"""Intercepts the default Flask-Dance and redirects to the referring page."""
from flask import request
referer = request.headers.get("Referer", "/")
return redirect(referer)
get_authenticated_user_info()
¶Returns metadata associated with the currently authenticated user.
Source code in pydatalab/routes/v0_1/auth.py
def get_authenticated_user_info():
"""Returns metadata associated with the currently authenticated user."""
if current_user.is_authenticated:
return jsonify(json.loads(current_user.person.json())), 200
else:
return jsonify({"status": "failure", "message": "User must be authenticated."}), 401
generate_user_api_key()
¶Generates and returns a new API key for the currently authenticated admin user.
Source code in pydatalab/routes/v0_1/auth.py
def generate_user_api_key():
"""Returns metadata associated with the currently authenticated user."""
if current_user.is_authenticated and current_user.role == "admin":
new_key = "".join(random.choices(ascii_letters, k=KEY_LENGTH))
flask_mongo.db.api_keys.update_one(
{"_id": ObjectId(current_user.id)},
{"$set": {"hash": sha512(new_key.encode("utf-8")).hexdigest()}},
upsert=True,
)
return jsonify({"key": new_key}), 200
else:
return (
jsonify(
{
"status": "failure",
"message": "User must be an authenticated admin to request an API key.",
}
),
401,
)
blocks
¶ENDPOINTS: Dict[str, Callable]
¶add_data_block()
¶Call with AJAX to add a block to the sample
Source code in pydatalab/routes/v0_1/blocks.py
def add_data_block():
"""Call with AJAX to add a block to the sample"""
request_json = request.get_json()
# pull out required arguments from json
block_type = request_json["block_type"]
item_id = request_json["item_id"]
insert_index = request_json["index"]
if block_type not in BLOCK_TYPES:
return jsonify(status="error", message="Invalid block type"), 400
block = BLOCK_TYPES[block_type](item_id=item_id)
data = block.to_db()
# currently, adding to both blocks and blocks_obj to maintain compatibility with
# the old site. The new site only uses blocks_obj
if insert_index:
display_order_update = {
"$each": [block.block_id],
"$position": insert_index,
}
else:
display_order_update = block.block_id
result = flask_mongo.db.items.update_one(
{"item_id": item_id, **get_default_permissions(user_only=True)},
{
"$push": {"blocks": data, "display_order": display_order_update},
"$set": {f"blocks_obj.{block.block_id}": data},
},
)
if result.modified_count < 1:
return (
jsonify(
status="error",
message=f"Update failed. {item_id=} is probably incorrect.",
),
400,
)
# get the new display_order:
display_order_result = flask_mongo.db.items.find_one(
{"item_id": item_id, **get_default_permissions(user_only=True)}, {"display_order": 1}
)
return jsonify(
status="success",
new_block_obj=block.to_web(),
new_block_insert_index=insert_index
if insert_index is None
else len(display_order_result["display_order"]) - 1,
new_display_order=display_order_result["display_order"],
)
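A hypothetical client-side request might look like the following sketch; the deployment URL and endpoint path are placeholders (the actual path comes from the `ENDPOINTS` mapping, not shown here), and authentication would also be required:

```python
import requests

# Placeholder URL, path and IDs; "comment" is assumed to be a registered
# key of BLOCK_TYPES.
resp = requests.post(
    "https://datalab.example.com/add-data-block/",
    json={"block_type": "comment", "item_id": "sample-123", "index": None},
)
print(resp.json()["new_block_obj"])
```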
add_collection_data_block()
¶Call with AJAX to add a block to the collection.
Source code in pydatalab/routes/v0_1/blocks.py
def add_collection_data_block():
"""Call with AJAX to add a block to the collection."""
request_json = request.get_json()
# pull out required arguments from json
block_type = request_json["block_type"]
collection_id = request_json["collection_id"]
insert_index = request_json["index"]
if block_type not in BLOCK_TYPES:
return jsonify(status="error", message="Invalid block type"), 400
block = BLOCK_TYPES[block_type](collection_id=collection_id)
data = block.to_db()
# currently, adding to both blocks and blocks_obj to maintain compatibility with
# the old site. The new site only uses blocks_obj
if insert_index:
display_order_update = {
"$each": [block.block_id],
"$position": insert_index,
}
else:
display_order_update = block.block_id
result = flask_mongo.db.collections.update_one(
{"collection_id": collection_id, **get_default_permissions(user_only=True)},
{
"$push": {"blocks": data, "display_order": display_order_update},
"$set": {f"blocks_obj.{block.block_id}": data},
},
)
if result.modified_count < 1:
return (
jsonify(
status="error",
message=f"Update failed. {collection_id=} is probably incorrect.",
),
400,
)
# get the new display_order:
display_order_result = flask_mongo.db.items.find_one(
{"collection_id": collection_id, **get_default_permissions(user_only=True)},
{"display_order": 1},
)
return jsonify(
status="success",
new_block_obj=block.to_web(),
new_block_insert_index=insert_index
if insert_index is None
else len(display_order_result["display_order"]) - 1,
new_display_order=display_order_result["display_order"],
)
update_block()
¶Take in json block data from site, process, and spit out updated data. May be used, for example, when the user changes plot parameters and the server needs to generate a new plot.
Source code in pydatalab/routes/v0_1/blocks.py
def update_block():
"""Take in json block data from site, process, and spit
out updated data. May be used, for example, when the user
changes plot parameters and the server needs to generate a new
plot.
"""
request_json = request.get_json()
block_data = request_json["block_data"]
blocktype = block_data["blocktype"]
save_to_db = request_json.get("save_to_db", False)
block = BLOCK_TYPES[blocktype].from_web(block_data)
saved_successfully = False
if save_to_db:
saved_successfully = _save_block_to_db(block)
return (
jsonify(
status="success", saved_successfully=saved_successfully, new_block_data=block.to_web()
),
200,
)
delete_block()
¶Completely delete a data block from the database. In the future, we may consider preserving data by moving it to a different array, or simply making it invisible
Source code in pydatalab/routes/v0_1/blocks.py
def delete_block():
"""Completely delete a data block from the database. In the future,
we may consider preserving data by moving it to a different array,
or simply making it invisible"""
request_json = request.get_json()
item_id = request_json["item_id"]
block_id = request_json["block_id"]
result = flask_mongo.db.items.update_one(
{"item_id": item_id, **get_default_permissions(user_only=True)},
{
"$pull": {
"blocks": {"block_id": block_id},
"display_order": block_id,
},
"$unset": {f"blocks_obj.{block_id}": ""},
},
)
if result.modified_count < 1:
return (
jsonify(
{
"status": "error",
"message": f"Update failed. The item_id probably incorrect: {item_id}",
}
),
400,
)
return (
jsonify({"status": "success"}),
200,
) # could try to switch to HTTP 204 ("No Content"), i.e. success with no JSON body
delete_collection_block()
¶Completely delete a data block from the database that is currently attached to a collection.
In the future, we may consider preserving data by moving it to a different array, or simply making it invisible
Source code in pydatalab/routes/v0_1/blocks.py
def delete_collection_block():
"""Completely delete a data block from the database that is currently
attached to a collection.
In the future, we may consider preserving data by moving it to a different array,
or simply making it invisible"""
request_json = request.get_json()
collection_id = request_json["collection_id"]
block_id = request_json["block_id"]
result = flask_mongo.db.collections.update_one(
{"collection_id": collection_id, **get_default_permissions(user_only=True)},
{
"$pull": {
"blocks": {"block_id": block_id},
"display_order": block_id,
},
"$unset": {f"blocks_obj.{block_id}": ""},
},
)
if result.modified_count < 1:
return (
jsonify(
{
"status": "error",
"message": f"Update failed. The collection_id probably incorrect: {collection_id}",
}
),
400,
)
return (
jsonify({"status": "success"}),
200,
)
collections
¶collection
¶get_collections()
¶Source code in pydatalab/routes/v0_1/collections.py
@collection.route("/collections/")
def get_collections():
collections = flask_mongo.db.collections.aggregate(
[
{"$match": get_default_permissions(user_only=True)},
{"$lookup": creators_lookup()},
{"$project": {"_id": 0}},
{"$sort": {"_id": -1}},
]
)
return jsonify({"status": "success", "data": list(collections)})
get_collection(collection_id)
¶Source code in pydatalab/routes/v0_1/collections.py
@collection.route("/collections/<collection_id>", methods=["GET"])
def get_collection(collection_id):
cursor = flask_mongo.db.collections.aggregate(
[
{
"$match": {
"collection_id": collection_id,
**get_default_permissions(user_only=True),
}
},
{"$lookup": creators_lookup()},
{"$sort": {"_id": -1}},
]
)
try:
doc = list(cursor)[0]
except IndexError:
doc = None
if not doc or (not current_user.is_authenticated and not CONFIG.TESTING):
return (
jsonify(
{
"status": "error",
"message": f"No matching collection {collection_id=} with current authorization.",
}
),
404,
)
collection = Collection(**doc)
samples = list(
get_samples_summary(
match={
"relationships.type": "collections",
"relationships.immutable_id": collection.immutable_id,
},
project={"collections": 0},
)
)
collection.num_items = len(samples)
return jsonify(
{
"status": "success",
"collection_id": collection_id,
"data": json.loads(collection.json(exclude_unset=True)),
"child_items": list(samples),
}
)
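Sketch of fetching a single collection over HTTP, using the `/collections/<collection_id>` route from the decorator above (the deployment URL and collection ID are placeholders, and a logged-in session is required outside testing):

```python
import requests

resp = requests.get("https://datalab.example.com/collections/test-collection")
payload = resp.json()
print(payload["collection_id"], len(payload["child_items"]))
```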
create_collection()
¶Source code in pydatalab/routes/v0_1/collections.py
@collection.route("/collections/", methods=["PUT"])
def create_collection():
request_json = request.get_json() # noqa: F821 pylint: disable=undefined-variable
data = request_json.get("data", {})
copy_from_id = request_json.get("copy_from_collection_id", None)
starting_members = data.get("starting_members", [])
if not current_user.is_authenticated and not CONFIG.TESTING:
return (
dict(
status="error",
message="Unable to create new collection without user authentication.",
collection_id=data.get("collection_id"),
),
401,
)
if copy_from_id:
raise NotImplementedError("Copying collections is not yet implemented.")
if CONFIG.TESTING:
data["creator_ids"] = [24 * "0"]
data["creators"] = [
{"display_name": "Public testing user", "contact_email": "datalab@odbx.science"}
]
else:
data["creator_ids"] = [current_user.person.immutable_id]
data["creators"] = [
{
"display_name": current_user.person.display_name,
"contact_email": current_user.person.contact_email,
}
]
# check to make sure that item_id isn't taken already
if flask_mongo.db.collections.find_one({"collection_id": data["collection_id"]}):
return (
dict(
status="error",
message=f"collection_id_validation_error: {data['collection_id']!r} already exists in database.",
collection_id=data["collection_id"],
),
409, # 409: Conflict
)
data["date"] = data.get("date", datetime.datetime.now())
try:
data_model = Collection(**data)
except ValidationError as error:
return (
dict(
status="error",
message=f"Unable to create new collection with ID {data['collection_id']}.",
item_id=data["collection_id"],
output=str(error),
),
400,
)
result: InsertOneResult = flask_mongo.db.collections.insert_one(
data_model.dict(exclude={"creators"})
)
if not result.acknowledged:
return (
dict(
status="error",
message=f"Failed to add new collection {data['collection_id']!r} to database.",
collection_id=data["collection_id"],
output=result.raw_result,
),
400,
)
immutable_id = result.inserted_id
errors = []
if starting_members:
item_ids = set(d.get("item_id") for d in starting_members)
if None in item_ids:
item_ids.remove(None)
results: UpdateResult = flask_mongo.db.items.update_many(
{
"item_id": {"$in": list(item_ids)},
**get_default_permissions(user_only=True),
},
{"$push": {"relationships": {"type": "collections", "immutable_id": immutable_id}}},
)
data_model.num_items = results.modified_count
if results.modified_count < len(starting_members):
errors = [
item_id
for item_id in starting_members
if item_id not in results.raw_result.get("upserted", [])
]
else:
data_model.num_items = 0
response = {
"status": "success",
"data": json.loads(data_model.json()),
}
if errors:
response["warnings"] = [
f"Unable to register {errors} to new collection {data_model.collection_id}"
]
return (
jsonify(response),
201, # 201: Created
)
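Sketch of creating a collection via the `PUT /collections/` route shown above (all IDs and the deployment URL are placeholders; the request must be authenticated):

```python
import requests

resp = requests.put(
    "https://datalab.example.com/collections/",
    json={
        "data": {
            "collection_id": "test-collection",
            "title": "Test collection",
            "starting_members": [{"item_id": "sample-1"}, {"item_id": "sample-2"}],
        }
    },
)
# 201 on success, 409 if the collection_id is already taken.
print(resp.status_code, resp.json())
```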
save_collection(collection_id)
¶Source code in pydatalab/routes/v0_1/collections.py
@collection.route("/collections/<collection_id>", methods=["PATCH"])
@logged_route
def save_collection(collection_id):
request_json = request.get_json() # noqa: F821 pylint: disable=undefined-variable
updated_data = request_json.get("data")
if not updated_data:
return (
jsonify(
status="error",
message=f"Unable to find any data in request to update {collection_id=} with.",
),
204, # 204: No content
)
# These keys should not be updated here and cannot be modified by the user through this endpoint
for k in ("_id", "file_ObjectIds", "creators", "creator_ids", "collection_id"):
if k in updated_data:
del updated_data[k]
updated_data["last_modified"] = datetime.datetime.now().isoformat()
collection = flask_mongo.db.collections.find_one(
{"collection_id": collection_id, **get_default_permissions(user_only=True)}
)
if not collection:
return (
jsonify(
status="error",
message=f"Unable to find item with appropriate permissions and {collection_id=}.",
),
400,
)
collection.update(updated_data)
try:
collection = Collection(**collection).dict()
except ValidationError as exc:
return (
jsonify(
status="error",
message=f"Unable to update item {collection_id=} with new data {updated_data}",
output=str(exc),
),
400,
)
result: UpdateResult = flask_mongo.db.collections.update_one(
{"collection_id": collection_id},
{"$set": collection},
)
if result.modified_count != 1:
return (
jsonify(
status="error",
message=f"Unable to update item {collection_id=} with new data {updated_data}",
output=result.raw_result,
),
400,
)
return jsonify(status="success"), 200
delete_collection(collection_id: str)
¶Source code in pydatalab/routes/v0_1/collections.py
@collection.route("/collections/<collection_id>", methods=["DELETE"])
def delete_collection(collection_id: str):
result = flask_mongo.db.collections.delete_one(
{"collection_id": collection_id, **get_default_permissions(user_only=True)}
)
if result.deleted_count != 1:
return (
jsonify(
{
"status": "error",
"message": f"Authorization required to attempt to delete collection with {collection_id=} from the database.",
}
),
401,
)
return (
jsonify(
{
"status": "success",
}
),
200,
)
search_collections()
¶Source code in pydatalab/routes/v0_1/collections.py
@collection.route("/search-collections/", methods=["GET"])
def search_collections():
query = request.args.get("query", type=str)
nresults = request.args.get("nresults", default=100, type=int)
match_obj = {"$text": {"$search": query}, **get_default_permissions(user_only=True)}
cursor = [
json.loads(Collection(**doc).json(exclude_unset=True))
for doc in flask_mongo.db.collections.aggregate(
[
{"$match": match_obj},
{"$sort": {"score": {"$meta": "textScore"}}},
{"$limit": nresults},
{
"$project": {
"collection_id": 1,
"title": 1,
}
},
]
)
]
return jsonify({"status": "success", "data": list(cursor)}), 200
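Sketch of querying the `/search-collections/` route shown above (the deployment URL and search query are placeholders):

```python
import requests

resp = requests.get(
    "https://datalab.example.com/search-collections/",
    params={"query": "cathode", "nresults": 10},
)
print(resp.json()["data"])
```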
files
¶ENDPOINTS: Dict[str, Callable]
¶get_file(file_id, filename)
¶Source code in pydatalab/routes/v0_1/files.py
def get_file(file_id, filename):
if not current_user.is_authenticated and not CONFIG.TESTING:
return (
jsonify(
{
"status": "error",
"title": "Not Authorized",
"detail": "File access requires login.",
}
),
401,
)
path = os.path.join(CONFIG.FILE_DIRECTORY, secure_filename(file_id))
return send_from_directory(path, filename)
upload()
¶Method to upload files to the server. TODO: think more about security, size limits, and nested folders.
Source code in pydatalab/routes/v0_1/files.py
def upload():
"""method to upload files to the server
todo: think more about security, size limits, and about nested folders
"""
if not current_user.is_authenticated and not CONFIG.TESTING:
return (
jsonify(
{
"status": "error",
"title": "Not Authorized",
"detail": "File upload requires login.",
}
),
401,
)
if len(request.files) == 0:
return jsonify(error="No file in request"), 400
if "item_id" not in request.form:
return jsonify(error="No item id provided in form"), 400
item_id = request.form["item_id"]
replace_file_id = request.form["replace_file"]
is_update = replace_file_id and replace_file_id != "null"
for filekey in request.files: # pretty sure there is just 1 per request
file = request.files[
filekey
] # just a weird thing about the request that comes from uppy. The key is "files[]"
if is_update:
file_information = file_utils.update_uploaded_file(file, ObjectId(replace_file_id))
else:
file_information = file_utils.save_uploaded_file(file, item_ids=[item_id])
return (
jsonify(
{
"status": "success",
"file_id": str(file_information["_id"]),
"file_information": file_information,
"is_update": is_update, # true if update, false if new file
}
),
201,
)
add_remote_file_to_sample()
¶Source code in pydatalab/routes/v0_1/files.py
def add_remote_file_to_sample():
if not current_user.is_authenticated and not CONFIG.TESTING:
return (
jsonify(
{
"status": "error",
"title": "Not Authorized",
"detail": "Adding a file to a sample requires login.",
}
),
401,
)
request_json = request.get_json()
item_id = request_json["item_id"]
file_entry = request_json["file_entry"]
updated_file_entry = file_utils.add_file_from_remote_directory(file_entry, item_id)
return (
jsonify(
{
"status": "success",
"file_id": str(updated_file_entry["_id"]),
"file_information": updated_file_entry,
}
),
201,
)
delete_file_from_sample()
¶Remove a file from a sample, but don't delete the actual file (for now)
Source code in pydatalab/routes/v0_1/files.py
def delete_file_from_sample():
"""Remove a file from a sample, but don't delete the actual file (for now)"""
if not current_user.is_authenticated and not CONFIG.TESTING:
return (
jsonify(
{
"status": "error",
"title": "Not Authorized",
"detail": "Adding a file to a sample requires login.",
}
),
401,
)
request_json = request.get_json()
item_id = request_json["item_id"]
file_id = ObjectId(request_json["file_id"])
result = pydatalab.mongo.flask_mongo.db.items.update_one(
{"item_id": item_id, **get_default_permissions(user_only=True)},
{"$pull": {"file_ObjectIds": file_id}},
)
if result.modified_count != 1:
return (
jsonify(
status="error",
message=f"Not authorized to perform file removal from sample {item_id=}",
output=result.raw_result,
),
401,
)
updated_file_entry = pydatalab.mongo.flask_mongo.db.files.find_one_and_update(
{"_id": file_id},
{"$pull": {"item_ids": item_id}},
return_document=ReturnDocument.AFTER,
)
if not updated_file_entry:
return (
jsonify(
status="error",
message=f"{item_id} {file_id} delete failed. Something went wrong with the db call to remove sample from file",
),
400,
)
return (
jsonify(
{
"status": "success",
"new_file_obj": {request_json["file_id"]: updated_file_entry},
}
),
200,
)
delete_file()
¶delete a data file from the uploads/item_id folder
Source code in pydatalab/routes/v0_1/files.py
def delete_file():
"""delete a data file from the uploads/item_id folder"""
if not current_user.is_authenticated and not CONFIG.TESTING:
return (
jsonify(
{
"status": "error",
"title": "Not Authorized",
"detail": "Adding a file to a sample requires login.",
}
),
401,
)
request_json = request.get_json()
item_id = request_json["item_id"]
filename = request_json["filename"]
secure_item_id = secure_filename(item_id)
secure_fname = secure_filename(filename)
path = os.path.join(CONFIG.FILE_DIRECTORY, secure_item_id, secure_fname)
if not os.path.isfile(path):
return (
jsonify(
status="error",
message="Delete failed. file not found: {}".format(path),
),
400,
)
result = pydatalab.mongo.flask_mongo.db.items.update_one(
{"item_id": item_id, **get_default_permissions(user_only=True)},
{"$pull": {"files": filename}},
return_document=ReturnDocument.AFTER,
)
if result.matched_count != 1:
return (
jsonify(
status="error",
message=f"{item_id} {filename} delete failed. Something went wrong with the db call. File not deleted.",
output=result.raw_result,
),
400,
)
os.remove(path)
return jsonify({"status": "success"}), 200
graphs
¶ENDPOINTS: Dict[str, Callable]
¶get_graph_cy_format(item_id: Optional[str] = None)
¶Source code in pydatalab/routes/v0_1/graphs.py
def get_graph_cy_format(item_id: Optional[str] = None):
if item_id is None:
all_documents = flask_mongo.db.items.find(
get_default_permissions(user_only=False),
projection={"item_id": 1, "name": 1, "type": 1, "relationships": 1},
)
node_ids = {document["item_id"] for document in all_documents}
all_documents.rewind()
else:
all_documents = list(
flask_mongo.db.items.find(
{
"$or": [{"item_id": item_id}, {"relationships.item_id": item_id}],
**get_default_permissions(user_only=False),
},
projection={"item_id": 1, "name": 1, "type": 1, "relationships": 1},
)
)
node_ids = {document["item_id"] for document in all_documents}
if len(node_ids) > 1:
next_shell = flask_mongo.db.items.find(
{
"$or": [
*[{"item_id": id} for id in node_ids if id != item_id],
*[{"relationships.item_id": id} for id in node_ids if id != item_id],
],
**get_default_permissions(user_only=False),
},
projection={"item_id": 1, "name": 1, "type": 1, "relationships": 1},
)
node_ids = node_ids | {document["item_id"] for document in next_shell}
all_documents.extend(next_shell)
nodes = []
edges = []
for document in all_documents:
nodes.append(
{
"data": {
"id": document["item_id"],
"name": document["name"],
"type": document["type"],
"special": document["item_id"] == item_id,
}
}
)
if not document.get("relationships"):
continue
for relationship in document["relationships"]:
# only considering child-parent relationships:
if relationship["relation"] not in ("parent", "is_part_of"):
continue
target = document["item_id"]
source = relationship["item_id"]
if source not in node_ids:
continue
edges.append(
{
"data": {
"id": f"{source}->{target}",
"source": source,
"target": target,
"value": 1,
}
}
)
# We want to filter out all the starting materials that don't have relationships since there are so many of them:
whitelist = {edge["data"]["source"] for edge in edges}
nodes = [
node
for node in nodes
if node["data"]["type"] in ("samples", "cells") or node["data"]["id"] in whitelist
]
return (jsonify(status="success", nodes=nodes, edges=edges), 200)
healthcheck
¶ENDPOINTS: Dict[str, Callable]
¶is_ready()
¶Source code in pydatalab/routes/v0_1/healthcheck.py
def is_ready():
from pydatalab.mongo import check_mongo_connection
try:
check_mongo_connection()
except RuntimeError:
return (
jsonify(status="error", message="Unable to connect to MongoDB at specified URI."),
500,
)
return (jsonify(status="success", message="Server and database are ready"), 200)
is_alive()
¶Source code in pydatalab/routes/v0_1/healthcheck.py
def is_alive():
return (jsonify(status="success", message="Server is alive"), 200)
info
¶This submodule defines introspective info endpoints of the API.
ENDPOINTS: Dict[str, Callable]
¶
Attributes (BaseModel)
pydantic-model
¶Source code in pydatalab/routes/v0_1/info.py
class Attributes(BaseModel):
class Config:
extra = "allow"
Meta (BaseModel)
pydantic-model
¶Source code in pydatalab/routes/v0_1/info.py
class Meta(BaseModel):
timestamp: datetime = Field(default_factory=datetime.now)
query: str = ""
api_version: str = __api_version__
available_api_versions: List[str] = [__api_version__]
server_version: str = __version__
datamodel_version: str = __version__
Links (BaseModel)
pydantic-model
¶Source code in pydatalab/routes/v0_1/info.py
class Links(BaseModel):
self: AnyUrl
class Config:
extra = "allow"
self: AnyUrl
pydantic-field
required
¶
Data (BaseModel)
pydantic-model
¶
JSONAPIResponse (BaseModel)
pydantic-model
¶Source code in pydatalab/routes/v0_1/info.py
class JSONAPIResponse(BaseModel):
data: Union[Data, List[Data]]
meta: Meta
links: Links
MetaPerson (BaseModel)
pydantic-model
¶
Info (Attributes, Meta)
pydantic-model
¶Source code in pydatalab/routes/v0_1/info.py
class Info(Attributes, Meta):
maintainer: Optional[MetaPerson]
issue_tracker: Optional[AnyUrl]
homepage: Optional[AnyUrl]
source_repository: Optional[AnyUrl]
@validator("maintainer")
def strip_maintainer_fields(cls, v):
if isinstance(v, Person):
return MetaPerson(contact_email=v.contact_email, display_name=v.display_name)
return v
maintainer: MetaPerson
pydantic-field
¶issue_tracker: AnyUrl
pydantic-field
¶homepage: AnyUrl
pydantic-field
¶source_repository: AnyUrl
pydantic-field
¶strip_maintainer_fields(v)
classmethod
¶Source code in pydatalab/routes/v0_1/info.py
@validator("maintainer")
def strip_maintainer_fields(cls, v):
if isinstance(v, Person):
return MetaPerson(contact_email=v.contact_email, display_name=v.display_name)
return v
get_info()
¶Source code in pydatalab/routes/v0_1/info.py
def get_info():
metadata = _get_deployment_metadata_once()
return (
jsonify(
json.loads(
JSONAPIResponse(
data=Data(id="/", type="info", attributes=Info(**metadata)),
meta=Meta(query=request.query_string),
links=Links(self=request.url),
).json()
)
),
200,
)
items
¶ENDPOINTS: Dict[str, Callable]
¶reserialize_blocks(display_order: List[str], blocks_obj: Dict[str, Dict]) -> Dict[str, Dict]
¶Create the corresponding Python objects from JSON block data, then serialize it again as JSON to populate any missing properties.
Parameters:
Name | Type | Description | Default
---|---|---|---
blocks_obj | Dict[str, Dict] | A dictionary containing the JSON block data, keyed by block ID. | required
Returns:
Type | Description
---|---
Dict[str, Dict] | A dictionary with the re-serialized block data.
Source code in pydatalab/routes/v0_1/items.py
def reserialize_blocks(display_order: List[str], blocks_obj: Dict[str, Dict]) -> Dict[str, Dict]:
"""Create the corresponding Python objects from JSON block data, then
serialize it again as JSON to populate any missing properties.
Parameters:
blocks_obj: A dictionary containing the JSON block data, keyed by block ID.
Returns:
A dictionary with the re-serialized block data.
"""
for block_id in display_order:
try:
block_data = blocks_obj[block_id]
except KeyError:
LOGGER.warning(f"block_id {block_id} found in display order but not in blocks_obj")
continue
blocktype = block_data["blocktype"]
blocks_obj[block_id] = (
BLOCK_TYPES.get(blocktype, BLOCK_TYPES["notsupported"]).from_db(block_data).to_web()
)
return blocks_obj
dereference_files(file_ids: List[Union[str, bson.objectid.ObjectId]]) -> Dict[str, Dict]
¶For a list of Object IDs (as strings or otherwise), query the files collection and return a dictionary of the data stored under each ID.
Parameters:
Name | Type | Description | Default
---|---|---|---
file_ids | List[Union[str, bson.objectid.ObjectId]] | The list of IDs of files to return. | required
Returns:
Type | Description
---|---
Dict[str, Dict] | The dereferenced data as a dictionary with (string) ID keys.
Source code in pydatalab/routes/v0_1/items.py
def dereference_files(file_ids: List[Union[str, ObjectId]]) -> Dict[str, Dict]:
"""For a list of Object IDs (as strings or otherwise), query the files collection
and return a dictionary of the data stored under each ID.
Parameters:
file_ids: The list of IDs of files to return;
Returns:
The dereferenced data as a dictionary with (string) ID keys.
"""
results = {
str(f["_id"]): f
for f in flask_mongo.db.files.find(
{
"_id": {"$in": [ObjectId(_id) for _id in file_ids]},
}
)
}
if len(results) != len(file_ids):
raise RuntimeError(
"Some file IDs did not have corresponding database entries.\n"
f"Returned: {list(results.keys())}\n"
f"Requested: {file_ids}\n"
)
return results
get_starting_materials()
¶Source code in pydatalab/routes/v0_1/items.py
def get_starting_materials():
if not current_user.is_authenticated and not CONFIG.TESTING:
return (
jsonify(
status="error",
message="Authorization required to access chemical inventory.",
),
401,
)
items = [
doc
for doc in flask_mongo.db.items.aggregate(
[
{
"$match": {
"type": "starting_materials",
**get_default_permissions(user_only=False),
}
},
{
"$project": {
"_id": 0,
"item_id": 1,
"nblocks": {"$size": "$display_order"},
"date_acquired": 1,
"chemform": 1,
"name": 1,
"chemical_purity": 1,
"supplier": 1,
"location": 1,
}
},
]
)
]
return jsonify({"status": "success", "items": items})
get_samples_summary(match: Optional[Dict] = None, project: Optional[Dict] = None) -> CommandCursor
¶Return a summary of item entries that match some criteria.
Parameters:
Name | Type | Description | Default
---|---|---|---
match | Optional[Dict] | A MongoDB aggregation match query to filter the results. | None
project | Optional[Dict] | A MongoDB aggregation project query to filter the results, relative to the default included below. | None
Source code in pydatalab/routes/v0_1/items.py
def get_samples_summary(
match: Optional[Dict] = None, project: Optional[Dict] = None
) -> CommandCursor:
"""Return a summary of item entries that match some criteria.
Parameters:
match: A MongoDB aggregation match query to filter the results.
project: A MongoDB aggregation project query to filter the results, relative
to the default included below.
"""
if not match:
match = {}
match.update(get_default_permissions(user_only=False))
match["type"] = {"$in": ["samples", "cells"]}
_project = {
"_id": 0,
"creators": {
"display_name": 1,
"contact_email": 1,
},
"collections": {
"collection_id": 1,
"title": 1,
},
"item_id": 1,
"name": 1,
"chemform": 1,
"nblocks": {"$size": "$display_order"},
"characteristic_chemical_formula": 1,
"type": 1,
"date": 1,
"refcode": 1,
}
# Cannot mix 0 and 1 keys in MongoDB project so must loop and check
if project:
for key in project:
if project[key] == 0:
_project.pop(key, None)
else:
_project[key] = 1
return flask_mongo.db.items.aggregate(
[
{"$match": match},
{"$lookup": creators_lookup()},
{"$lookup": collections_lookup()},
{"$project": _project},
{"$sort": {"date": -1}},
]
)
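A sketch of calling this helper directly (the collection's immutable ID is a placeholder), mirroring how `get_collection` uses it above; it must run inside an application context with a configured database:

```python
from bson import ObjectId

from pydatalab.routes.v0_1.items import get_samples_summary

summary = get_samples_summary(
    match={
        "relationships.type": "collections",
        "relationships.immutable_id": ObjectId("0123456789ab0123456789ab"),
    },
    project={"collections": 0},
)
for entry in summary:
    print(entry["item_id"], entry["nblocks"])
```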
creators_lookup() -> Dict
¶Source code in pydatalab/routes/v0_1/items.py
def creators_lookup() -> Dict:
return {
"from": "users",
"let": {"creator_ids": "$creator_ids"},
"pipeline": [
{
"$match": {
"$expr": {
"$in": ["$_id", "$$creator_ids"],
},
}
},
{"$project": {"_id": 0, "display_name": 1, "contact_email": 1}},
],
"as": "creators",
}
files_lookup() -> Dict
¶Source code in pydatalab/routes/v0_1/items.py
def files_lookup() -> Dict:
return {
"from": "files",
"localField": "file_ObjectIds",
"foreignField": "_id",
"as": "files",
}
collections_lookup() -> Dict
¶Looks inside the relationships of the item, searches for IDs in the collections table and then projects only the collection ID and name for the response.
Source code in pydatalab/routes/v0_1/items.py
def collections_lookup() -> Dict:
"""Looks inside the relationships of the item, searches for IDs in the collections
table and then projects only the collection ID and name for the response.
"""
return {
"from": "collections",
"let": {"collection_ids": "$relationships.immutable_id"},
"pipeline": [
{
"$match": {
"$expr": {
"$in": ["$_id", "$$collection_ids"],
},
"type": "collections",
}
},
{"$project": {"_id": 1, "collection_id": 1}},
],
"as": "collections",
}
get_samples()
¶Source code in pydatalab/routes/v0_1/items.py
def get_samples():
return jsonify({"status": "success", "samples": list(get_samples_summary())})
search_items()
¶Perform free text search on items and return the top results.
GET parameters:
query: String with the search terms.
nresults: Maximum number of results to return (default 100).
types: If None, search all types of items. Otherwise, a list of strings giving the types to consider (e.g. ["samples", "starting_materials"]).
Returns:
A response containing a list of dictionaries of the matching items, in order of descending match score.
Source code in pydatalab/routes/v0_1/items.py
def search_items():
"""Perform free text search on items and return the top results.
GET parameters:
query: String with the search terms.
nresults: Maximum number of results to return (default 100).
types: If None, search all types of items. Otherwise, a list of strings
giving the types to consider. (e.g. ["samples","starting_materials"])
Returns:
response list of dictionaries containing the matching items in order of
descending match score.
"""
query = request.args.get("query", type=str)
nresults = request.args.get("nresults", default=100, type=int)
types = request.args.get("types", default=None)
if isinstance(types, str):
types = types.split(",") # should figure out how to parse as list automatically
match_obj = {"$text": {"$search": query}, **get_default_permissions(user_only=False)}
if types is not None:
match_obj["type"] = {"$in": types}
cursor = flask_mongo.db.items.aggregate(
[
{"$match": match_obj},
{"$sort": {"score": {"$meta": "textScore"}}},
{"$limit": nresults},
{
"$project": {
"_id": 0,
"type": 1,
"item_id": 1,
"name": 1,
"chemform": 1,
"refcode": 1,
}
},
]
)
return jsonify({"status": "success", "items": list(cursor)}), 200
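For illustration, a client request might look like the sketch below; the deployment URL and endpoint path are placeholders (the actual path comes from the `ENDPOINTS` mapping, not shown here):

```python
import requests

# Restrict the search to two item types with a comma-separated string.
resp = requests.get(
    "https://datalab.example.com/search-items/",
    params={"query": "LiCoO2", "nresults": 20, "types": "samples,starting_materials"},
)
print(resp.json()["items"])
```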
create_sample()
¶Source code in pydatalab/routes/v0_1/items.py
def create_sample():
request_json = request.get_json() # noqa: F821 pylint: disable=undefined-variable
if "new_sample_data" in request_json:
response, http_code = _create_sample(
request_json["new_sample_data"], request_json.get("copy_from_item_id")
)
else:
response, http_code = _create_sample(request_json)
return jsonify(response), http_code
create_samples()
¶Attempt to create multiple samples at once. Because each may result in success or failure, 207 is returned along with a JSON field containing all the individual HTTP codes.
Source code in pydatalab/routes/v0_1/items.py
def create_samples():
"""attempt to create multiple samples at once.
Because each may result in success or failure, 207 is returned along with a
json field containing all the individual http_codes"""
request_json = request.get_json() # noqa: F821 pylint: disable=undefined-variable
sample_jsons = request_json["new_sample_datas"]
copy_from_item_ids = request_json.get("copy_from_item_ids")
if copy_from_item_ids is None:
copy_from_item_ids = [None] * len(sample_jsons)
outputs = [
_create_sample(sample_json, copy_from_item_id)
for sample_json, copy_from_item_id in zip(sample_jsons, copy_from_item_ids)
]
responses, http_codes = zip(*outputs)
statuses = [response["status"] for response in responses]
nsuccess = statuses.count("success")
nerror = statuses.count("error")
return (
jsonify(
nsuccess=nsuccess,
nerror=nerror,
responses=responses,
http_codes=http_codes,
),
207,
) # 207: multi-status
delete_sample()
¶Source code in pydatalab/routes/v0_1/items.py
def delete_sample():
request_json = request.get_json() # noqa: F821 pylint: disable=undefined-variable
item_id = request_json["item_id"]
result = flask_mongo.db.items.delete_one(
{"item_id": item_id, **get_default_permissions(user_only=True)}
)
if result.deleted_count != 1:
return (
jsonify(
{
"status": "error",
"message": f"Authorization required to attempt to delete sample with {item_id=} from the database.",
}
),
401,
)
return (
jsonify(
{
"status": "success",
}
),
200,
)
get_item_data(item_id, load_blocks: bool = False)
¶Generates a JSON response for the item with the given `item_id`, additionally resolving relationships to files and other items.
Parameters:
Name | Type | Description | Default
---|---|---|---
load_blocks | bool | Whether to regenerate any data blocks associated with this sample (i.e., create the Python object corresponding to the block and call its render function). | False
Source code in pydatalab/routes/v0_1/items.py
def get_item_data(item_id, load_blocks: bool = False):
"""Generates a JSON response for the item with the given `item_id`,
additionally resolving relationships to files and other items.
Parameters:
load_blocks: Whether to regenerate any data blocks associated with this
sample (i.e., create the Python object corresponding to the block and
call its render function).
"""
# retrieve the entry from the database:
cursor = flask_mongo.db.items.aggregate(
[
{"$match": {"item_id": item_id, **get_default_permissions(user_only=False)}},
{"$lookup": creators_lookup()},
{"$lookup": collections_lookup()},
{"$lookup": files_lookup()},
],
)
try:
doc = list(cursor)[0]
except IndexError:
doc = None
if not doc or (
not current_user.is_authenticated
and not CONFIG.TESTING
and not doc["type"] == "starting_materials"
):
return (
jsonify(
{
"status": "error",
"message": f"No matching item {item_id=} with current authorization.",
}
),
404,
)
# determine the item type and validate according to the appropriate schema
try:
ItemModel = ITEM_MODELS[doc["type"]]
except KeyError:
if "type" in doc:
raise KeyError(f"Item {item_id=} has invalid type: {doc['type']}")
else:
raise KeyError(f"Item {item_id=} has no type field in document.")
doc = ItemModel(**doc)
if load_blocks:
doc.blocks_obj = reserialize_blocks(doc.display_order, doc.blocks_obj)
# find any documents with relationships that mention this document
relationships_query_results = flask_mongo.db.items.find(
filter={
"$or": [
{"relationships.item_id": doc.item_id},
{"relationships.refcode": doc.refcode},
{"relationships.immutable_id": doc.immutable_id},
]
},
projection={
"item_id": 1,
"refcode": 1,
"relationships": {
"$elemMatch": {
"$or": [
{"item_id": doc.item_id},
{"refcode": doc.refcode},
],
},
},
},
)
# loop over and collect all 'outer' relationships presented by other items
incoming_relationships: Dict[RelationshipType, Set[str]] = {}
for d in relationships_query_results:
for k in d["relationships"]:
if k["relation"] not in incoming_relationships:
incoming_relationships[k["relation"]] = set()
incoming_relationships[k["relation"]].add(
d["item_id"] or d["refcode"] or d["immutable_id"]
)
# loop over and aggregate all 'inner' relationships presented by this item
inlined_relationships: Dict[RelationshipType, Set[str]] = {}
if doc.relationships is not None:
inlined_relationships = {
relation: {
d.item_id or d.refcode or d.immutable_id
for d in doc.relationships
if d.relation == relation
}
for relation in RelationshipType
}
# reunite parents and children from both directions of the relationships field
parents = incoming_relationships.get(RelationshipType.CHILD, set()).union(
inlined_relationships.get(RelationshipType.PARENT, set())
)
children = incoming_relationships.get(RelationshipType.PARENT, set()).union(
inlined_relationships.get(RelationshipType.CHILD, set())
)
# Must be exported to JSON first to apply the custom pydantic JSON encoders
return_dict = json.loads(doc.json(exclude_unset=True))
# create the files_data dictionary keyed by file ObjectId
files_data: Dict[ObjectId, Dict] = dict(
[(f["immutable_id"], f) for f in return_dict.get("files") or []]
)
return jsonify(
{
"status": "success",
"item_id": item_id,
"item_data": return_dict,
"files_data": files_data,
"child_items": sorted(children),
"parent_items": sorted(parents),
}
)
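The endpoint wraps the item in a JSON envelope containing status, item_id, item_data, files_data, and the resolved parent_items/child_items. A minimal client-side sketch follows; the base URL and route path are assumptions for illustration and are not taken from this documentation.
import requests

API_URL = "https://datalab.example.com"  # hypothetical deployment
resp = requests.get(f"{API_URL}/get-item-data/test_sample_1")  # hypothetical route path
payload = resp.json()
if payload["status"] == "success":
    # "item_data" is the validated, JSON-encoded item document;
    # "files_data" is keyed by each file's immutable ObjectId string.
    print(payload["item_data"]["type"])
    print(payload["parent_items"], payload["child_items"])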
save_item()
¶Source code in pydatalab/routes/v0_1/items.py
def save_item():
request_json = request.get_json() # noqa: F821 pylint: disable=undefined-variable
item_id = request_json["item_id"]
updated_data = request_json["data"]
# These keys should not be updated here and cannot be modified by the user through this endpoint
for k in ("_id", "file_ObjectIds", "creators", "creator_ids", "item_id", "relationships"):
if k in updated_data:
del updated_data[k]
updated_data["last_modified"] = datetime.datetime.now().isoformat()
for block_id, block_data in updated_data.get("blocks_obj", {}).items():
blocktype = block_data["blocktype"]
block = BLOCK_TYPES.get(blocktype, BLOCK_TYPES["notsupported"]).from_web(block_data)
updated_data["blocks_obj"][block_id] = block.to_db()
item = flask_mongo.db.items.find_one(
{"item_id": item_id, **get_default_permissions(user_only=True)}
)
if not item:
return (
jsonify(
status="error",
message=f"Unable to find item with appropriate permissions and {item_id=}.",
),
400,
)
if updated_data.get("collections", []):
try:
updated_data["collections"] = _check_collections(updated_data)
except ValueError as exc:
return (
dict(
status="error",
message=f"Cannot update {item_id!r} with missing collections {updated_data['collections']!r}: {exc}",
item_id=item_id,
),
401,
)
item_type = item["type"]
item.update(updated_data)
try:
item = ITEM_MODELS[item_type](**item).dict()
except ValidationError as exc:
return (
jsonify(
status="error",
message=f"Unable to update item {item_id=} ({item_type=}) with new data {updated_data}",
output=str(exc),
),
400,
)
# remove collections and creators and any other reference fields
item.pop("collections")
item.pop("creators")
result = flask_mongo.db.items.update_one(
{"item_id": item_id},
{"$set": item},
)
if result.matched_count != 1:
return (
jsonify(
status="error",
message=f"{item_id} item update failed. no subdocument matched",
output=result.raw_result,
),
400,
)
return jsonify(status="success", last_modified=updated_data["last_modified"]), 200
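The endpoint expects a JSON body with item_id and a data dictionary of fields to update; protected keys such as _id, creators, and relationships are stripped server-side before validation. A hedged client sketch, with the route path and base URL assumed for illustration:
import requests

API_URL = "https://datalab.example.com"  # hypothetical deployment
payload = {
    "item_id": "test_sample_1",
    "data": {
        "description": "Updated description",
        # Protected keys (e.g. "_id", "creators", "relationships") would be
        # dropped by the endpoint before validation and saving.
    },
}
resp = requests.post(f"{API_URL}/save-item/", json=payload)  # hypothetical route path
print(resp.status_code, resp.json().get("last_modified"))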
search_users()
¶Perform a free-text search on users and return the top results.
GET parameters:
query: String with the search terms.
nresults: Maximum number of results to return (default 100).
Returns:
Type | Description |
---|---|
response | A list of dictionaries containing the matching users, in order of descending match score. |
Source code in pydatalab/routes/v0_1/items.py
def search_users():
"""Perform free text search on users and return the top results.
GET parameters:
query: String with the search terms.
nresults: Maximum number of results to return (default 100).
Returns:
response list of dictionaries containing the matching items in order of
descending match score.
"""
query = request.args.get("query", type=str)
nresults = request.args.get("nresults", default=100, type=int)
types = request.args.get("types", default=None)
match_obj = {"$text": {"$search": query}}
if types is not None:
match_obj["type"] = {"$in": types}
cursor = flask_mongo.db.users.aggregate(
[
{"$match": match_obj},
{"$sort": {"score": {"$meta": "textScore"}}},
{"$limit": nresults},
{
"$project": {
"_id": 1,
"identities": 1,
"display_name": 1,
}
},
]
)
return jsonify({"status": "success", "users": list(cursor)}), 200
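A sketch of the corresponding request; the route path and base URL are assumptions for illustration:
import requests

API_URL = "https://datalab.example.com"  # hypothetical deployment
resp = requests.get(
    f"{API_URL}/search-users/",  # hypothetical route path
    params={"query": "grey", "nresults": 10},
)
for user in resp.json()["users"]:
    print(user["_id"], user["display_name"])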
remotes
¶ENDPOINTS: Dict[str, Callable]
¶list_remote_directories()
¶Returns the most recent directory structures from the server.
If the cache is missing or is older than some configured time, then it will be reconstructed.
Source code in pydatalab/routes/v0_1/remotes.py
def list_remote_directories():
"""Returns the most recent directory structures from the server.
If the cache is missing or is older than some configured time,
then it will be reconstructed.
"""
if not current_user.is_authenticated and not CONFIG.TESTING:
return (
jsonify(
{
"status": "error",
"title": "Not Authorized",
"detail": "Listing remote directories requires authentication.",
}
),
401,
)
invalidate_cache = None
if "invalidate_cache" in request.args:
invalidate_cache = request.args["invalidate_cache"]
if invalidate_cache not in ("1", "0"):
return jsonify({"error": "invalidate_cache must be 0 or 1"}), 400
invalidate_cache = bool(int(invalidate_cache))
all_directory_structures = get_directory_structures(
CONFIG.REMOTE_FILESYSTEMS, invalidate_cache=invalidate_cache
)
response = {}
response["meta"] = {}
response["meta"]["remotes"] = CONFIG.REMOTE_FILESYSTEMS
if all_directory_structures:
oldest_update = min(d["last_updated"] for d in all_directory_structures)
response["meta"]["oldest_cache_update"] = oldest_update.isoformat()
response["data"] = all_directory_structures
return jsonify(response), 200
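Since the endpoint requires authentication and accepts an invalidate_cache query parameter, a client call might look like the following sketch; the route path, base URL, and session cookie are placeholders, not taken from this documentation:
import requests

API_URL = "https://datalab.example.com"  # hypothetical deployment
resp = requests.get(
    f"{API_URL}/list-remote-directories/",  # hypothetical route path
    params={"invalidate_cache": "1"},  # force a rebuild of the cached structures
    cookies={"session": "..."},  # placeholder for an authenticated session
)
body = resp.json()
print(body["meta"].get("oldest_cache_update"))
print(len(body.get("data", [])), "remote directory trees")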
simple_bokeh_plot
¶
FONTSIZE
¶
TOOLS
¶
TYPEFACE
¶
mytheme
¶
style
¶
simple_bokeh_plot(xy_filename, x_label = None, y_label = None)
¶
Source code in pydatalab/simple_bokeh_plot.py
def simple_bokeh_plot(xy_filename, x_label=None, y_label=None):
df = pd.read_csv(xy_filename, sep=r"\s+")
# source = ColumnDataSource(df)
source = ColumnDataSource(
{"x_col": df[df.columns[0]], "y_col": df[df.columns[1]]}
) # plot the first two columns
kw = dict()
p = figure(sizing_mode="scale_width", aspect_ratio=1.5, tools=TOOLS, **kw)
p.xaxis.axis_label = x_label
p.yaxis.axis_label = y_label
# apply a theme. for some reason, this isn't carrying over
# to components() calls, so use components(theme=mytheme)
curdoc().theme = mytheme
p.circle("x_col", "y_col", source=source)
p.toolbar.logo = "grey"
p.js_on_event(DoubleTap, CustomJS(args=dict(p=p), code="p.reset.emit()"))
# show(p)
return p
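Because the theme applied via curdoc() does not carry over to components() (as the comment in the source notes), the theme should be passed explicitly when embedding the returned figure. A usage sketch, where the two-column .xy filename is an assumption:
from bokeh.embed import components

from pydatalab.simple_bokeh_plot import mytheme, simple_bokeh_plot

# "trace.xy" stands in for any whitespace-separated, two-column data file.
p = simple_bokeh_plot("trace.xy", x_label="x", y_label="y")

# Pass the theme explicitly, since curdoc().theme is not picked up by components():
script, div = components(p, theme=mytheme)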
utils
¶
This module contains utility functions that can be used anywhere in the package.
Classes¶
CustomJSONEncoder (JSONEncoder)
¶
Use a JSON encoder that can handle pymongo's bson.
Source code in pydatalab/utils.py
class CustomJSONEncoder(JSONEncoder):
"""Use a JSON encoder that can handle pymongo's bson."""
def default(self, obj):
if isinstance(obj, datetime.datetime):
return datetime.datetime.isoformat(obj)
return json_util.default(obj)
default(self, obj)
¶Convert obj to a JSON-serializable type. See json.JSONEncoder.default. Python does not support overriding how basic types like str or list are serialized; they are handled before this method is called.
Source code in pydatalab/utils.py
def default(self, obj):
if isinstance(obj, datetime.datetime):
return datetime.datetime.isoformat(obj)
return json_util.default(obj)
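As a quick illustration of what the encoder handles, the sketch below mirrors it on top of the stdlib json.JSONEncoder (an assumption: the real base class may be Flask's encoder) and serializes a document containing a datetime and a bson ObjectId:
import datetime
import json

from bson import ObjectId, json_util


class SketchJSONEncoder(json.JSONEncoder):
    """Standalone stand-in for CustomJSONEncoder, for illustration only."""

    def default(self, obj):
        if isinstance(obj, datetime.datetime):
            return datetime.datetime.isoformat(obj)
        return json_util.default(obj)


doc = {"_id": ObjectId(), "last_modified": datetime.datetime(2023, 1, 1)}
print(json.dumps(doc, cls=SketchJSONEncoder))
# -> {"_id": {"$oid": "..."}, "last_modified": "2023-01-01T00:00:00"}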
Functions¶
reduce_df_size(df: DataFrame, target_nrows: int, endpoint: bool = True) -> DataFrame
¶
Reduce the dataframe to the number of target rows by applying a stride.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df | DataFrame | The dataframe to reduce. | required |
target_nrows | int | The target number of rows to reduce each column to. | required |
endpoint | bool | Whether to include the endpoint of the dataframe. | True |
Returns:
Type | Description |
---|---|
DataFrame | A copy of the input dataframe with the applied stride. |
Source code in pydatalab/utils.py
def reduce_df_size(df: pd.DataFrame, target_nrows: int, endpoint: bool = True) -> pd.DataFrame:
"""Reduce the dataframe to the number of target rows by applying a stride.
Parameters:
df: The dataframe to reduce.
target_nrows: The target number of rows to reduce each column to.
endpoint: Whether to include the endpoint of the dataframe.
Returns:
A copy of the input dataframe with the applied stride.
"""
num_rows = len(df)
stride = ceil(num_rows / target_nrows)
if endpoint:
indices = [0] + list(range(stride, num_rows - 1, stride)) + [num_rows - 1]
else:
indices = list(range(0, num_rows, stride))
return df.iloc[indices].copy()
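For example, downsampling a 10,000-row dataframe to roughly 1,000 rows (a usage sketch with made-up data):
import pandas as pd

from pydatalab.utils import reduce_df_size

df = pd.DataFrame({"x": range(10_000), "y": range(10_000)})
reduced = reduce_df_size(df, target_nrows=1_000)

# stride = ceil(10000 / 1000) = 10; with endpoint=True the first and last rows
# are always kept, so 1001 rows remain and the final index is preserved.
assert len(reduced) == 1001
assert reduced.index[-1] == 9999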