
pydatalab special

Modules

apps special

Modules
chat special
Modules
blocks
MAX_CONTEXT_SIZE
MODEL
Classes
ChatBlock (DataBlock)
Source code in pydatalab/apps/chat/blocks.py
class ChatBlock(DataBlock):
    blocktype = "chat"
    description = "Virtual assistant"
    accepted_file_extensions: Sequence[str] = []
    __supports_collections = True
    defaults = {
        "system_prompt": """You are whinchat (lowercase w), a virtual data managment assistant that helps materials chemists manage their experimental data and plan experiments. You are deployed in the group of Professor Clare Grey in the Department of Chemistry at the University of Cambridge.
You are embedded within the program datalab, where you have access to JSON describing an ‘item’, or a collection of items, with connections to other items. These items may include experimental samples, starting materials, and devices (e.g. battery cells made out of experimental samples and starting materials).
Answer questions in markdown. Specify the language for all markdown code blocks. You can make diagrams by writing a mermaid code block or an svg code block. When writing mermaid code, you must use quotations around each of the labels (e.g. A["label1"] --> B["label2"])
Be as concise as possible. When saying your name, type a bird emoji right after whinchat 🐦.
        """,
        "temperature": 0.2,
        "error_message": None,
    }
    openai.api_key = os.environ.get("OPENAI_API_KEY")

    def to_db(self):
        """returns a dictionary with the data for this
        block, ready to be input into mongodb"""
        self.render()
        return super().to_db()

    @property
    def plot_functions(self):
        return (self.render,)

    def render(self):
        if not self.data.get("messages"):
            if (item_id := self.data.get("item_id")) is not None:
                info_json = self._prepare_item_json_for_chat(item_id)
            elif (collection_id := self.data.get("collection_id")) is not None:
                info_json = self._prepare_collection_json_for_chat(collection_id)
            else:
                raise RuntimeError("No item or collection id provided")

            self.data["messages"] = [
                {
                    "role": "system",
                    "content": self.defaults["system_prompt"],
                },
                {
                    "role": "user",
                    "content": f"""Here is the JSON data for the current item(s): {info_json}.
Start with a friendly introduction and give me a one sentence summary of what this is (not detailed, no information about specific masses). """,
                },
            ]

        if self.data.get("prompt"):
            self.data["messages"].append(
                {
                    "role": "user",
                    "content": self.data["prompt"],
                }
            )
            self.data["prompt"] = None

        token_count = num_tokens_from_messages(self.data["messages"])
        self.data["token_count"] = token_count

        if token_count >= MAX_CONTEXT_SIZE:
            self.data[
                "error_message"
            ] = f"""This conversation has reached its maximum context size and the chatbot won't be able to respond further ({token_count} tokens, max: {MAX_CONTEXT_SIZE}). Please make a new chat block to start fresh."""
            return

        try:
            if self.data["messages"][-1].role not in ("user", "system"):
                return
        except AttributeError:
            if self.data["messages"][-1]["role"] not in ("user", "system"):
                return

        try:
            LOGGER.debug(
                f"submitting request to OpenAI API for completion with last message role \"{self.data['messages'][-1]['role']}\" (message = {self.data['messages'][-1:]}). Temperature = {self.data['temperature']} (type {type(self.data['temperature'])})"
            )
            responses = openai.ChatCompletion.create(
                model=MODEL,
                messages=self.data["messages"],
                temperature=self.data["temperature"],
                max_tokens=min(
                    1024, MAX_CONTEXT_SIZE - token_count - 1
                ),  # cap the response at 1024 tokens, or less if the context window is nearly full
            )
            self.data["error_message"] = None
        except openai.OpenAIError as exc:
            LOGGER.debug("Received an error from OpenAI API: %s", exc)
            self.data["error_message"] = f"Received an error from the OpenAi API: {exc}."
            return

        try:
            self.data["messages"].append(responses["choices"][0].message)
        except AttributeError:
            self.data["messages"].append(responses["choices"][0]["message"])

        self.data["model_name"] = MODEL

        token_count = num_tokens_from_messages(self.data["messages"])
        self.data["token_count"] = token_count
        return

    def _prepare_item_json_for_chat(self, item_id: str):
        from pydatalab.routes.v0_1.items import get_item_data

        item_info = get_item_data(item_id, load_blocks=False).json

        model = ITEM_MODELS[item_info["item_data"]["type"]](**item_info["item_data"])
        if model.blocks_obj:
            model.blocks_obj = {
                k: value for k, value in model.blocks_obj.items() if value["blocktype"] != "chat"
            }
        item_info = model.dict(exclude_none=True, exclude_unset=True)
        item_info["type"] = model.type

        # strip irrelevant or large fields
        item_filenames = {
            str(file["immutable_id"]): file["name"] for file in item_info.get("files", [])
        }
        for block in item_info.get("blocks_obj", {}).values():
            block.pop("bokeh_plot_data", None)

            block_fields_to_remove = ["item_id", "block_id"]
            [block.pop(field, None) for field in block_fields_to_remove]

            # nmr block fields to remove (need a more general way to do this)
            NMR_fields_to_remove = [
                "acquisition_parameters",
                "carrier_offset_Hz",
                "nscans",
                "processed_data",
                "processed_data_shape",
                "processing_parameters",
                "pulse_program",
                "selected_process",
            ]
            [block.pop(field, None) for field in NMR_fields_to_remove]

            # replace file_id with the actual filename
            file_id = block.pop("file_id", None)
            if file_id:
                block["file"] = item_filenames.get(file_id, None)

        top_level_keys_to_remove = [
            "display_order",
            "creator_ids",
            "refcode",
            "last_modified",
            "revision",
            "revisions",
            "immutable_id",
            "file_ObjectIds",
        ]

        for k in top_level_keys_to_remove:
            item_info.pop(k, None)

        for ind, f in enumerate(item_info.get("relationships", [])):
            item_info["relationships"][ind] = {
                k: v for k, v in f.items() if k in ["item_id", "type", "relation"]
            }
        item_info["files"] = [file["name"] for file in item_info.get("files", [])]
        item_info["creators"] = [
            creator["display_name"] for creator in item_info.get("creators", [])
        ]

        # move blocks from blocks_obj to a simpler list to further cut down tokens,
        # especially in alphanumeric block_id fields
        item_info["blocks"] = [block for block in item_info.pop("blocks_obj", {}).values()]

        item_info = {k: value for k, value in item_info.items() if value}

        for key in [
            "synthesis_constituents",
            "positive_electrode",
            "negative_electrode",
            "electrolyte",
        ]:
            if key in item_info:
                for constituent in item_info[key]:
                    LOGGER.debug("iterating through constituents:")
                    LOGGER.debug(constituent)
                    if "quantity" in constituent:
                        constituent[
                            "quantity"
                        ] = f"{constituent.get('quantity', 'unknown')} {constituent.get('unit', '')}"
                    constituent.pop("unit", None)

        # Note manual replaces to help avoid escape sequences that take up extra tokens
        item_info_json = (
            json.dumps(item_info, cls=CustomJSONEncoder)
            .replace('"', "'")
            .replace(r"\'", "'")
            .replace(r"\n", " ")
        )

        return item_info_json

    def _prepare_collection_json_for_chat(self, collection_id: str):
        from pydatalab.routes.v0_1.collections import get_collection

        collection_data = get_collection(collection_id).json
        if collection_data["status"] != "success":
            raise RuntimeError(f"Attempt to get collection data for {collection_id} failed.")

        children = collection_data["child_items"]
        return (
            "["
            + ",".join([self._prepare_item_json_for_chat(child["item_id"]) for child in children])
            + "]"
        )
accepted_file_extensions: Sequence[str]
blocktype: str
defaults: Dict[str, Any]
description: str
plot_functions property readonly
Methods
to_db(self)

returns a dictionary with the data for this block, ready to be input into mongodb

Source code in pydatalab/apps/chat/blocks.py
def to_db(self):
    """returns a dictionary with the data for this
    block, ready to be input into mongodb"""
    self.render()
    return super().to_db()
render(self)
Source code in pydatalab/apps/chat/blocks.py
    def render(self):
        if not self.data.get("messages"):
            if (item_id := self.data.get("item_id")) is not None:
                info_json = self._prepare_item_json_for_chat(item_id)
            elif (collection_id := self.data.get("collection_id")) is not None:
                info_json = self._prepare_collection_json_for_chat(collection_id)
            else:
                raise RuntimeError("No item or collection id provided")

            self.data["messages"] = [
                {
                    "role": "system",
                    "content": self.defaults["system_prompt"],
                },
                {
                    "role": "user",
                    "content": f"""Here is the JSON data for the current item(s): {info_json}.
Start with a friendly introduction and give me a one sentence summary of what this is (not detailed, no information about specific masses). """,
                },
            ]

        if self.data.get("prompt"):
            self.data["messages"].append(
                {
                    "role": "user",
                    "content": self.data["prompt"],
                }
            )
            self.data["prompt"] = None

        token_count = num_tokens_from_messages(self.data["messages"])
        self.data["token_count"] = token_count

        if token_count >= MAX_CONTEXT_SIZE:
            self.data[
                "error_message"
            ] = f"""This conversation has reached its maximum context size and the chatbot won't be able to respond further ({token_count} tokens, max: {MAX_CONTEXT_SIZE}). Please make a new chat block to start fresh."""
            return

        try:
            if self.data["messages"][-1].role not in ("user", "system"):
                return
        except AttributeError:
            if self.data["messages"][-1]["role"] not in ("user", "system"):
                return

        try:
            LOGGER.debug(
                f"submitting request to OpenAI API for completion with last message role \"{self.data['messages'][-1]['role']}\" (message = {self.data['messages'][-1:]}). Temperature = {self.data['temperature']} (type {type(self.data['temperature'])})"
            )
            responses = openai.ChatCompletion.create(
                model=MODEL,
                messages=self.data["messages"],
                temperature=self.data["temperature"],
                max_tokens=min(
                    1024, MAX_CONTEXT_SIZE - token_count - 1
                ),  # cap the response at 1024 tokens, or less if the context window is nearly full
            )
            self.data["error_message"] = None
        except openai.OpenAIError as exc:
            LOGGER.debug("Received an error from OpenAI API: %s", exc)
            self.data["error_message"] = f"Received an error from the OpenAi API: {exc}."
            return

        try:
            self.data["messages"].append(responses["choices"][0].message)
        except AttributeError:
            self.data["messages"].append(responses["choices"][0]["message"])

        self.data["model_name"] = MODEL

        token_count = num_tokens_from_messages(self.data["messages"])
        self.data["token_count"] = token_count
        return
num_tokens_from_messages(messages: Sequence[dict])
Source code in pydatalab/apps/chat/blocks.py
def num_tokens_from_messages(messages: Sequence[dict]):
    # see: https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb
    encoding = tiktoken.encoding_for_model(MODEL)

    tokens_per_message = 4  # every message follows <|start|>{role/name}\n{content}<|end|>\n
    tokens_per_name = -1  # if there's a name, the role is omitted

    num_tokens = 0
    for message in messages:
        num_tokens += tokens_per_message
        for key, value in message.items():
            num_tokens += len(encoding.encode(value))
            if key == "name":
                num_tokens += tokens_per_name
    num_tokens += 3  # every reply is primed with <|start|>assistant<|message|>
    return num_tokens
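A rough usage sketch, assuming num_tokens_from_messages and MODEL are available from this module (the messages below are illustrative): the function takes chat-style message dicts and returns the approximate prompt size that render() compares against MAX_CONTEXT_SIZE.

messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Summarise this sample for me."},
]

# Approximate token count of the prompt, used above to guard MAX_CONTEXT_SIZE
print(num_tokens_from_messages(messages))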
eis special
EISBlock (DataBlock)
Source code in pydatalab/apps/eis/__init__.py
class EISBlock(DataBlock):

    accepted_file_extensions = [".txt"]
    blocktype = "eis"
    name = "Electrochemical Impedance Spectroscopy"
    description = "This block can plot EIS data from Ivium .txt files"

    @property
    def plot_functions(self):
        return (self.generate_eis_plot,)

    def generate_eis_plot(self):
        file_info = None
        # all_files = None
        eis_data = None

        if "file_id" not in self.data:
            LOGGER.warning("No file set in the DataBlock")
            return
        else:
            file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
            ext = os.path.splitext(file_info["location"].split("/")[-1])[-1].lower()
            if ext not in self.accepted_file_extensions:
                LOGGER.warning(
                    "Unsupported file extension (must be one of %s, not %s)",
                    self.accepted_file_extensions,
                    ext,
                )
                return

            eis_data = parse_ivium_eis_txt(Path(file_info["location"]))

        if eis_data is not None:
            plot = selectable_axes_plot(
                eis_data,
                x_options=["Re(Z) [Ω]"],
                y_options=["-Im(Z) [Ω]"],
                color_options=["Frequency [Hz]"],
                color_mapper=LogColorMapper("Cividis256"),
                plot_points=True,
                plot_line=False,
                tools=HoverTool(tooltips=[("Frequency [Hz]", "@{Frequency [Hz]}")]),
            )

            self.data["bokeh_plot_data"] = bokeh.embed.json_item(plot, theme=mytheme)
accepted_file_extensions: Sequence[str]
blocktype: str
description: str
name
plot_functions property readonly
generate_eis_plot(self)
Source code in pydatalab/apps/eis/__init__.py
def generate_eis_plot(self):
    file_info = None
    # all_files = None
    eis_data = None

    if "file_id" not in self.data:
        LOGGER.warning("No file set in the DataBlock")
        return
    else:
        file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
        ext = os.path.splitext(file_info["location"].split("/")[-1])[-1].lower()
        if ext not in self.accepted_file_extensions:
            LOGGER.warning(
                "Unsupported file extension (must be one of %s, not %s)",
                self.accepted_file_extensions,
                ext,
            )
            return

        eis_data = parse_ivium_eis_txt(Path(file_info["location"]))

    if eis_data is not None:
        plot = selectable_axes_plot(
            eis_data,
            x_options=["Re(Z) [Ω]"],
            y_options=["-Im(Z) [Ω]"],
            color_options=["Frequency [Hz]"],
            color_mapper=LogColorMapper("Cividis256"),
            plot_points=True,
            plot_line=False,
            tools=HoverTool(tooltips=[("Frequency [Hz]", "@{Frequency [Hz]}")]),
        )

        self.data["bokeh_plot_data"] = bokeh.embed.json_item(plot, theme=mytheme)
parse_ivium_eis_txt(filename: Path)
Source code in pydatalab/apps/eis/__init__.py
def parse_ivium_eis_txt(filename: Path):
    eis = pd.read_csv(filename, sep="\t")
    eis["Z2 /ohm"] *= -1
    eis.rename(
        {"Z1 /ohm": "Re(Z) [Ω]", "Z2 /ohm": "-Im(Z) [Ω]", "freq. /Hz": "Frequency [Hz]"},
        inplace=True,
        axis="columns",
    )
    return eis
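A minimal, hypothetical usage sketch (the file path is illustrative; the parser expects a tab-separated Ivium export with "Z1 /ohm", "Z2 /ohm" and "freq. /Hz" columns):

from pathlib import Path

# Returns a DataFrame with columns renamed ready for plotting
eis = parse_ivium_eis_txt(Path("example_eis.txt"))
print(eis[["Re(Z) [Ω]", "-Im(Z) [Ω]", "Frequency [Hz]"]].head())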
tga special
Modules
blocks
MassSpecBlock (DataBlock)
Source code in pydatalab/apps/tga/blocks.py
class MassSpecBlock(DataBlock):
    blocktype = "ms"
    description = "Mass spectrometry (MS)"
    accepted_file_extensions = (".asc",)

    @property
    def plot_functions(self):
        return (self.generate_ms_plot,)

    def generate_ms_plot(self):
        file_info = None
        # all_files = None
        ms_data = None

        if "file_id" not in self.data:
            LOGGER.warning("No file set in the DataBlock")
            return
        else:
            file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
            ext = os.path.splitext(file_info["location"].split("/")[-1])[-1].lower()
            if ext not in self.accepted_file_extensions:
                LOGGER.warning(
                    "Unsupported file extension (must be one of %s, not %s)",
                    self.accepted_file_extensions,
                    ext,
                )
                return

            ms_data = parse_mt_mass_spec_ascii(Path(file_info["location"]))

        x_options = ["Time Relative [s]"]

        if ms_data:

            # collect the maximum value of the data key for each species for plot ordering
            max_vals: List[Tuple[str, float]] = []

            for species in ms_data["data"]:
                data_key = (
                    "Partial Pressure [mbar]"
                    if "Partial Pressure [mbar]" in ms_data["data"][species]
                    else "Ion Current [A]"
                )
                data = ms_data["data"][species][data_key].to_numpy()

                ms_data["data"][species][f"{data_key} (Savitzky-Golay)"] = savgol_filter(
                    data, len(data) // 10, 3
                )

                max_vals.append((species, ms_data["data"][species][data_key].max()))

            plots = []
            for ind, (species, _) in enumerate(sorted(max_vals, key=lambda x: x[1], reverse=True)):
                plots.append(
                    selectable_axes_plot(
                        {species: ms_data["data"][species]},
                        x_options=x_options,
                        y_options=[data_key],
                        y_default=[
                            f"{data_key} (Savitzky-Golay)",
                            f"{data_key}",
                        ],
                        label_x=(ind == 0),
                        label_y=(ind == 0),
                        plot_line=True,
                        plot_points=False,
                        plot_title=f"Channel name: {species}",
                        plot_index=ind,
                        aspect_ratio=1.5,
                    )
                )

                plots[-1].children[0].xaxis[0].ticker.desired_num_ticks = 2

            # construct MxN grid of all species
            M = 3
            grid = []
            for i in range(0, len(plots), M):
                grid.append(plots[i : i + M])
            p = gridplot(grid, sizing_mode="scale_width", toolbar_location="below")

            self.data["bokeh_plot_data"] = bokeh.embed.json_item(p, theme=grid_theme)
accepted_file_extensions: Sequence[str]
blocktype: str
description: str
plot_functions property readonly
generate_ms_plot(self)
Source code in pydatalab/apps/tga/blocks.py
def generate_ms_plot(self):
    file_info = None
    # all_files = None
    ms_data = None

    if "file_id" not in self.data:
        LOGGER.warning("No file set in the DataBlock")
        return
    else:
        file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
        ext = os.path.splitext(file_info["location"].split("/")[-1])[-1].lower()
        if ext not in self.accepted_file_extensions:
            LOGGER.warning(
                "Unsupported file extension (must be one of %s, not %s)",
                self.accepted_file_extensions,
                ext,
            )
            return

        ms_data = parse_mt_mass_spec_ascii(Path(file_info["location"]))

    x_options = ["Time Relative [s]"]

    if ms_data:

        # collect the maximum value of the data key for each species for plot ordering
        max_vals: List[Tuple[str, float]] = []

        for species in ms_data["data"]:
            data_key = (
                "Partial Pressure [mbar]"
                if "Partial Pressure [mbar]" in ms_data["data"][species]
                else "Ion Current [A]"
            )
            data = ms_data["data"][species][data_key].to_numpy()

            ms_data["data"][species][f"{data_key} (Savitzky-Golay)"] = savgol_filter(
                data, len(data) // 10, 3
            )

            max_vals.append((species, ms_data["data"][species][data_key].max()))

        plots = []
        for ind, (species, _) in enumerate(sorted(max_vals, key=lambda x: x[1], reverse=True)):
            plots.append(
                selectable_axes_plot(
                    {species: ms_data["data"][species]},
                    x_options=x_options,
                    y_options=[data_key],
                    y_default=[
                        f"{data_key} (Savitzky-Golay)",
                        f"{data_key}",
                    ],
                    label_x=(ind == 0),
                    label_y=(ind == 0),
                    plot_line=True,
                    plot_points=False,
                    plot_title=f"Channel name: {species}",
                    plot_index=ind,
                    aspect_ratio=1.5,
                )
            )

            plots[-1].children[0].xaxis[0].ticker.desired_num_ticks = 2

        # construct MxN grid of all species
        M = 3
        grid = []
        for i in range(0, len(plots), M):
            grid.append(plots[i : i + M])
        p = gridplot(grid, sizing_mode="scale_width", toolbar_location="below")

        self.data["bokeh_plot_data"] = bokeh.embed.json_item(p, theme=grid_theme)
parsers
Functions
parse_mt_mass_spec_ascii(path: Path) -> Dict[str, Union[pandas.core.frame.DataFrame, Dict]]

Parses an .asc file containing MS results from a Mettler-Toledo spectrometer and returns a dictionary with keys data and meta, which themselves contain a dictionary of dataframes for each species with the species names/masses as keys, and a dictionary of metadata fields respectively.

Parameters:

Name Type Description Default
path Path

The path of the file to parse.

required
Source code in pydatalab/apps/tga/parsers.py
def parse_mt_mass_spec_ascii(path: Path) -> Dict[str, Union[pd.DataFrame, Dict]]:
    """Parses an .asc file containing MS results from a Mettler-Toledo
    spectrometer and returns a dictionary with keys `data` and `meta`,
    which themselves contain a dictionary of dataframes for each species
    with the species names/masses as keys, and a dictionary of
    metadata fields respectively.

    Parameters:
        path: The path of the file to parse.

    """

    header_keys = ("Sourcefile", "Exporttime", "Start Time", "End Time")
    data_keys = ("Time Relative [s]", "Partial Pressure [mbar]", "Ion Current [A]")
    header = {}
    species = []

    if not path.exists():
        raise RuntimeError(f"Provided path does not exist: {path!r}")

    with open(path, "r") as f:

        # Read start of file until all header keys have been found
        max_header_lines = 8
        reads = 0
        header_end = None
        while reads < max_header_lines:
            line = f.readline().strip()
            reads += 1
            if line:
                for key in header_keys:
                    if key in line:
                        header[key] = line.split(key)[-1].strip()
            if all(k in header for k in header_keys):
                header_end = f.tell()
                break
        else:
            raise ValueError(
                f"Could not find all header keys in first {max_header_lines} lines of file."
            )

        for key in header_keys[1:]:
            if "time" in key.lower():
                header[key] = dateutil.parser.parse(header[key])  # type: ignore

        reads = 0
        max_species_lines = 10
        while reads < max_species_lines:
            line = f.readline().strip()
            reads += 1
            if not line:
                continue
            species = line.split()
            break
        else:
            raise ValueError(
                f"Could not find species list in lines {header_end}:{header_end + max_species_lines} lines of file."
            )

        # Read data with duplicated keys: will have (column number % number of data keys) appended to them
        # MT software also writes "---" if the value is missing, so parse these as NaNs to remove later
        df = pd.read_csv(f, sep="\t", header=0, parse_dates=False, na_values=["---"])
        ms_results: Dict[str, Union[pd.DataFrame, Dict]] = {}
        ms_results["meta"] = header
        ms_results["data"] = {}

        # Some files have Ion Current [A] or Partial Pressure [mbar] -- only rename those that are present
        present_keys = set(df.columns.values) & set(data_keys)
        for ind, specie in enumerate(species):

            # Loop over all species and rename the columns to remove the species name and disaggregate as a dict
            species_data_keys = [k + f"{'.' + str(ind) if ind != 0 else ''}" for k in present_keys]
            ms_results["data"][specie] = df[species_data_keys].rename(
                {mangled: original for mangled, original in zip(species_data_keys, present_keys)},
                axis="columns",
            )

            # Drop time axis as format cannot be easily inferred and data is essentially duplicated: "Start Time" in header
            # provides the timestamp of the first row
            ms_results["data"][specie].drop("Time", axis="columns", inplace=True, errors="ignore")

            # If the file was provided in an incomplete form, the final rows will be NaN, so drop them
            ms_results["data"][specie].dropna(inplace=True)

        return ms_results
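A hypothetical usage sketch of the returned structure (the path is illustrative):

from pathlib import Path

results = parse_mt_mass_spec_ascii(Path("example_ms.asc"))
print(results["meta"]["Start Time"])         # parsed header metadata
for species, df in results["data"].items():  # one DataFrame per species
    print(species, list(df.columns))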
xrd special
Modules
blocks
XRDBlock (DataBlock)
Source code in pydatalab/apps/xrd/blocks.py
class XRDBlock(DataBlock):
    blocktype = "xrd"
    description = "Powder XRD"
    accepted_file_extensions = (".xrdml", ".xy", ".dat", ".xye")

    defaults = {"wavelength": 1.54060}

    @property
    def plot_functions(self):
        return (self.generate_xrd_plot,)

    @classmethod
    def load_pattern(
        self, location: str, wavelength: float = None
    ) -> Tuple[pd.DataFrame, List[str]]:

        if not isinstance(location, str):
            location = str(location)

        ext = os.path.splitext(location.split("/")[-1])[-1].lower()

        if ext == ".xrdml":
            df = parse_xrdml(location)

        elif ext == ".xy":
            df = pd.read_csv(location, sep=r"\s+", names=["twotheta", "intensity"])

        else:
            df = pd.read_csv(location, sep=r"\s+", names=["twotheta", "intensity", "error"])

        df = df.rename(columns={"twotheta": "2θ (°)"})

        # if no wavelength (or invalid wavelength) is passed, don't convert to Q and d
        if wavelength:
            try:
                df["Q (Å⁻¹)"] = 4 * np.pi / wavelength * np.sin(np.deg2rad(df["2θ (°)"]) / 2)
                df["d (Å)"] = 2 * np.pi / df["Q (Å⁻¹)"]
            except (ValueError, ZeroDivisionError):
                pass

        df["sqrt(intensity)"] = np.sqrt(df["intensity"])
        df["log(intensity)"] = np.log10(df["intensity"])
        df["normalized intensity"] = df["intensity"] / np.max(df["intensity"])
        polyfit_deg = 15
        polyfit_baseline = np.poly1d(
            np.polyfit(df["2θ (°)"], df["normalized intensity"], deg=polyfit_deg)
        )(df["2θ (°)"])
        df["intensity - polyfit baseline"] = df["normalized intensity"] - polyfit_baseline
        df["intensity - polyfit baseline"] /= np.max(df["intensity - polyfit baseline"])
        df[f"baseline (`numpy.polyfit`, deg={polyfit_deg})"] = polyfit_baseline / np.max(
            df["intensity - polyfit baseline"]
        )

        kernel_size = 101
        median_baseline = medfilt(df["normalized intensity"], kernel_size=kernel_size)
        df["intensity - median baseline"] = df["normalized intensity"] - median_baseline
        df["intensity - median baseline"] /= np.max(df["intensity - median baseline"])
        df[
            f"baseline (`scipy.signal.medfilt`, kernel_size={kernel_size})"
        ] = median_baseline / np.max(df["intensity - median baseline"])

        df.index.name = location.split("/")[-1]

        y_options = [
            "normalized intensity",
            "intensity",
            "sqrt(intensity)",
            "log(intensity)",
            "intensity - median baseline",
            f"baseline (`scipy.signal.medfilt`, kernel_size={kernel_size})",
            "intensity - polyfit baseline",
            f"baseline (`numpy.polyfit`, deg={polyfit_deg})",
        ]

        return df, y_options

    def generate_xrd_plot(self):
        file_info = None
        all_files = None
        pattern_dfs = None

        if "file_id" not in self.data:
            # If no file set, try to plot them all
            item_info = flask_mongo.db.items.find_one(
                {"item_id": self.data["item_id"]},
            )

            all_files = [
                d
                for d in [
                    get_file_info_by_id(f, update_if_live=False)
                    for f in item_info["file_ObjectIds"]
                ]
                if any(d["name"].lower().endswith(ext) for ext in self.accepted_file_extensions)
            ]

            if not all_files:
                LOGGER.warning(
                    "XRDBlock.generate_xrd_plot(): Unsupported file extension (must be .xrdml or .xy)"
                )
                raise RuntimeError("XRDBlock.generate_xrd_plot(): No file set in DataBlock")

            pattern_dfs = []
            for f in all_files:
                try:
                    pattern_df, y_options = self.load_pattern(
                        f["location"],
                        wavelength=float(self.data.get("wavelength", self.defaults["wavelength"])),
                    )
                except Exception as exc:
                    raise RuntimeError(
                        f"Could not parse file {file_info['location']}. Error: {exc}"
                    )
                pattern_dfs.append(pattern_df)

        else:
            file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
            ext = os.path.splitext(file_info["location"].split("/")[-1])[-1].lower()
            if ext not in self.accepted_file_extensions:
                raise RuntimeError(
                    "XRDBlock.generate_xrd_plot(): Unsupported file extension (must be one of %s), not %s",
                    self.accepted_file_extensions,
                    ext,
                )

            pattern_dfs, y_options = self.load_pattern(
                file_info["location"],
                wavelength=float(self.data.get("wavelength", self.defaults["wavelength"])),
            )
            pattern_dfs = [pattern_dfs]

        if pattern_dfs:
            p = selectable_axes_plot(
                pattern_dfs,
                x_options=["2θ (°)", "Q (Å⁻¹)", "d (Å)"],
                y_options=y_options,
                plot_line=True,
                plot_points=True,
                point_size=3,
            )

            self.data["bokeh_plot_data"] = bokeh.embed.json_item(p, theme=mytheme)
accepted_file_extensions: Sequence[str]
blocktype: str
defaults: Dict[str, Any]
description: str
plot_functions property readonly
load_pattern(location: str, wavelength: float = None) -> Tuple[pandas.core.frame.DataFrame, List[str]] classmethod
Source code in pydatalab/apps/xrd/blocks.py
@classmethod
def load_pattern(
    self, location: str, wavelength: float = None
) -> Tuple[pd.DataFrame, List[str]]:

    if not isinstance(location, str):
        location = str(location)

    ext = os.path.splitext(location.split("/")[-1])[-1].lower()

    if ext == ".xrdml":
        df = parse_xrdml(location)

    elif ext == ".xy":
        df = pd.read_csv(location, sep=r"\s+", names=["twotheta", "intensity"])

    else:
        df = pd.read_csv(location, sep=r"\s+", names=["twotheta", "intensity", "error"])

    df = df.rename(columns={"twotheta": "2θ (°)"})

    # if no wavelength (or invalid wavelength) is passed, don't convert to Q and d
    if wavelength:
        try:
            df["Q (Å⁻¹)"] = 4 * np.pi / wavelength * np.sin(np.deg2rad(df["2θ (°)"]) / 2)
            df["d (Å)"] = 2 * np.pi / df["Q (Å⁻¹)"]
        except (ValueError, ZeroDivisionError):
            pass

    df["sqrt(intensity)"] = np.sqrt(df["intensity"])
    df["log(intensity)"] = np.log10(df["intensity"])
    df["normalized intensity"] = df["intensity"] / np.max(df["intensity"])
    polyfit_deg = 15
    polyfit_baseline = np.poly1d(
        np.polyfit(df["2θ (°)"], df["normalized intensity"], deg=polyfit_deg)
    )(df["2θ (°)"])
    df["intensity - polyfit baseline"] = df["normalized intensity"] - polyfit_baseline
    df["intensity - polyfit baseline"] /= np.max(df["intensity - polyfit baseline"])
    df[f"baseline (`numpy.polyfit`, deg={polyfit_deg})"] = polyfit_baseline / np.max(
        df["intensity - polyfit baseline"]
    )

    kernel_size = 101
    median_baseline = medfilt(df["normalized intensity"], kernel_size=kernel_size)
    df["intensity - median baseline"] = df["normalized intensity"] - median_baseline
    df["intensity - median baseline"] /= np.max(df["intensity - median baseline"])
    df[
        f"baseline (`scipy.signal.medfilt`, kernel_size={kernel_size})"
    ] = median_baseline / np.max(df["intensity - median baseline"])

    df.index.name = location.split("/")[-1]

    y_options = [
        "normalized intensity",
        "intensity",
        "sqrt(intensity)",
        "log(intensity)",
        "intensity - median baseline",
        f"baseline (`scipy.signal.medfilt`, kernel_size={kernel_size})",
        "intensity - polyfit baseline",
        f"baseline (`numpy.polyfit`, deg={polyfit_deg})",
    ]

    return df, y_options
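A hypothetical sketch of calling the classmethod directly (the file name is illustrative); it returns the processed DataFrame together with the y-axis options that generate_xrd_plot passes to selectable_axes_plot:

df, y_options = XRDBlock.load_pattern("example_scan.xy", wavelength=1.54060)
print(y_options)  # "normalized intensity", baselines, etc.
print(df[["2θ (°)", "normalized intensity"]].head())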
generate_xrd_plot(self)
Source code in pydatalab/apps/xrd/blocks.py
def generate_xrd_plot(self):
    file_info = None
    all_files = None
    pattern_dfs = None

    if "file_id" not in self.data:
        # If no file set, try to plot them all
        item_info = flask_mongo.db.items.find_one(
            {"item_id": self.data["item_id"]},
        )

        all_files = [
            d
            for d in [
                get_file_info_by_id(f, update_if_live=False)
                for f in item_info["file_ObjectIds"]
            ]
            if any(d["name"].lower().endswith(ext) for ext in self.accepted_file_extensions)
        ]

        if not all_files:
            LOGGER.warning(
                "XRDBlock.generate_xrd_plot(): Unsupported file extension (must be .xrdml or .xy)"
            )
            raise RuntimeError("XRDBlock.generate_xrd_plot(): No file set in DataBlock")

        pattern_dfs = []
        for f in all_files:
            try:
                pattern_df, y_options = self.load_pattern(
                    f["location"],
                    wavelength=float(self.data.get("wavelength", self.defaults["wavelength"])),
                )
            except Exception as exc:
                raise RuntimeError(
                    f"Could not parse file {file_info['location']}. Error: {exc}"
                )
            pattern_dfs.append(pattern_df)

    else:
        file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
        ext = os.path.splitext(file_info["location"].split("/")[-1])[-1].lower()
        if ext not in self.accepted_file_extensions:
            raise RuntimeError(
                "XRDBlock.generate_xrd_plot(): Unsupported file extension (must be one of %s), not %s",
                self.accepted_file_extensions,
                ext,
            )

        pattern_dfs, y_options = self.load_pattern(
            file_info["location"],
            wavelength=float(self.data.get("wavelength", self.defaults["wavelength"])),
        )
        pattern_dfs = [pattern_dfs]

    if pattern_dfs:
        p = selectable_axes_plot(
            pattern_dfs,
            x_options=["2θ (°)", "Q (Å⁻¹)", "d (Å)"],
            y_options=y_options,
            plot_line=True,
            plot_points=True,
            point_size=3,
        )

        self.data["bokeh_plot_data"] = bokeh.embed.json_item(p, theme=mytheme)
models
Classes
XRDPattern (BaseModel) pydantic-model

This model defines the structure of the data that is expected for a solid-state XRD pattern.

Source code in pydatalab/apps/xrd/models.py
class XRDPattern(BaseModel):
    """This model defines the structure of the data that is expected
    for a solid-state XRD pattern.

    """

    wavelength: float

    two_theta: List[float]

    d_spacings: List[float]

    q_values: List[float]

    intensities: List[float]
wavelength: float pydantic-field required
two_theta: List[float] pydantic-field required
d_spacings: List[float] pydantic-field required
q_values: List[float] pydantic-field required
intensities: List[float] pydantic-field required
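A minimal construction sketch (the values are illustrative; all fields are required):

pattern = XRDPattern(
    wavelength=1.54060,
    two_theta=[10.0, 10.1, 10.2],
    d_spacings=[8.84, 8.76, 8.67],
    q_values=[0.710, 0.717, 0.724],
    intensities=[102.0, 118.0, 95.0],
)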
XRDProcessing (BaseModel) pydantic-model
Source code in pydatalab/apps/xrd/models.py
class XRDProcessing(BaseModel):

    peak_positions: List[float]

    peak_intensities: List[float]

    peak_widths: List[float]

    baselines: List[List[float]]

    class Config:
        extra = "allow"
peak_positions: List[float] pydantic-field required
peak_intensities: List[float] pydantic-field required
peak_widths: List[float] pydantic-field required
baselines: List[List[float]] pydantic-field required
XRDMetadata (BaseModel) pydantic-model
Source code in pydatalab/apps/xrd/models.py
class XRDMetadata(BaseModel):
    ...
XRDMeasurement (BaseModel) pydantic-model
Source code in pydatalab/apps/xrd/models.py
class XRDMeasurement(BaseModel):

    data: Optional[XRDPattern]
    processing: Optional[XRDProcessing]
    metadata: Optional[XRDMetadata]
data: XRDPattern pydantic-field
processing: XRDProcessing pydantic-field
metadata: XRDMetadata pydantic-field
utils
DATA_REGEX
STARTEND_REGEX
XrdmlParseError (Exception)
Source code in pydatalab/apps/xrd/utils.py
class XrdmlParseError(Exception):
    pass
Functions
parse_xrdml(filename: str) -> DataFrame

Parses an XRDML file and returns a pandas DataFrame with columns twotheta and intensity.

Parameters:

Name Type Description Default
filename str

The file to parse.

required
Source code in pydatalab/apps/xrd/utils.py
def parse_xrdml(filename: str) -> pd.DataFrame:
    """Parses an XRDML file and returns a pandas DataFrame with columns
    twotheta and intensity.

    Parameters:
        filename: The file to parse.

    """
    with open(filename, "r") as f:
        s = f.read()

    start, end = getStartEnd(s)  # extract first and last angle
    intensities = getIntensities(s)  # extract intensities

    angles = np.linspace(start, end, num=len(intensities))

    return pd.DataFrame(
        {
            "twotheta": angles,
            "intensity": intensities,
        }
    )
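A hypothetical usage sketch (the file name is illustrative):

# Returns a DataFrame with "twotheta" and "intensity" columns
df = parse_xrdml("example_scan.xrdml")
print(df.head())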
convertSinglePattern(filename: str, directory: str = '.', adjust_baseline: bool = False, overwrite: bool = False) -> str

Converts an XRDML file to a simple xy and writes it to the passed directory, without overwriting any existing files.

Parameters:

Name Type Description Default
filename str

The file to convert.

required
directory str

The output directory.

'.'
adjust_baseline bool

If True, the baseline will be adjusted so that no points are negative.

False
overwrite bool

If True, existing files with the same filenames will be overwritten.

False

Returns:

Type Description
str

The output filename.

Source code in pydatalab/apps/xrd/utils.py
def convertSinglePattern(
    filename: str,
    directory: str = ".",
    adjust_baseline: bool = False,
    overwrite: bool = False,
) -> str:
    """Converts an XRDML file to a simple xy and writes it to the passed directory, without
    overwriting any existing files.

    Parameters:
        filename: The file to convert.
        directory: The output directory.
        adjust_baseline: If True, the baseline will be adjusted so that no points are negative.
        overwrite: If True, existing files with the same filenames will be overwritten.

    Returns:
        The output filename.

    """
    filename = os.path.join(directory, filename)
    outfn = filename + ".xy"
    if os.path.exists(outfn):
        if overwrite:
            print(f"{outfn} already exists in the directory {directory}. Overwriting.")
        else:
            warnings.warn(
                f"{outfn} already exists in the directory {directory}, will not overwrite"
            )
            return outfn

    with open(filename, "r") as f:
        s = f.read()

    print(f"Processing file {filename}")
    start, end = getStartEnd(s)
    print(f"\tstart angle: {start}\tend angle: {end}")
    intensities = getIntensities(s)

    if adjust_baseline:
        intensities = np.array(intensities)  # type: ignore
        minI = np.min(intensities)
        if minI < 0:
            print(
                f"\tadjusting baseline so that no points are negative (adding {-1 * minI} counts)"
            )
            intensities -= minI
        else:
            print("\tno intensitites are less than zero, so no baseline adjustment performed")

        intensities = intensities.tolist()  # type: ignore

    print(f"\tnumber of datapoints: {len(intensities)}")
    xystring = toXY(intensities, start, end)
    with open(outfn, "w") as of:
        of.write(xystring)
    print("\tSuccess!")
    return outfn
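A hypothetical usage sketch (the file and directory names are illustrative); the converted pattern is written next to the input file with an added .xy suffix:

out_path = convertSinglePattern(
    "example_scan.xrdml", directory="/tmp/xrd", adjust_baseline=True
)
print(out_path)  # "/tmp/xrd/example_scan.xrdml.xy"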
getStartEnd(s: str) -> Tuple[float, float]

Parse a given string representation of an xrdml file to find the start and end 2Theta points of the scan. Note: this could match either Omega or 2Theta depending on their order in the XRDML file.

Exceptions:

Type Description
XrdmlParseError

if the start and end positions could not be found.

Returns:

Type Description
Tuple[float, float]

(start, end) positions in the XRDML file.

Source code in pydatalab/apps/xrd/utils.py
def getStartEnd(s: str) -> Tuple[float, float]:
    """Parse a given string representation of an xrdml file to find the start and end 2Theta points of the scan.
    Note: this could match either Omega or 2Theta depending on their order in the XRDML file.

    Raises:
        XrdmlParseError: if the start and end positions could not be found.

    Returns:
        (start, end) positions in the XRDML file.

    """
    match = re.search(STARTEND_REGEX, s)
    if not match:
        raise XrdmlParseError("the start and end 2theta positions were not found in the XRDML file")

    start = float(match.group(1))
    end = float(match.group(2))

    return start, end
getIntensities(s: str) -> List[float]

Parse a given string representation of an xrdml file to find the peak intensities.

Exceptions:

Type Description
XrdmlParseError

if intensities could not be found in the file

Returns:

Type Description
List[float]

The array of intensities.

Source code in pydatalab/apps/xrd/utils.py
def getIntensities(s: str) -> List[float]:
    """Parse a given string representation of an xrdml file to find the peak intensities.

    Raises:
        XrdmlParseError: if intensities could not be found in the file

    Returns:
        The array of intensities.

    """
    match = re.search(DATA_REGEX, s)
    if not match:
        raise XrdmlParseError("the intensitites were not found in the XML file")

    out = [float(x) for x in match.group(1).split()]  # the intensities as a list of floats
    return out
toXY(intensities: List[float], start: float, end: float) -> str

Converts a given list of intensities, along with a start and end angle, to a string in XY format.

Source code in pydatalab/apps/xrd/utils.py
def toXY(intensities: List[float], start: float, end: float) -> str:
    """Converts a given list of intensities, along with a start and end angle,
    to a string in XY format.

    """
    angles = np.linspace(start, end, num=len(intensities))
    xylines = ["{:.5f} {:.3f}\r\n".format(a, i) for a, i in zip(angles, intensities)]
    return "".join(xylines)

blocks special

BLOCKS: Sequence[Type[pydatalab.blocks.blocks.DataBlock]]
BLOCK_TYPES: Dict[str, Type[pydatalab.blocks.blocks.DataBlock]]
Modules
blocks
Classes
DataBlock

base class for a data block.

Source code in pydatalab/blocks/blocks.py
class DataBlock:
    """base class for a data block."""

    blocktype: str = "generic"
    description: str = "Generic Block"
    accepted_file_extensions: Sequence[str]
    # values that are set by default if they are not supplied by the dictionary in init()
    defaults: Dict[str, Any] = {}
    # values cached on the block instance for faster retrieval
    cache: Optional[Dict[str, Any]] = None
    plot_functions: Optional[Sequence[Callable[[], None]]] = None
    # whether this datablock can operate on collection data, or just individual items
    __supports_collections: bool = False

    def __init__(
        self,
        item_id: Optional[str] = None,
        collection_id: Optional[str] = None,
        dictionary=None,
        unique_id=None,
    ):
        if dictionary is None:
            dictionary = {}

        if item_id is None and not self.__supports_collections:
            raise RuntimeError(f"Must supply `item_id` to make {self.__class__.__name__}.")

        if collection_id is not None and not self.__supports_collections:
            raise RuntimeError(
                f"This block ({self.__class__.__name__}) does not support collections."
            )

        if item_id is not None and collection_id is not None:
            raise RuntimeError("Must provide only one of `item_id` and `collection_id`.")

        # Initialise cache
        self.cache = {}

        LOGGER.debug(
            "Creating new block '%s' associated with item_id '%s'",
            self.__class__.__name__,
            item_id,
        )
        self.block_id = (
            unique_id or generate_random_id()
        )  # this is supposed to be a unique id for use in html and the database.
        self.data = {
            "item_id": item_id,
            "collection_id": collection_id,
            "blocktype": self.blocktype,
            "block_id": self.block_id,
            **self.defaults,
        }

        # convert ObjectId file_ids to string to make handling them easier when sending to and from web
        if "file_id" in self.data:
            self.data["file_id"] = str(self.data["file_id"])

        if "title" not in self.data:
            self.data["title"] = self.description
        self.data.update(
            dictionary
        )  # this could overwrite blocktype and block_id. I think that's reasonable... maybe
        LOGGER.debug(
            "Initialised block %s for item ID %s or collection ID %s.",
            self.__class__.__name__,
            item_id,
            collection_id,
        )

    def to_db(self):
        """returns a dictionary with the data for this
        block, ready to be input into mongodb"""
        LOGGER.debug("Casting block %s to database object.", self.__class__.__name__)

        if "file_id" in self.data:
            dict_for_db = self.data.copy()  # gross, I know
            dict_for_db["file_id"] = ObjectId(dict_for_db["file_id"])
            return dict_for_db

        if "bokeh_plot_data" in self.data:
            self.data.pop("bokeh_plot_data")
        return self.data

    @classmethod
    def from_db(cls, db_entry):
        """create a block from json (dictionary) stored in a db"""
        LOGGER.debug("Loading block %s from database object.", cls.__class__.__name__)
        new_block = cls(
            item_id=db_entry.get("item_id"),
            collection_id=db_entry.get("collection_id"),
            dictionary=db_entry,
        )
        if "file_id" in new_block.data:
            new_block.data["file_id"] = str(new_block.data["file_id"])
        return new_block

    def to_web(self):
        """returns a json-able dictionary to render the block on the web"""
        if self.plot_functions:
            for plot in self.plot_functions:
                try:
                    plot()
                except RuntimeError:
                    LOGGER.warning(
                        f"Could not create plot for {self.__class__.__name__}: {self.data}"
                    )
        return self.data

    @classmethod
    def from_web(cls, data):
        LOGGER.debug("Loading block %s from web request.", cls.__class__.__name__)
        block = cls(
            item_id=data.get("item_id"),
            collection_id=data.get("collection_id"),
            unique_id=data["block_id"],
        )
        block.update_from_web(data)
        return block

    def update_from_web(self, data):
        """update the object with data received from the website. Only updates fields
        that are specified in the dictionary- other fields are left alone"""
        LOGGER.debug(
            "Updating block %s from web request",
            self.__class__.__name__,
        )
        self.data.update(data)

        return self
blocktype: str
cache: Optional[Dict[str, Any]]
defaults: Dict[str, Any]
description: str
plot_functions: Optional[Sequence[Callable[[], NoneType]]]
Methods
__init__(self, item_id: Optional[str] = None, collection_id: Optional[str] = None, dictionary = None, unique_id = None) special
Source code in pydatalab/blocks/blocks.py
def __init__(
    self,
    item_id: Optional[str] = None,
    collection_id: Optional[str] = None,
    dictionary=None,
    unique_id=None,
):
    if dictionary is None:
        dictionary = {}

    if item_id is None and not self.__supports_collections:
        raise RuntimeError(f"Must supply `item_id` to make {self.__class__.__name__}.")

    if collection_id is not None and not self.__supports_collections:
        raise RuntimeError(
            f"This block ({self.__class__.__name__}) does not support collections."
        )

    if item_id is not None and collection_id is not None:
        raise RuntimeError("Must provide only one of `item_id` and `collection_id`.")

    # Initialise cache
    self.cache = {}

    LOGGER.debug(
        "Creating new block '%s' associated with item_id '%s'",
        self.__class__.__name__,
        item_id,
    )
    self.block_id = (
        unique_id or generate_random_id()
    )  # this is supposed to be a unique id for use in html and the database.
    self.data = {
        "item_id": item_id,
        "collection_id": collection_id,
        "blocktype": self.blocktype,
        "block_id": self.block_id,
        **self.defaults,
    }

    # convert ObjectId file_ids to string to make handling them easier when sending to and from web
    if "file_id" in self.data:
        self.data["file_id"] = str(self.data["file_id"])

    if "title" not in self.data:
        self.data["title"] = self.description
    self.data.update(
        dictionary
    )  # this could overwrite blocktype and block_id. I think that's reasonable... maybe
    LOGGER.debug(
        "Initialised block %s for item ID %s or collection ID %s.",
        self.__class__.__name__,
        item_id,
        collection_id,
    )
to_db(self)

returns a dictionary with the data for this block, ready to be input into mongodb

Source code in pydatalab/blocks/blocks.py
def to_db(self):
    """returns a dictionary with the data for this
    block, ready to be input into mongodb"""
    LOGGER.debug("Casting block %s to database object.", self.__class__.__name__)

    if "file_id" in self.data:
        dict_for_db = self.data.copy()  # gross, I know
        dict_for_db["file_id"] = ObjectId(dict_for_db["file_id"])
        return dict_for_db

    if "bokeh_plot_data" in self.data:
        self.data.pop("bokeh_plot_data")
    return self.data
from_db(db_entry) classmethod

create a block from json (dictionary) stored in a db

Source code in pydatalab/blocks/blocks.py
@classmethod
def from_db(cls, db_entry):
    """create a block from json (dictionary) stored in a db"""
    LOGGER.debug("Loading block %s from database object.", cls.__class__.__name__)
    new_block = cls(
        item_id=db_entry.get("item_id"),
        collection_id=db_entry.get("collection_id"),
        dictionary=db_entry,
    )
    if "file_id" in new_block.data:
        new_block.data["file_id"] = str(new_block.data["file_id"])
    return new_block
to_web(self)

returns a json-able dictionary to render the block on the web

Source code in pydatalab/blocks/blocks.py
def to_web(self):
    """returns a json-able dictionary to render the block on the web"""
    if self.plot_functions:
        for plot in self.plot_functions:
            try:
                plot()
            except RuntimeError:
                LOGGER.warning(
                    f"Could not create plot for {self.__class__.__name__}: {self.data}"
                )
    return self.data
from_web(data) classmethod
Source code in pydatalab/blocks/blocks.py
@classmethod
def from_web(cls, data):
    LOGGER.debug("Loading block %s from web request.", cls.__class__.__name__)
    block = cls(
        item_id=data.get("item_id"),
        collection_id=data.get("collection_id"),
        unique_id=data["block_id"],
    )
    block.update_from_web(data)
    return block
update_from_web(self, data)

update the object with data received from the website. Only updates fields that are specified in the dictionary; other fields are left alone

Source code in pydatalab/blocks/blocks.py
def update_from_web(self, data):
    """update the object with data received from the website. Only updates fields
    that are specified in the dictionary- other fields are left alone"""
    LOGGER.debug(
        "Updating block %s from web request",
        self.__class__.__name__,
    )
    self.data.update(data)

    return self
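For orientation, a minimal hypothetical subclass following the pattern used by the concrete blocks above (the blocktype, extension and plotting step are placeholders, not part of the library; LOGGER is assumed to be the module logger used throughout):

class ExampleBlock(DataBlock):
    """Hypothetical block illustrating the usual subclassing pattern."""

    blocktype = "example"
    description = "Example block"
    accepted_file_extensions = (".csv",)

    @property
    def plot_functions(self):
        return (self.generate_example_plot,)

    def generate_example_plot(self):
        if "file_id" not in self.data:
            LOGGER.warning("No file set in the DataBlock")
            return
        # Parse the file and store a Bokeh JSON item, as the concrete blocks do:
        # self.data["bokeh_plot_data"] = bokeh.embed.json_item(plot, theme=mytheme)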
NotSupportedBlock (DataBlock)
Source code in pydatalab/blocks/blocks.py
class NotSupportedBlock(DataBlock):
    blocktype = "notsupported"
    description = "Block not supported"
    __supports_collections = True
blocktype: str
description: str
CommentBlock (DataBlock)
Source code in pydatalab/blocks/blocks.py
class CommentBlock(DataBlock):
    blocktype = "comment"
    description = "Comment"
    __supports_collections = True
blocktype: str
description: str
MediaBlock (DataBlock)
Source code in pydatalab/blocks/blocks.py
class MediaBlock(DataBlock):
    blocktype = "media"
    description = "Media"
    accepted_file_extensions = (".png", ".jpeg", ".jpg", ".tif", ".tiff", ".mp4", ".mov", ".webm")
    __supports_collections = False

    @property
    def plot_functions(self):
        return (self.encode_tiff,)

    def encode_tiff(self):
        if "file_id" not in self.data:
            LOGGER.warning("ImageBlock.encode_tiff(): No file set in the DataBlock")
            return
        if "b64_encoded_image" not in self.data:
            self.data["b64_encoded_image"] = {}
        file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
        if file_info["name"].endswith(".tif") or file_info["name"].endswith(".tiff"):
            im = Image.open(file_info["location"])
            LOGGER.warning("Making base64 encoding of tif")
            with io.BytesIO() as f:
                im.save(f, format="PNG")
                f.seek(0)
                self.data["b64_encoded_image"][self.data["file_id"]] = base64.b64encode(
                    f.getvalue()
                ).decode()
accepted_file_extensions: Sequence[str]
blocktype: str
description: str
plot_functions property readonly
encode_tiff(self)
Source code in pydatalab/blocks/blocks.py
def encode_tiff(self):
    if "file_id" not in self.data:
        LOGGER.warning("ImageBlock.encode_tiff(): No file set in the DataBlock")
        return
    if "b64_encoded_image" not in self.data:
        self.data["b64_encoded_image"] = {}
    file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
    if file_info["name"].endswith(".tif") or file_info["name"].endswith(".tiff"):
        im = Image.open(file_info["location"])
        LOGGER.warning("Making base64 encoding of tif")
        with io.BytesIO() as f:
            im.save(f, format="PNG")
            f.seek(0)
            self.data["b64_encoded_image"][self.data["file_id"]] = base64.b64encode(
                f.getvalue()
            ).decode()
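The TIFF-to-base64-PNG conversion performed here can be reproduced standalone with Pillow; a minimal sketch follows (the file path is a placeholder):

import base64
import io

from PIL import Image


def tiff_to_b64_png(path: str) -> str:
    """Open a TIFF image and return it as a base64-encoded PNG string."""
    im = Image.open(path)
    with io.BytesIO() as buffer:
        im.save(buffer, format="PNG")
        return base64.b64encode(buffer.getvalue()).decode()


# encoded = tiff_to_b64_png("example.tif")  # placeholder path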
NMRBlock (DataBlock)
Source code in pydatalab/blocks/blocks.py
class NMRBlock(DataBlock):
    blocktype = "nmr"
    description = "Simple NMR Block"
    accepted_file_extensions = ".zip"
    defaults = {"process number": 1}
    __supports_collections = False

    @property
    def plot_functions(self):
        return (self.generate_nmr_plot,)

    def read_bruker_nmr_data(self):
        if "file_id" not in self.data:
            LOGGER.warning("NMRPlot.read_bruker_nmr_data(): No file set in the DataBlock")
            return

        zip_file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
        filename = zip_file_info["name"]

        name, ext = os.path.splitext(filename)
        if ext.lower() not in self.accepted_file_extensions:
            LOGGER.warning(
                "NMRBlock.read_bruker_nmr_data(): Unsupported file extension (must be .zip)"
            )
            return

        # unzip:
        directory_location = zip_file_info["location"] + ".extracted"
        LOGGER.debug(f"Directory location is: {directory_location}")
        with zipfile.ZipFile(zip_file_info["location"], "r") as zip_ref:
            zip_ref.extractall(directory_location)

        extracted_directory_name = os.path.join(directory_location, name)
        available_processes = os.listdir(os.path.join(extracted_directory_name, "pdata"))

        if self.data.get("selected_process") not in available_processes:
            self.data["selected_process"] = available_processes[0]

        try:
            df, a_dic, topspin_title, processed_data_shape = nmr_utils.read_bruker_1d(
                os.path.join(directory_location, name),
                process_number=self.data["selected_process"],
                verbose=False,
            )
        except Exception as error:
            LOGGER.critical(f"Unable to parse {name} as Bruker project. {error}")
            return

        serialized_df = df.to_dict() if (df is not None) else None

        # all data sorted in a fairly raw way
        self.data["processed_data"] = serialized_df
        self.data["acquisition_parameters"] = a_dic["acqus"]
        self.data["processing_parameters"] = a_dic["procs"]
        self.data["pulse_program"] = a_dic["pprog"]

        # specific things that we might want to pull out for the UI:
        self.data["available_processes"] = available_processes
        self.data["nucleus"] = a_dic["acqus"]["NUC1"]
        self.data["carrier_frequency_MHz"] = a_dic["acqus"]["SFO1"]
        self.data["carrier_offset_Hz"] = a_dic["acqus"]["O1"]
        self.data["recycle_delay"] = a_dic["acqus"]["D"][1]
        self.data["nscans"] = a_dic["acqus"]["NS"]
        self.data["CNST31"] = a_dic["acqus"]["CNST"][31]
        self.data["processed_data_shape"] = processed_data_shape

        self.data["probe_name"] = a_dic["acqus"]["PROBHD"]
        self.data["pulse_program_name"] = a_dic["acqus"]["PULPROG"]
        self.data["topspin_title"] = topspin_title

    def generate_nmr_plot(self):
        self.read_bruker_nmr_data()  # currently calls every time plotting happens, but it should only happen if the file was updated
        if "processed_data" not in self.data or not self.data["processed_data"]:
            self.data["bokeh_plot_data"] = None
            return

        df = pd.DataFrame(self.data["processed_data"])
        df["normalized intensity"] = df.intensity / df.intensity.max()

        bokeh_layout = selectable_axes_plot(
            df,
            x_options=["ppm", "hz"],
            y_options=[
                "intensity",
                "intensity_per_scan",
                "normalized intensity",
            ],
            plot_line=True,
            point_size=3,
        )
        bokeh_layout.children[0].x_range.flipped = True  # flip x axis, per NMR convention

        self.data["bokeh_plot_data"] = bokeh.embed.json_item(bokeh_layout, theme=mytheme)
accepted_file_extensions: Sequence[str]
blocktype: str
defaults: Dict[str, Any]
description: str
plot_functions property readonly
read_bruker_nmr_data(self)
Source code in pydatalab/blocks/blocks.py
def read_bruker_nmr_data(self):
    if "file_id" not in self.data:
        LOGGER.warning("NMRPlot.read_bruker_nmr_data(): No file set in the DataBlock")
        return

    zip_file_info = get_file_info_by_id(self.data["file_id"], update_if_live=True)
    filename = zip_file_info["name"]

    name, ext = os.path.splitext(filename)
    if ext.lower() not in self.accepted_file_extensions:
        LOGGER.warning(
            "NMRBlock.read_bruker_nmr_data(): Unsupported file extension (must be .zip)"
        )
        return

    # unzip:
    directory_location = zip_file_info["location"] + ".extracted"
    LOGGER.debug(f"Directory location is: {directory_location}")
    with zipfile.ZipFile(zip_file_info["location"], "r") as zip_ref:
        zip_ref.extractall(directory_location)

    extracted_directory_name = os.path.join(directory_location, name)
    available_processes = os.listdir(os.path.join(extracted_directory_name, "pdata"))

    if self.data.get("selected_process") not in available_processes:
        self.data["selected_process"] = available_processes[0]

    try:
        df, a_dic, topspin_title, processed_data_shape = nmr_utils.read_bruker_1d(
            os.path.join(directory_location, name),
            process_number=self.data["selected_process"],
            verbose=False,
        )
    except Exception as error:
        LOGGER.critical(f"Unable to parse {name} as Bruker project. {error}")
        return

    serialized_df = df.to_dict() if (df is not None) else None

    # all data sorted in a fairly raw way
    self.data["processed_data"] = serialized_df
    self.data["acquisition_parameters"] = a_dic["acqus"]
    self.data["processing_parameters"] = a_dic["procs"]
    self.data["pulse_program"] = a_dic["pprog"]

    # specific things that we might want to pull out for the UI:
    self.data["available_processes"] = available_processes
    self.data["nucleus"] = a_dic["acqus"]["NUC1"]
    self.data["carrier_frequency_MHz"] = a_dic["acqus"]["SFO1"]
    self.data["carrier_offset_Hz"] = a_dic["acqus"]["O1"]
    self.data["recycle_delay"] = a_dic["acqus"]["D"][1]
    self.data["nscans"] = a_dic["acqus"]["NS"]
    self.data["CNST31"] = a_dic["acqus"]["CNST"][31]
    self.data["processed_data_shape"] = processed_data_shape

    self.data["probe_name"] = a_dic["acqus"]["PROBHD"]
    self.data["pulse_program_name"] = a_dic["acqus"]["PULPROG"]
    self.data["topspin_title"] = topspin_title
generate_nmr_plot(self)
Source code in pydatalab/blocks/blocks.py
def generate_nmr_plot(self):
    self.read_bruker_nmr_data()  # currently calls every time plotting happens, but it should only happen if the file was updated
    if "processed_data" not in self.data or not self.data["processed_data"]:
        self.data["bokeh_plot_data"] = None
        return

    df = pd.DataFrame(self.data["processed_data"])
    df["normalized intensity"] = df.intensity / df.intensity.max()

    bokeh_layout = selectable_axes_plot(
        df,
        x_options=["ppm", "hz"],
        y_options=[
            "intensity",
            "intensity_per_scan",
            "normalized intensity",
        ],
        plot_line=True,
        point_size=3,
    )
    bokeh_layout.children[0].x_range.flipped = True  # flip x axis, per NMR convention

    self.data["bokeh_plot_data"] = bokeh.embed.json_item(bokeh_layout, theme=mytheme)
Functions
generate_random_id()

This function generates a random 15-character string for use as an id for a datablock. It should be sufficiently random that there is a negligible risk of ever generating the same id twice, so this is a unique id that can be used as a unique database reference and also as an id in the DOM. Note: uuid.uuid4() would do this too, but I think the generated ids are too long and ugly.

The ids here are HTML id friendly, using lowercase letters and numbers. The first character is always a letter.

Source code in pydatalab/blocks/blocks.py
def generate_random_id():
    """This function generates a random 15-length string for use as an id for a datablock. It
    should be sufficiently random that there is a negligible risk of ever generating
    the same id twice, so this is a unique id that can be used as a unique database refrence
    and also can be used as id in the DOM. Note: uuid.uuid4() would do this too, but I think
    the generated ids are too long and ugly.

    The ids here are HTML id friendly, using lowercase letters and numbers. The first character
    is always a letter.
    """
    randlist = [random.choice("abcdefghijklmnopqrstuvwxyz")] + random.choices(
        "abcdefghijklmnopqrstuvwxyz0123456789", k=14
    )
    return "".join(randlist)
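A quick usage sketch of this helper (not part of the library source):

from pydatalab.blocks.blocks import generate_random_id

block_id = generate_random_id()
print(block_id)                # e.g. a string like "qzx0f7k2m9d1abc"
print(len(block_id) == 15)     # True: always 15 characters
print(block_id[0].isalpha())   # True: the first character is always a letter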
echem_block
Classes
CycleBlock (DataBlock)

A data block for processing electrochemical cycling data.

This class contains functions for processing dataframes created by navani from raw cycler files and plotting them with Bokeh.

Source code in pydatalab/blocks/echem_block.py
class CycleBlock(DataBlock):
    """A data block for processing electrochemical cycling data.

    This class contains functions for processing dataframes created by navani
    from raw cycler files and plotting them with Bokeh.

    """

    blocktype = "cycle"
    description = "Electrochemical cycling"

    accepted_file_extensions = (
        ".mpr",
        ".txt",
        ".xls",
        ".xlsx",
        ".txt",
        ".res",
    )

    cache: Dict[str, Any]

    defaults = {
        "p_spline": 5,
        "s_spline": 5,
        "win_size_2": 101,
        "win_size_1": 1001,
        "derivative_mode": None,
    }

    def _get_characteristic_mass_g(self):
        # return {"characteristic_mass": 1000}
        doc = flask_mongo.db.items.find_one(
            {"item_id": self.data["item_id"]}, {"characteristic_mass": 1}
        )
        characteristic_mass_mg = doc.get("characteristic_mass", None)
        if characteristic_mass_mg:
            return characteristic_mass_mg / 1000.0
        return None

    def _load(self, file_id: Union[str, ObjectId], reload: bool = False):
        """Loads the echem data using navani, summarises it, then caches the results
        to disk with suffixed names.

        Parameters:
            file_id: The ID of the file to load.
            reload: Whether to reload the data from the file, or use the cached version, if available.

        """

        required_keys = (
            "Time",
            "Voltage",
            "Capacity",
            "Current",
            "dqdv",
            "dvdq",
            "half cycle",
            "full cycle",
        )

        keys_with_units = {
            "Time": "time (s)",
            "Voltage": "voltage (V)",
            "Capacity": "capacity (mAh)",
            "Current": "current (mA)",
            "Charge Capacity": "charge capacity (mAh)",
            "Discharge Capacity": "discharge capacity (mAh)",
            "dqdv": "dQ/dV (mA/V)",
            "dvdq": "dV/dQ (V/mA)",
        }

        file_info = get_file_info_by_id(file_id, update_if_live=True)
        filename = file_info["name"]

        if file_info.get("is_live"):
            reload = True

        ext = os.path.splitext(filename)[-1].lower()

        if ext not in self.accepted_file_extensions:
            raise RuntimeError(
                f"Unrecognized filetype {ext}, must be one of {self.accepted_file_extensions}"
            )

        parsed_file_loc = Path(file_info["location"]).with_suffix(".RAW_PARSED.pkl")
        cycle_summary_file_loc = Path(file_info["location"]).with_suffix(".SUMMARY.pkl")

        raw_df = None
        cycle_summary_df = None
        if not reload:
            if parsed_file_loc.exists():
                raw_df = pd.read_pickle(parsed_file_loc)

            if cycle_summary_file_loc.exists():
                cycle_summary_df = pd.read_pickle(cycle_summary_file_loc)

        if raw_df is None:
            try:
                LOGGER.debug("Loading file %s", file_info["location"])
                start_time = time.time()
                raw_df = ec.echem_file_loader(file_info["location"])
                LOGGER.debug(
                    "Loaded file %s in %s seconds",
                    file_info["location"],
                    time.time() - start_time,
                )
            except Exception as exc:
                raise RuntimeError(f"Navani raised an error when parsing: {exc}") from exc
            raw_df.to_pickle(parsed_file_loc)

        if cycle_summary_df is None:
            cycle_summary_df = ec.cycle_summary(raw_df)
            cycle_summary_df.to_pickle(cycle_summary_file_loc)

        raw_df = raw_df.filter(required_keys)
        raw_df.rename(columns=keys_with_units, inplace=True)

        cycle_summary_df.rename(columns=keys_with_units, inplace=True)
        cycle_summary_df["cycle index"] = pd.to_numeric(cycle_summary_df.index, downcast="integer")

        return raw_df, cycle_summary_df

    def plot_cycle(self):
        """Plots the electrochemical cycling data from the file ID provided in the request."""
        if "file_id" not in self.data:
            LOGGER.warning("No file_id given")
            return
        file_id = self.data["file_id"]

        derivative_modes = (None, "dQ/dV", "dV/dQ", "final capacity")

        if self.data["derivative_mode"] not in derivative_modes:
            LOGGER.warning(
                "Invalid derivative_mode provided: %s. Expected one of %s. Falling back to `None`.",
                self.data["derivative_mode"],
                derivative_modes,
            )
            self.data["derivative_mode"] = None

        if self.data["derivative_mode"] is None:
            mode = "normal"
        else:
            mode = self.data["derivative_mode"]

        # User list input
        cycle_list = self.data.get("cyclenumber", None)
        if not isinstance(cycle_list, list):
            cycle_list = None

        raw_df, cycle_summary_df = self._load(file_id)

        characteristic_mass_g = self._get_characteristic_mass_g()

        if characteristic_mass_g:
            raw_df["capacity (mAh/g)"] = raw_df["capacity (mAh)"] / characteristic_mass_g
            raw_df["current (mA/g)"] = raw_df["current (mA)"] / characteristic_mass_g
            if cycle_summary_df is not None:
                cycle_summary_df["charge capacity (mAh/g)"] = (
                    cycle_summary_df["charge capacity (mAh)"] / characteristic_mass_g
                )
                cycle_summary_df["discharge capacity (mAh/g)"] = (
                    cycle_summary_df["discharge capacity (mAh)"] / characteristic_mass_g
                )

        df = filter_df_by_cycle_index(raw_df, cycle_list)
        if cycle_summary_df is not None:
            cycle_summary_df = filter_df_by_cycle_index(cycle_summary_df, cycle_list)

        if mode in ("dQ/dV", "dV/dQ"):
            df = compute_gpcl_differential(
                df,
                mode=mode,
                polynomial_spline=int(self.data["p_spline"]),
                s_spline=10 ** (-float(self.data["s_spline"])),
                window_size_1=int(self.data["win_size_1"]),
                window_size_2=int(self.data["win_size_2"]),
                use_normalized_capacity=bool(characteristic_mass_g),
            )

        # Reduce df size to 100 points per cycle by default
        df = reduce_echem_cycle_sampling(df, num_samples=100)

        layout = bokeh_plots.double_axes_echem_plot(
            df, cycle_summary=cycle_summary_df, mode=mode, normalized=bool(characteristic_mass_g)
        )

        self.data["bokeh_plot_data"] = bokeh.embed.json_item(layout, theme=mytheme)
        return

    @property
    def plot_functions(self):
        return (self.plot_cycle,)
accepted_file_extensions: Sequence[str]
blocktype: str
defaults: Dict[str, Any]
description: str
plot_functions property readonly
Methods
plot_cycle(self)

Plots the electrochemical cycling data from the file ID provided in the request.

Source code in pydatalab/blocks/echem_block.py
def plot_cycle(self):
    """Plots the electrochemical cycling data from the file ID provided in the request."""
    if "file_id" not in self.data:
        LOGGER.warning("No file_id given")
        return
    file_id = self.data["file_id"]

    derivative_modes = (None, "dQ/dV", "dV/dQ", "final capacity")

    if self.data["derivative_mode"] not in derivative_modes:
        LOGGER.warning(
            "Invalid derivative_mode provided: %s. Expected one of %s. Falling back to `None`.",
            self.data["derivative_mode"],
            derivative_modes,
        )
        self.data["derivative_mode"] = None

    if self.data["derivative_mode"] is None:
        mode = "normal"
    else:
        mode = self.data["derivative_mode"]

    # User list input
    cycle_list = self.data.get("cyclenumber", None)
    if not isinstance(cycle_list, list):
        cycle_list = None

    raw_df, cycle_summary_df = self._load(file_id)

    characteristic_mass_g = self._get_characteristic_mass_g()

    if characteristic_mass_g:
        raw_df["capacity (mAh/g)"] = raw_df["capacity (mAh)"] / characteristic_mass_g
        raw_df["current (mA/g)"] = raw_df["current (mA)"] / characteristic_mass_g
        if cycle_summary_df is not None:
            cycle_summary_df["charge capacity (mAh/g)"] = (
                cycle_summary_df["charge capacity (mAh)"] / characteristic_mass_g
            )
            cycle_summary_df["discharge capacity (mAh/g)"] = (
                cycle_summary_df["discharge capacity (mAh)"] / characteristic_mass_g
            )

    df = filter_df_by_cycle_index(raw_df, cycle_list)
    if cycle_summary_df is not None:
        cycle_summary_df = filter_df_by_cycle_index(cycle_summary_df, cycle_list)

    if mode in ("dQ/dV", "dV/dQ"):
        df = compute_gpcl_differential(
            df,
            mode=mode,
            polynomial_spline=int(self.data["p_spline"]),
            s_spline=10 ** (-float(self.data["s_spline"])),
            window_size_1=int(self.data["win_size_1"]),
            window_size_2=int(self.data["win_size_2"]),
            use_normalized_capacity=bool(characteristic_mass_g),
        )

    # Reduce df size to 100 points per cycle by default
    df = reduce_echem_cycle_sampling(df, num_samples=100)

    layout = bokeh_plots.double_axes_echem_plot(
        df, cycle_summary=cycle_summary_df, mode=mode, normalized=bool(characteristic_mass_g)
    )

    self.data["bokeh_plot_data"] = bokeh.embed.json_item(layout, theme=mytheme)
    return
Functions
reduce_echem_cycle_sampling(df: DataFrame, num_samples: int = 100) -> DataFrame

Reduce the data to at most num_samples points per half cycle, keeping the endpoint values of each half cycle.

Parameters:

    df (DataFrame, required): The echem dataframe to reduce, which must have cycling data stored under a "half cycle" column.
    num_samples (int, default 100): The maximum number of sample points to include per cycle.

Returns:

    DataFrame: The output dataframe.

Source code in pydatalab/blocks/echem_block.py
def reduce_echem_cycle_sampling(df: pd.DataFrame, num_samples: int = 100) -> pd.DataFrame:
    """Reduce number of cycles to at most `num_samples` points per half cycle. Will
    keep the endpoint values of each half cycle.

    Parameters:
        df: The echem dataframe to reduce, which must have cycling data stored
            under a `"half cycle"` column.
        num_samples: The maximum number of sample points to include per cycle.

    Returns:
        The output dataframe.

    """

    return_df = pd.DataFrame([])

    for _, half_cycle in df.groupby("half cycle"):
        return_df = pd.concat([return_df, reduce_df_size(half_cycle, num_samples, endpoint=True)])

    return return_df
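A usage sketch on synthetic data (not part of the library source); the column values are illustrative:

import numpy as np
import pandas as pd

from pydatalab.blocks.echem_block import reduce_echem_cycle_sampling

# Two half cycles of 1000 points each.
df = pd.DataFrame(
    {
        "half cycle": np.repeat([1, 2], 1000),
        "voltage (V)": np.tile(np.linspace(3.0, 4.2, 1000), 2),
    }
)

reduced = reduce_echem_cycle_sampling(df, num_samples=100)
print(reduced.groupby("half cycle").size())  # roughly 100 points per half cycle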
compute_gpcl_differential(df: DataFrame, mode: str = 'dQ/dV', smoothing: bool = True, polynomial_spline: int = 3, s_spline: float = 1e-05, window_size_1: int = 101, window_size_2: int = 1001, polyorder_1: int = 5, polyorder_2: int = 5, use_normalized_capacity: bool = False) -> DataFrame

Compute differential dQ/dV or dV/dQ for the input dataframe.

Parameters:

    df (DataFrame, required): The input dataframe containing the raw cycling data.
    mode (str, default 'dQ/dV'): Either 'dQ/dV' or 'dV/dQ'. Invalid inputs will default to 'dQ/dV'.
    smoothing (bool, default True): Whether or not to apply additional smoothing to the output differential curve.
    polynomial_spline (int, default 3): The degree of the B-spline fit used by navani.
    s_spline (float, default 1e-05): The smoothing parameter used by navani.
    window_size_1 (int, default 101): The window size for the savgol filter when smoothing the capacity.
    window_size_2 (int, default 1001): The window size for the savgol filter when smoothing the final differential.
    polyorder_1 (int, default 5): The polynomial order for the savgol filter when smoothing the capacity.
    polyorder_2 (int, default 5): The polynomial order for the savgol filter when smoothing the final differential.

Returns:

    DataFrame: A data frame containing the voltages, capacities and requested differential on the reduced cycle list.

Source code in pydatalab/blocks/echem_block.py
def compute_gpcl_differential(
    df: pd.DataFrame,
    mode: str = "dQ/dV",
    smoothing: bool = True,
    polynomial_spline: int = 3,
    s_spline: float = 1e-5,
    window_size_1: int = 101,
    window_size_2: int = 1001,
    polyorder_1: int = 5,
    polyorder_2: int = 5,
    use_normalized_capacity: bool = False,
) -> pd.DataFrame:
    """Compute differential dQ/dV or dV/dQ for the input dataframe.

    Args:
        df: The input dataframe containing the raw cycling data.
        mode: Either 'dQ/dV' or 'dV/dQ'. Invalid inputs will default to 'dQ/dV'.
        smoothing: Whether or not to apply additional smoothing to the output differential curve.
        polynomial_spline: The degree of the B-spline fit used by navani.
        s_spline: The smoothing parameter used by navani.
        window_size_1: The window size for the `savgol` filter when smoothing the capacity.
        window_size_2: The window size for the `savgol` filter when smoothing the final differential.
        polyorder_1: The polynomial order for the `savgol` filter when smoothing the capacity.
        polyorder_2: The polynomial order for the `savgol` filter when smoothing the final differential.

    Returns:
        A data frame containing the voltages, capacities and requested differential
        on the reduced cycle list.

    """
    if len(df) < 2:
        LOGGER.debug(
            f"compute_gpcl_differential called on dataframe with length {len(df)}, too small to calculate derivatives"
        )
        return df

    if mode.lower().replace("/", "") == "dvdq":
        y_label = "voltage (V)"
        x_label = "capacity (mAh/g)" if use_normalized_capacity else "capacity (mAh)"
        yp_label = "dV/dQ (V/mA)"
    else:
        y_label = "capacity (mAh/g)" if use_normalized_capacity else "capacity (mAh)"
        x_label = "voltage (V)"
        yp_label = "dQ/dV (mA/V)"

    smoothing_parameters = {
        "polynomial_spline": polynomial_spline,
        "s_spline": s_spline,
        "window_size_1": window_size_1 if window_size_1 % 2 else window_size_1 + 1,
        "window_size_2": window_size_2 if window_size_2 % 2 else window_size_2 + 1,
        "polyorder_1": polyorder_1,
        "polyorder_2": polyorder_2,
        "final_smooth": smoothing,
    }

    differential_df = pd.DataFrame()

    # Loop over distinct half cycles
    for cycle in df["half cycle"].unique():
        # Extract all segments corresponding to this half cycle index
        df_cycle = df[df["half cycle"] == cycle]

        # Compute the desired derivative
        try:
            x, yp, y = ec.dqdv_single_cycle(
                df_cycle[y_label], df_cycle[x_label], **smoothing_parameters
            )
        except TypeError as e:
            LOGGER.debug(
                f"""Calculating derivative {mode} of half_cycle {cycle} failed with the following error (likely it is a rest or voltage hold):
                 {e}
                Skipping derivative calculation for this half cycle."""
            )
            continue

        # Set up an array per cycle segment that stores the cycle and half-cycle index
        cycle_index = df_cycle["full cycle"].max()
        cycle_index_array = np.full(len(x), int(cycle_index), dtype=int)
        half_cycle_index_array = np.full(len(x), int(cycle), dtype=int)

        differential_df = pd.concat(
            [
                differential_df,
                pd.DataFrame(
                    {
                        x_label: x,
                        y_label: y,
                        yp_label: yp,
                        "full cycle": cycle_index_array,
                        "half cycle": half_cycle_index_array,
                    }
                ),
            ]
        )

    return differential_df
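For intuition only, the quantity being computed can be illustrated with a plain finite-difference derivative on a toy charge curve; this is a simplification and not the spline-and-Savitzky-Golay pipeline that navani applies above:

import numpy as np
import pandas as pd

# Toy half cycle: capacity rising monotonically with voltage (arbitrary shape).
voltage = np.linspace(3.0, 4.2, 500)
capacity = 150 * (1 - np.exp(-4 * (voltage - 3.0)))  # mAh

# dQ/dV as a simple numerical gradient (illustration only).
dqdv = np.gradient(capacity, voltage)

toy = pd.DataFrame(
    {"voltage (V)": voltage, "capacity (mAh)": capacity, "dQ/dV (mA/V)": dqdv}
)
print(toy.head())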
filter_df_by_cycle_index(df: DataFrame, cycle_list: Optional[List[int]] = None) -> DataFrame

Filters the input dataframe by the chosen rows in the cycle_list. If "half cycle" is a column in the df, it will be used for filtering; otherwise "cycle index" will be used.

Parameters:

    df (DataFrame, required): The input dataframe to filter. Must have the column "half cycle".
    cycle_list (Optional[List[int]], default None): The provided list of cycle indices to keep.

Returns:

    DataFrame: A dataframe with all the data for the selected cycles.

Source code in pydatalab/blocks/echem_block.py
def filter_df_by_cycle_index(
    df: pd.DataFrame, cycle_list: Optional[List[int]] = None
) -> pd.DataFrame:
    """Filters the input dataframe by the chosen rows in the `cycle_list`.
    If `half cycle` is a column in the df, it will be used for filtering,
    otherwise `cycle index` will be used.

    Args:
        df: The input dataframe to filter. Must have the column "half cycle".
        cycle_list: The provided list of cycle indices to keep.

    Returns:
        A dataframe with all the data for the selected cycles.

    """
    if cycle_list is None:
        return df

    if "half cycle" not in df.columns:
        if "cycle index" not in df.columns:
            raise ValueError(
                "Input dataframe must have either 'half cycle' or 'cycle index' column"
            )
        return df[df["cycle index"].isin(i for i in cycle_list)]

    try:
        half_cycles = [i for item in cycle_list for i in [(2 * int(item)) - 1, 2 * int(item)]]
    except ValueError as exc:
        raise ValueError(
            f"Unable to parse `cycle_list` as integers: {cycle_list}. Error: {exc}"
        ) from exc
    return df[df["half cycle"].isin(half_cycles)]
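A short usage sketch (not from the library source) showing how full-cycle indices map onto half-cycle indices:

import pandas as pd

from pydatalab.blocks.echem_block import filter_df_by_cycle_index

df = pd.DataFrame(
    {
        "half cycle": [1, 2, 3, 4, 5, 6],
        "voltage (V)": [3.0, 4.2, 3.1, 4.1, 3.2, 4.0],
    }
)

# Keeping full cycle 2 selects half cycles 3 and 4.
print(filter_df_by_cycle_index(df, cycle_list=[2])["half cycle"].tolist())  # [3, 4]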

bokeh_plots

Attributes
COLORS
FONTSIZE
SELECTABLE_CALLBACK_x
SELECTABLE_CALLBACK_y
TOOLS
TYPEFACE
grid_style
grid_theme
mytheme
style

Additional style suitable for grid plots

Functions
selectable_axes_plot(df: Union[Dict[str, pandas.core.frame.DataFrame], List[pandas.core.frame.DataFrame], pandas.core.frame.DataFrame], x_options: List[str], y_options: List[str], color_options: Optional[List[str]] = None, color_mapper: Optional[bokeh.models.mappers.ColorMapper] = None, x_default: Optional[str] = None, y_default: Union[str, List[str]] = None, label_x: bool = True, label_y: bool = True, plot_points: bool = True, point_size: int = 4, plot_line: bool = True, plot_title: Optional[str] = None, plot_index: Optional[int] = None, tools: Optional[List] = None, **kwargs)

Creates a Bokeh layout with selectable axes.

Parameters:

    df (Union[Dict[str, pandas.core.frame.DataFrame], List[pandas.core.frame.DataFrame], pandas.core.frame.DataFrame], required): Dataframe, or list/dict of dataframes from data block.
    x_options (List[str], required): Selectable fields to use for the x-values.
    y_options (List[str], required): Selectable fields to use for the y-values.
    color_options (Optional[List[str]], default None): Selectable fields to colour lines/points by.
    color_mapper (Optional[bokeh.models.mappers.ColorMapper], default None): Optional colour mapper to pass to switch between log and linear scales.
    x_default (Optional[str], default None): Default x-axis that is plotted at start; defaults to the first value of x_options.
    y_default (Union[str, List[str]], default None): Default y-axis that is plotted at start; defaults to the first value of y_options. If provided a list, the first entry will be plotted as a solid line, and all others will be transparent lines.
    plot_points (bool, default True): Whether to use plot markers.
    point_size (int, default 4): The size of markers, if enabled.
    plot_line (bool, default True): Whether to draw a line between points.
    plot_title (Optional[str], default None): Global plot title to give to the figure.
    plot_index (Optional[int], default None): If part of a larger number of plots, use this index for e.g., choosing the correct value in the colour cycle.
    tools (Optional[List], default None): A list of Bokeh tools to enable.

Returns:

    A Bokeh layout.

Source code in pydatalab/bokeh_plots.py
def selectable_axes_plot(
    df: Union[Dict[str, pd.DataFrame], List[pd.DataFrame], pd.DataFrame],
    x_options: List[str],
    y_options: List[str],
    color_options: Optional[List[str]] = None,
    color_mapper: Optional[ColorMapper] = None,
    x_default: Optional[str] = None,
    y_default: Optional[Union[str, List[str]]] = None,
    label_x: bool = True,
    label_y: bool = True,
    plot_points: bool = True,
    point_size: int = 4,
    plot_line: bool = True,
    plot_title: Optional[str] = None,
    plot_index: Optional[int] = None,
    tools: Optional[List] = None,
    **kwargs,
):
    """
    Creates bokeh layout with selectable axis.

    Args:
        df: Dataframe, or list/dict of dataframes from data block.
        x_options: Selectable fields to use for the x-values
        y_options: Selectable fields to use for the y-values
        color_options: Selectable fields to colour lines/points by.
        color_mapper: Optional colour mapper to pass to switch between log and linear scales.
        x_default: Default x-axis that is plotted at start, defaults to first value of `x_options`
        y_default: Default y-axis that is plotted at start, defaults to first value of `y_options`.
            If provided a list, the first entry will be plotted as solid line, and all others will
            be transparent lines.
        plot_points: Whether to use plot markers.
        point_size: The size of markers, if enabled.
        plot_line: Whether to draw a line between points.
        plot_title: Global plot title to give to the figure.
        plot_index: If part of a larger number of plots, use this index for e.g., choosing the correct
            value in the colour cycle.
        tools: A list of Bokeh tools to enable.

    Returns:
        Bokeh layout
    """
    if not x_default:
        x_default = x_options[0]
    if not y_default:
        y_default = y_options[0]

    if isinstance(y_default, list):
        y_label = y_options[0]
    else:
        y_label = y_default

    x_axis_label = x_default if label_x else ""
    y_axis_label = y_label if label_y else ""

    p = figure(
        sizing_mode="scale_width",
        aspect_ratio=kwargs.pop("aspect_ratio", 1.5),
        x_axis_label=x_axis_label,
        y_axis_label=y_axis_label,
        tools=TOOLS,
        title=plot_title,
        **kwargs,
    )

    if tools:
        p.add_tools(tools)

    if isinstance(df, pd.DataFrame):
        df = [df]

    callbacks_x = []
    callbacks_y = []

    if color_options:
        if color_mapper is None:
            color_mapper = LinearColorMapper(palette="Cividis256")

    hatch_patterns = [None, ".", "/", "x"]

    labels = []

    if isinstance(df, dict):
        labels = list(df.keys())

    for ind, df_ in enumerate(df):

        if isinstance(df, dict):
            df_ = df[df_]

        if labels:
            label = labels[ind]
        else:
            label = df_.index.name if len(df) > 1 else ""

        source = ColumnDataSource(df_)

        if color_options:
            color = {"field": color_options[0], "transform": color_mapper}
            line_color = "black"
            fill_color = None
            if hatch_patterns[ind % len(hatch_patterns)] is None:
                fill_color = color
        elif plot_index is not None:
            color = COLORS[plot_index % len(COLORS)]
            line_color = COLORS[plot_index % len(COLORS)]
            fill_color = COLORS[plot_index % len(COLORS)]
        else:
            color = COLORS[ind % len(COLORS)]
            line_color = COLORS[ind % len(COLORS)]
            fill_color = COLORS[ind % len(COLORS)]

        # If y_default is a list, plot the first one as a solid line, and the rest as transparent "auxiliary" lines
        y_aux = None
        if isinstance(y_default, list):
            if len(y_default) > 1:
                y_aux = y_default[1:]
            y_default = y_default[0]

        circles = (
            p.circle(
                x=x_default,
                y=y_default,
                source=source,
                size=point_size,
                line_color=color,
                fill_color=fill_color,
                legend_label=label,
                hatch_pattern=hatch_patterns[ind % len(hatch_patterns)],
                hatch_color=color,
            )
            if plot_points
            else None
        )

        lines = (
            p.line(x=x_default, y=y_default, source=source, color=line_color, legend_label=label)
            if plot_line
            else None
        )

        if y_aux:
            for y in y_aux:
                aux_lines = (  # noqa
                    p.line(
                        x=x_default,
                        y=y,
                        source=source,
                        color=color,
                        legend_label=label,
                        alpha=0.3,
                    )
                    if plot_line
                    else None
                )

        callbacks_x.append(
            CustomJS(
                args=dict(circle1=circles, line1=lines, source=source, xaxis=p.xaxis[0]),
                code=SELECTABLE_CALLBACK_x,
            )
        )
        callbacks_y.append(
            CustomJS(
                args=dict(circle1=circles, line1=lines, source=source, yaxis=p.yaxis[0]),
                code=SELECTABLE_CALLBACK_y,
            )
        )

    if color_mapper and color_options:
        color_bar = ColorBar(color_mapper=color_mapper, title=color_options[0])  # type: ignore
        p.add_layout(color_bar, "right")

    # Add list boxes for selecting which columns to plot on the x and y axis
    xaxis_select = Select(title="X axis:", value=x_default, options=x_options)
    xaxis_select.js_on_change("value", *callbacks_x)

    yaxis_select = Select(title="Y axis:", value=y_default, options=y_options)
    yaxis_select.js_on_change("value", *callbacks_y)

    p.legend.click_policy = "hide"
    if len(df) <= 1:
        p.legend.visible = False

    plot_columns = [p]
    if len(x_options) > 1:
        plot_columns.append(xaxis_select)
    if len(y_options) > 1:
        plot_columns.append(yaxis_select)

    layout = column(*plot_columns)

    p.js_on_event(DoubleTap, CustomJS(args=dict(p=p), code="p.reset.emit()"))
    return layout
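A usage sketch (not from the library source), assuming bokeh and pydatalab are installed; the column names below are illustrative:

import numpy as np
import pandas as pd
from bokeh.io import output_file, show

from pydatalab.bokeh_plots import selectable_axes_plot

x = np.linspace(0, 10, 200)
df = pd.DataFrame({"time": x, "signal": np.sin(x), "signal squared": np.sin(x) ** 2})

layout = selectable_axes_plot(
    df,
    x_options=["time"],
    y_options=["signal", "signal squared"],
    plot_line=True,
    point_size=3,
)

output_file("selectable_plot_example.html")  # write a standalone HTML page
show(layout)
# The data blocks above instead serialize the layout with
# bokeh.embed.json_item(layout, theme=mytheme) for the web frontend.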
double_axes_echem_plot(df: DataFrame, mode: Optional[str] = None, cycle_summary: DataFrame = None, x_options: Sequence[str] = [], pick_peaks: bool = True, normalized: bool = False, **kwargs) -> gridplot

Creates a Bokeh plot for electrochemistry data.

Parameters:

    df (DataFrame, required): The pre-processed dataframe containing capacities and voltages, indexed by half cycle.
    mode (Optional[str], default None): Either "dQ/dV", "dV/dQ", "normal" or None.
    x_options (Sequence[str], default []): Columns from df that can be selected for the first plot. The first will be used as the default.
    pick_peaks (bool, default True): Whether or not to pick and plot the peaks in dV/dQ mode.

Returns:

    The Bokeh layout.

Source code in pydatalab/bokeh_plots.py
def double_axes_echem_plot(
    df: pd.DataFrame,
    mode: Optional[str] = None,
    cycle_summary: pd.DataFrame = None,
    x_options: Sequence[str] = [],
    pick_peaks: bool = True,
    normalized: bool = False,
    **kwargs,
) -> gridplot:
    """Creates a Bokeh plot for electrochemistry data.

    Args:
        df: The pre-processed dataframe containing capacities and
            voltages, indexed by half cycle.
        mode: Either "dQ/dV", "dV/dQ", "normal" or None.
        x_options: Columns from `df` that can be selected for the
            first plot. The first will be used as the default.
        pick_peaks: Whether or not to pick and plot the peaks in dV/dQ mode.

    Returns: The Bokeh layout.
    """

    if not x_options:
        x_options = (
            ["capacity (mAh/g)", "voltage (V)", "time (s)", "current (mA/g)"]
            if normalized
            else ["capacity (mAh)", "voltage (V)", "time (s)", "current (mA)"]
        )

    x_options = [opt for opt in x_options if opt in df.columns]

    common_options = {"aspect_ratio": 1.5, "tools": TOOLS}
    common_options.update(**kwargs)

    if mode == "normal":
        mode = None

    modes = ("dQ/dV", "dV/dQ", "final capacity", None)
    if mode not in modes:
        raise RuntimeError(f"Mode must be one of {modes} not {mode}.")

    x_default = x_options[0]
    y_default = x_options[1]

    x_options = list(x_options)

    cmap = plt.get_cmap("inferno")

    plots = []
    # normal plot
    # x_label = "Capacity (mAh/g)" if x_default == "Capacity normalized" else x_default
    x_label = x_default
    p1 = figure(x_axis_label=x_label, y_axis_label="voltage (V)", **common_options)
    plots.append(p1)

    # the differential plot
    if mode in ("dQ/dV", "dV/dQ"):
        if mode == "dQ/dV":
            p2 = figure(
                x_axis_label=mode,
                y_axis_label="voltage (V)",
                y_range=p1.y_range,
                **common_options,
            )
        else:
            p2 = figure(
                x_axis_label=x_default, y_axis_label=mode, x_range=p1.x_range, **common_options
            )
        plots.append(p2)

    elif mode == "final capacity" and cycle_summary is not None:
        palette = Accent[3]

        p3 = figure(
            x_axis_label="Cycle number",
            y_axis_label="capacity (mAh/g)" if normalized else "capacity (mAh)",
            **common_options,
        )

        p3.line(
            x="full cycle",
            y="charge capacity (mAh/g)" if normalized else "charge capacity (mAh)",
            source=cycle_summary,
            legend_label="charge",
            line_width=2,
            color=palette[0],
        )
        p3.circle(
            x="full cycle",
            y="charge capacity (mAh/g)" if normalized else "charge capacity (mAh)",
            source=cycle_summary,
            fill_color="white",
            hatch_color=palette[0],
            legend_label="charge",
            line_width=2,
            size=12,
            color=palette[0],
        )
        p3.line(
            x="full cycle",
            y="discharge capacity (mAh/g)" if normalized else "discharge capacity (mAh)",
            source=cycle_summary,
            legend_label="discharge",
            line_width=2,
            color=palette[2],
        )
        p3.triangle(
            x="full cycle",
            y="discharge capacity (mAh/g)" if normalized else "discharge capacity (mAh)",
            source=cycle_summary,
            fill_color="white",
            hatch_color=palette[2],
            line_width=2,
            legend_label="discharge",
            size=12,
            color=palette[2],
        )

        p3.legend.location = "right"
        p3.y_range.start = 0

    lines = []
    grouped_by_half_cycle = df.groupby("half cycle")

    for ind, plot in enumerate(plots):
        x = x_default
        y = "voltage (V)"
        if ind == 1:
            if mode == "dQ/dV":
                x = "dQ/dV (mA/V)"
            else:
                y = "dV/dQ (V/mA)"

        # trim the end of the colour cycle for visibility on a white background
        color_space = np.linspace(0.3, 0.7, int(df["half cycle"].max()))  # type: ignore

        for _, group in grouped_by_half_cycle:

            line = plot.line(
                x=x,
                y=y,
                source=group,
                line_color=matplotlib.colors.rgb2hex(
                    cmap(color_space[int(group["half cycle"].max()) - 1])
                ),
                hover_line_width=2,
                selection_line_width=2,
                selection_line_color="black",
            )
            if mode == "dV/dQ" and ind == 1 and pick_peaks:
                # Check if half cycle or not
                dvdq_array = np.array(group[y])
                if group[y].mean() < 0:
                    dvdq_array *= -1

                peaks, _ = find_peaks(dvdq_array, prominence=5)
                peak_locs = group.iloc[peaks]
                p2.circle(x=x, y=y, source=peak_locs)

            if ind == 0:
                lines.append(line)

    # Only add the selectable axis to dQ/dV mode
    if mode in ("dQ/dV", None):
        callback_x = CustomJS(
            args=dict(lines=lines, xaxis=p1.xaxis[0]),
            code="""
                var column = cb_obj.value;
                console.log(column)
                for (let line of lines) {
                    line.glyph.x = { field: column };
                }
                xaxis.axis_label = column;
            """,
        )

        xaxis_select = Select(title="X axis:", value=x_default, options=x_options)
        xaxis_select.js_on_change("value", callback_x)

    if mode is None:
        callback_y = CustomJS(
            args=dict(lines=lines, yaxis=p1.yaxis[0]),
            code="""
                var column = cb_obj.value;
                console.log(column)
                for (let line of lines) {
                    line.glyph.y = { field: column };
                }
                yaxis.axis_label = column;
            """,
        )

        yaxis_select = Select(title="Y axis:", value=y_default, options=x_options)
        yaxis_select.js_on_change("value", callback_y)

    hovertooltips = [("Cycle No.", "@{full cycle}"), ("Half-cycle", "@{half cycle}")]

    if mode:
        crosshair = CrosshairTool(dimensions="width" if mode == "dQ/dV" else "height")
    for p in plots:
        if len(lines) < 100:
            p.add_tools(HoverTool(tooltips=hovertooltips))
        if mode:
            p.add_tools(crosshair)
        p.js_on_event(DoubleTap, CustomJS(args=dict(p=p), code="p.reset.emit()"))

    if mode == "dQ/dV":
        grid = [[p1, p2], [xaxis_select]]
    elif mode == "dV/dQ":
        grid = [[p1], [p2]]
    elif mode == "final capacity":
        grid = [[p3]]
    else:
        grid = [[p1], [xaxis_select], [yaxis_select]]

    return gridplot(grid, sizing_mode="scale_width", toolbar_location="below")
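A sketch with toy data (not from the library source); real dataframes would come from CycleBlock._load above:

import numpy as np
import pandas as pd
from bokeh.io import output_file, show

from pydatalab.bokeh_plots import double_axes_echem_plot

# Toy data: one charge and one discharge half cycle of a single full cycle.
n = 200
df = pd.DataFrame(
    {
        "capacity (mAh)": np.concatenate([np.linspace(0, 1, n), np.linspace(1, 0, n)]),
        "voltage (V)": np.concatenate([np.linspace(3.0, 4.2, n), np.linspace(4.2, 3.0, n)]),
        "time (s)": np.arange(2 * n, dtype=float),
        "current (mA)": np.concatenate([np.full(n, 0.1), np.full(n, -0.1)]),
        "half cycle": np.repeat([1, 2], n),
        "full cycle": np.ones(2 * n, dtype=int),
    }
)

layout = double_axes_echem_plot(df, mode="normal")
output_file("echem_plot_example.html")
show(layout)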

config

CONFIG
DeploymentMetadata (BaseModel) pydantic-model
Source code in pydatalab/config.py
class DeploymentMetadata(BaseModel):

    maintainer: Optional[Person]
    issue_tracker: Optional[AnyUrl]
    homepage: Optional[AnyUrl]
    source_repository: Optional[AnyUrl]

    @validator("maintainer")
    def strip_fields_from_person(cls, v):
        if not v.contact_email:
            raise ValueError("Must provide contact email for maintainer.")

        return Person(contact_email=v.contact_email, display_name=v.display_name)

    class Config:
        extra = "allow"
maintainer: Person pydantic-field
issue_tracker: AnyUrl pydantic-field
homepage: AnyUrl pydantic-field
source_repository: AnyUrl pydantic-field
strip_fields_from_person(v) classmethod
Source code in pydatalab/config.py
@validator("maintainer")
def strip_fields_from_person(cls, v):
    if not v.contact_email:
        raise ValueError("Must provide contact email for maintainer.")

    return Person(contact_email=v.contact_email, display_name=v.display_name)
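A construction sketch (not part of the library source); the field values are placeholders, and pydantic coerces the maintainer dict into a Person:

from pydatalab.config import DeploymentMetadata

metadata = DeploymentMetadata(
    maintainer={"display_name": "Jane Doe", "contact_email": "jane@example.org"},
    homepage="https://example.org/datalab",
)
print(metadata.maintainer.contact_email)

# Omitting contact_email would trigger the strip_fields_from_person validator
# and raise a pydantic ValidationError.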
Functions
config_file_settings(settings: BaseSettings) -> Dict[str, Any]

Returns a dictionary of server settings loaded from the default or specified JSON config file location (via the env var PYDATALAB_CONFIG_FILE).

Source code in pydatalab/config.py
def config_file_settings(settings: BaseSettings) -> Dict[str, Any]:
    """Returns a dictionary of server settings loaded from the default or specified
    JSON config file location (via the env var `PYDATALAB_CONFIG_FILE`).

    """
    config_file = Path(os.getenv("PYDATALAB_CONFIG_FILE", "/app/config.json"))

    res = {}
    if config_file.is_file():
        logging.debug("Loading from config file at %s", config_file)
        config_file_content = config_file.read_text(encoding=settings.__config__.env_file_encoding)

        try:
            res = json.loads(config_file_content)
        except json.JSONDecodeError as json_exc:
            raise RuntimeError(f"Unable to read JSON config file {config_file}") from json_exc

    else:
        logging.debug("Unable to load from config file at %s", config_file)
        res = {}

    return res
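A sketch of pointing the server at a config file; the DEBUG key is purely illustrative and should be replaced by settings your deployment actually accepts:

import json
import os
import tempfile
from pathlib import Path

# Write a JSON config file somewhere on disk.
config_path = Path(tempfile.gettempdir()) / "datalab_config.json"
config_path.write_text(json.dumps({"DEBUG": True}))  # illustrative key only

# Point pydatalab at it before the settings object is constructed,
# otherwise the default /app/config.json location is used.
os.environ["PYDATALAB_CONFIG_FILE"] = str(config_path)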

errors

ERROR_HANDLERS: Iterable[Tuple[Any, Callable[[Any], Tuple[flask.wrappers.Response, int]]]]
Classes
UserRegistrationForbidden (Forbidden)

Raised when a user tries to register via OAuth without the appropriate properties/credentials, e.g., public membership of a GitHub organization that is on the allow list.

Source code in pydatalab/errors.py
class UserRegistrationForbidden(Forbidden):
    """Raised when a user tries to register via OAuth without the appropriate
    properties/credentials, e.g., public membership of a GitHub organization
    that is on the allow list.
    """

    description: str = """<html><head></head>
    <body>
    <h1>403 Forbidden</h1>

<h2>Unable to create account</h2>

<p>No user data will be stored as a result of this interaction, but you may wish to clear your cookies for this site.</p>

<p>
The OAuth identity used for registration is not on the allow list.
If you believe this to be an error, please first verify that your membership of the allowed
group (e.g., a GitHub organization) is public, and verify with the deployment manager that
the organization is indeed on the allow list.
</p>

<p>If this was not an error, you may wish to revoke any permissions given to the datalab OAuth application.</p>
</body>
</html>
"""
description: str
Functions
handle_http_exception(exc: HTTPException) -> Tuple[flask.wrappers.Response, int]

Return a specific error message and status code if the exception stores them.

Source code in pydatalab/errors.py
def handle_http_exception(exc: HTTPException) -> Tuple[Response, int]:
    """Return a specific error message and status code if the exception stores them."""
    response = {
        "title": exc.__class__.__name__,
        "description": exc.description,
    }
    status_code = exc.code if exc.code else 400

    return jsonify(response), status_code
render_unauthorised_user_template(exc: UserRegistrationForbidden) -> Tuple[flask.wrappers.Response, int]

Return a rich HTML page on user account creation failure.

Source code in pydatalab/errors.py
def render_unauthorised_user_template(exc: UserRegistrationForbidden) -> Tuple[Response, int]:
    """Return a rich HTML page on user account creation failure."""
    return Response(response=exc.description), exc.code
handle_pydantic_validation_error(exc: ValidationError) -> Tuple[flask.wrappers.Response, int]

Handle pydantic validation errors separately from other exceptions. These always come from malformed data, so should not necessarily trigger the Flask debugger.

Source code in pydatalab/errors.py
def handle_pydantic_validation_error(exc: ValidationError) -> Tuple[Response, int]:
    """Handle pydantic validation errors separately from other exceptions.
    These always come from malformed data, so should not necessarily trigger the
    Flask debugger.
    """
    response = {
        "title": exc.__class__.__name__,
        "message": str(exc.args[:]) if exc.args else "",
    }
    return jsonify(response), 500
handle_generic_exception(exc: Exception) -> Tuple[flask.wrappers.Response, int]

Return a specific error message and status code if the exception stores them.

Source code in pydatalab/errors.py
def handle_generic_exception(exc: Exception) -> Tuple[Response, int]:
    """Return a specific error message and status code if the exception stores them."""
    if os.environ.get("FLASK_ENV") == "development":
        raise exc

    response = {
        "title": exc.__class__.__name__,
        "message": str(exc.args) if exc.args else "",
    }
    return jsonify(response), 500
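Given that ERROR_HANDLERS is an iterable of (exception, handler) pairs, a wiring sketch for a Flask app might look as follows; how the pydatalab server itself registers them is an assumption here:

from flask import Flask

from pydatalab.errors import ERROR_HANDLERS

app = Flask(__name__)

# Register each (exception class, handler) pair on the app.
for exception_type, handler in ERROR_HANDLERS:
    app.register_error_handler(exception_type, handler)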

file_utils

DIRECTORIES_DICT
FILE_DIRECTORY
LIVE_FILE_CUTOFF
Functions
get_file_info_by_id(file_id: Union[str, bson.objectid.ObjectId], update_if_live: bool = True) -> Dict[str, Any]

Query the files collection for the given ID.

If update_if_live is set and the file has been updated on the remote since it was added to the database, then the new version will be copied into the local filestore.

Parameters:

    file_id (Union[str, bson.objectid.ObjectId], required): Either the string or ObjectId representation of the file ID.
    update_if_live (bool, default True): Whether or not to update the stored file to a newer version, if it exists.

Exceptions:

    IOError: If the given file ID does not exist in the database.

Returns:

    Dict[str, Any]: The stored file information as a dictionary. Will be empty if the corresponding file does not exist on disk.

Source code in pydatalab/file_utils.py
@logged_route
def get_file_info_by_id(
    file_id: Union[str, ObjectId], update_if_live: bool = True
) -> Dict[str, Any]:
    """Query the files collection for the given ID.

    If `update_if_live` is set and the file has been updated on the
    remote since it was added to the database, then the new version
    will be copied into the local filestore.

    Arguments:
        file_id: Either the string or ObjectId representation of the file ID.
        update_if_live: Whether or not to update the stored file to a
            newer version, if it exists.

    Raises:
        IOError: If the given file ID does not exist in the database.

    Returns:
        The stored file information as a dictionary. Will be empty if the
            corresponding file does not exist on disk.

    """
    LOGGER.debug("getting file for file_id: %s", file_id)
    file_collection = get_database().files
    file_id = ObjectId(file_id)
    file_info = file_collection.find_one({"_id": file_id})
    if not file_info:
        raise IOError(f"could not find file with id: {file_id} in db")

    file_info = File(**file_info)

    if update_if_live and file_info.is_live:
        file_info = _check_and_sync_file(file_info, file_id)

    return file_info.dict()
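A usage sketch, assuming a configured database; the ObjectId below is a placeholder and would normally come from an item's file_ObjectIds:

from pydatalab.file_utils import get_file_info_by_id

file_info = get_file_info_by_id("64a1f0c2e4b0a1b2c3d4e5f6", update_if_live=False)
print(file_info["name"], file_info["location"])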
update_uploaded_file(file, file_id, last_modified = None, size_bytes = None)

file is a file object from a flask request. last_modified should be in ISO format; if None, the current time will be inserted. By default, this only changes last_modified and size_bytes, increments the revision counter, and sets source=remote and is_live=False. additional_updates could be used to pass other fields to change (NOT IMPLEMENTED YET).

Source code in pydatalab/file_utils.py
@logged_route
def update_uploaded_file(file, file_id, last_modified=None, size_bytes=None):
    """file is a file object from a flask request.
    last_modified should be in ISO format; if None, the current time will be inserted.
    By default, this only changes last_modified and size_bytes, increments the revision
    counter, and sets source=remote and is_live=False.
    additional_updates could be used to pass other fields to change (NOT IMPLEMENTED YET)"""

    last_modified = datetime.datetime.now().isoformat()
    file_collection = get_database().files

    updated_file_entry = file_collection.find_one_and_update(
        {"_id": file_id},  # Note, needs to be ObjectID()
        {
            "$set": {
                "last_modified": last_modified,
                "size": size_bytes,
                "source": "remote",
                "is_live": False,
            },
            "$inc": {"revision": 1},
        },
        return_document=ReturnDocument.AFTER,
    )

    if not updated_file_entry:
        raise IOError(f"Issue with db update uploaded file {file.name} id {file_id}")

    updated_file_entry = File(**updated_file_entry)

    # overwrite the old file with the new location
    file.save(updated_file_entry["location"])

    ret = updated_file_entry.dict()
    ret.update({"_id": file_id})
    return ret
save_uploaded_file(file, item_ids = None, block_ids = None, last_modified = None, size_bytes = None)

file is a file object from a flask request. last_modified should be in ISO format; if last_modified is None, the current time will be inserted.

Source code in pydatalab/file_utils.py
@logged_route
def save_uploaded_file(file, item_ids=None, block_ids=None, last_modified=None, size_bytes=None):
    """file is a file object from a flask request.
    last_modified should be an isodate format. if last_modified is None, the current time will be inserted"""

    from pydatalab.routes.utils import get_default_permissions

    sample_collection = get_database().items
    file_collection = get_database().files

    # validate item_ids
    if not item_ids:
        item_ids = []
    if not block_ids:
        block_ids = []

    for item_id in item_ids:
        if not sample_collection.find_one(
            {"item_id": item_id, **get_default_permissions(user_only=True)}
        ):
            raise ValueError(f"item_id is invalid: {item_id}")

    filename = secure_filename(file.filename)
    extension = os.path.splitext(filename)[1]

    if not last_modified:
        last_modified = datetime.datetime.now().isoformat()

    new_file_document = File(
        name=filename,
        original_name=file.filename,  # not escaped
        location=None,  # file storage location in datalab. Important! will be filled in below
        url_path=None,  # the url used to access this file. Important! will be filled in below
        extension=extension,
        source="uploaded",
        size=size_bytes,
        item_ids=item_ids,
        blocks=block_ids,
        last_modified=last_modified,
        time_added=last_modified,
        metadata={},
        representation=None,
        source_server_name=None,  # not used for source=uploaded
        source_path=None,  # not used for source=uploaded
        last_modified_remote=None,  # not used for source=uploaded
        is_live=False,  # not available for source=uploaded
        revision=1,  # increment with each update
    )

    result = file_collection.insert_one(new_file_document.dict())
    if not result.acknowledged:
        raise IOError(f"db operation failed when trying to insert new file. Result: {result}")

    inserted_id = result.inserted_id

    new_directory = os.path.join(FILE_DIRECTORY, str(inserted_id))
    file_location = os.path.join(new_directory, filename)
    os.makedirs(new_directory)
    file.save(file_location)

    updated_file_entry = file_collection.find_one_and_update(
        {"_id": inserted_id},
        {
            "$set": {
                "location": file_location,
                "size": os.path.getsize(file_location),
            }
        },
        return_document=ReturnDocument.AFTER,
    )

    updated_file_entry = File(**updated_file_entry)

    # update any referenced item_ids
    for item_id in item_ids:
        sample_update_result = sample_collection.update_one(
            {"item_id": item_id, **get_default_permissions(user_only=True)},
            {"$push": {"file_ObjectIds": inserted_id}},
        )
        if sample_update_result.modified_count != 1:
            raise IOError(
                f"db operation failed when trying to insert new file ObjectId into sample: {item_id}"
            )

    ret = updated_file_entry.dict()
    ret.update({"_id": inserted_id})
    return ret
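
A minimal usage sketch, assuming a Flask request context; the route body, item_id value and response shape below are illustrative and not taken from the real pydatalab routes:

from flask import jsonify, request

from pydatalab.file_utils import save_uploaded_file

# Hypothetical route handler: the endpoint and item_id are placeholders.
def upload_file_to_sample():
    file = request.files["file"]  # a werkzeug FileStorage object
    entry = save_uploaded_file(
        file,
        item_ids=["sample_1"],              # must be existing items the user can access
        size_bytes=request.content_length,  # last_modified defaults to "now"
    )
    return jsonify({"file_id": str(entry["_id"])}), 201
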
add_file_from_remote_directory(file_entry, item_id, block_ids = None)
Source code in pydatalab/file_utils.py
def add_file_from_remote_directory(file_entry, item_id, block_ids=None):
    from pydatalab.routes.utils import get_default_permissions

    file_collection = get_database().files
    sample_collection = get_database().items

    if not block_ids:
        block_ids = []
    filename = secure_filename(file_entry["name"])
    extension = os.path.splitext(filename)[1]

    # generate the remote url
    host = DIRECTORIES_DICT[file_entry["toplevel_name"]]

    remote_path = os.path.join(file_entry["relative_path"].lstrip("/"), file_entry["name"])

    # If we are dealing with a truly remote host
    if host["hostname"]:
        remote_toplevel_path = f'{host["hostname"]}:{host["path"]}'
        full_remote_path = f"{remote_toplevel_path}/{remote_path}"
        if file_entry.get("time") is None:
            remote_timestamp = None
        else:
            remote_timestamp = datetime.datetime.fromtimestamp(int(file_entry["time"]))

    # Otherwise we assume the file is mounted locally
    else:
        remote_toplevel_path = host["path"]
        full_remote_path = os.path.join(remote_toplevel_path, remote_path)
        # check that the path is valid and get the last modified time from the server
        remote_timestamp = os.path.getmtime(full_remote_path)

    new_file_document = File(
        name=filename,
        original_name=file_entry["name"],  # not escaped
        # file storage location in datalab. Important! will be filled in below
        location=None,
        # the URL used to access this file. Important! will be filled in below
        url_path=None,
        extension=extension,
        source="remote",
        size=file_entry["size"],
        item_ids=[item_id],
        blocks=block_ids,
        # last_modified is the last modified time of the db entry in isoformat. For last modified file timestamp, see last_modified_remote_timestamp
        last_modified=datetime.datetime.now().isoformat(),
        time_added=datetime.datetime.now().isoformat(),
        metadata={},
        representation=None,
        source_server_name=file_entry["toplevel_name"],
        # this is the relative path from the given source_server_name (server directory)
        source_path=remote_path,
        # last modified time as provided by the remote server. May be different from last_modified if the two servers' clocks are not synchronized.
        last_modified_remote=remote_timestamp,
        # Whether this file will be updated (if changes have occurred) on access
        is_live=bool(host["hostname"]),
        # incremented with each update
        version=1,
    )

    result = file_collection.insert_one(new_file_document.dict())
    if not result.acknowledged:
        raise IOError(f"db operation failed when trying to insert new file. Result: {result}")

    inserted_id = result.inserted_id

    new_directory = os.path.join(FILE_DIRECTORY, str(inserted_id))
    new_file_location = os.path.join(new_directory, filename)
    os.makedirs(new_directory)
    _sync_file_with_remote(full_remote_path, new_file_location)

    updated_file_entry = file_collection.find_one_and_update(
        {"_id": inserted_id},
        {
            "$set": {
                "location": new_file_location,
                "url_path": new_file_location,
            }
        },
        return_document=ReturnDocument.AFTER,
    )

    sample_update_result = sample_collection.update_one(
        {"item_id": item_id, **get_default_permissions(user_only=True)},
        {"$push": {"file_ObjectIds": inserted_id}},
    )
    if sample_update_result.modified_count != 1:
        raise IOError(
            f"db operation failed when trying to insert new file ObjectId into sample: {item_id}"
        )

    return updated_file_entry
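
The file_entry mapping is expected to carry the keys read in the function body above (name, relative_path, toplevel_name, size and, optionally, time). A hypothetical call, with purely illustrative values:

from pydatalab.file_utils import add_file_from_remote_directory

# Keys mirror the lookups above; all values are illustrative only.
file_entry = {
    "name": "scan_001.xrdml",
    "relative_path": "project-x/raw",
    "toplevel_name": "diffractometer-share",  # must match a key in DIRECTORIES_DICT
    "size": 123456,
    "time": "1672531200",                     # optional epoch timestamp from the remote listing
}
new_file = add_file_from_remote_directory(file_entry, item_id="sample_1")
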
retrieve_file_path(file_ObjectId)
Source code in pydatalab/file_utils.py
def retrieve_file_path(file_ObjectId):
    file_collection = get_database().files
    result = file_collection.find_one({"_id": ObjectId(file_ObjectId)})
    if not result:
        raise FileNotFoundError(
            f"The file with file_ObjectId: {file_ObjectId} could not be found in the database"
        )

    result = File(**result)

    return result.location
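
A sketch of serving a stored file back to the client from its database ID, assuming a Flask context; the view function itself is illustrative:

from flask import send_file

from pydatalab.file_utils import retrieve_file_path

# Hypothetical download view: looks up the stored location and streams it back.
def download_file(file_id: str):
    path = retrieve_file_path(file_id)  # raises FileNotFoundError for unknown IDs
    return send_file(path)
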
remove_file_from_sample(item_id: Union[str, bson.objectid.ObjectId], file_id: Union[str, bson.objectid.ObjectId]) -> None

Detach the file at file_id from the item at item_id.

Parameters:

Name Type Description Default
item_id Union[str, bson.objectid.ObjectId]

The database ID of the item to alter.

required
file_id Union[str, bson.objectid.ObjectId]

The database ID of the file to remove from the item.

required
Source code in pydatalab/file_utils.py
def remove_file_from_sample(item_id: Union[str, ObjectId], file_id: Union[str, ObjectId]) -> None:
    """Detach the file at `file_id` from the item at `item_id`.

    Args:
        item_id: The database ID of the item to alter.
        file_id: The database ID of the file to remove from the item.

    """
    from pydatalab.routes.utils import get_default_permissions

    item_id, file_id = ObjectId(item_id), ObjectId(file_id)
    sample_collection = get_database().items
    file_collection = get_database().files
    sample_result = sample_collection.update_one(
        {"item_id": item_id, **get_default_permissions(user_only=True)},
        {"$pull": {"file_ObjectIds": file_id}},
    )

    if sample_result.modified_count < 1:
        raise IOError(
            f"Failed to remove {file_id!r} from item {item_id!r}. Result: {sample_result.raw_result}"
        )

    file_collection.update_one(
        {"_id": file_id},
        {"$pull": {"item_ids": item_id}},
    )

logger

LOGGER
Classes
AnsiColorHandler (StreamHandler)

Colourful and truncated log handler, exfiltrated from/inspired by various answers at https://stackoverflow.com/questions/7484454/removing-handlers-from-pythons-logging-loggers

Source code in pydatalab/logger.py
class AnsiColorHandler(logging.StreamHandler):
    """Colourful and truncated log handler, exfiltrated from/inspired
    by various answers at
    https://stackoverflow.com/questions/7484454/removing-handlers-from-pythons-logging-loggers

    """

    LOGLEVEL_COLORS = {
        logging.DEBUG: "36m",
        logging.INFO: "32m",
        logging.WARNING: "33m",
        logging.ERROR: "1;91m",
        logging.CRITICAL: "101;30m",
    }

    max_width = 2000

    def __init__(self) -> None:
        super().__init__()
        self.formatter = logging.Formatter("%(asctime)s - %(name)s | %(levelname)-8s: %(message)s")

    def format(self, record: logging.LogRecord) -> str:
        from flask_login import current_user

        prefix = "🔓"
        if current_user and current_user.is_authenticated:
            prefix = "🔒"
        message: str = super().format(record)
        if len(message) > self.max_width:
            message = message[: self.max_width] + "[...]"
        color = self.LOGLEVEL_COLORS[record.levelno]
        message = f"\x1b[{color} {prefix} {message}\x1b[0m"
        return message
LOGLEVEL_COLORS
max_width
Methods
__init__(self) -> None special
Source code in pydatalab/logger.py
def __init__(self) -> None:
    super().__init__()
    self.formatter = logging.Formatter("%(asctime)s - %(name)s | %(levelname)-8s: %(message)s")
format(self, record: LogRecord) -> str

Format the specified record.

If a formatter is set, use it. Otherwise, use the default formatter for the module.

Source code in pydatalab/logger.py
def format(self, record: logging.LogRecord) -> str:
    from flask_login import current_user

    prefix = "🔓"
    if current_user and current_user.is_authenticated:
        prefix = "🔒"
    message: str = super().format(record)
    if len(message) > self.max_width:
        message = message[: self.max_width] + "[...]"
    color = self.LOGLEVEL_COLORS[record.levelno]
    message = f"\x1b[{color} {prefix} {message}\x1b[0m"
    return message
Functions
setup_log(log_name: str = 'pydatalab', log_level: Optional[int] = None) -> Logger

Creates a logger with simple coloured stdout output.

Verbosity can be set to debug in the config file via the DEBUG option, or passed to the function.

Parameters:

Name Type Description Default
log_name str

The name of the logger.

'pydatalab'
log_level Optional[int]

The logging level to use.

None

Returns:

Type Description
Logger

The logger object.

Source code in pydatalab/logger.py
def setup_log(log_name: str = "pydatalab", log_level: Optional[int] = None) -> logging.Logger:
    """Creates a logger a simple coloured stdout output.

    Verbosity can be set to debug in the config file via
    the DEBUG option, or passed to the function.

    Parameters:
        log_name: The name of the logger.
        log_level: The logging level to use.

    Returns:
        The logger object.

    """
    from pydatalab.config import CONFIG

    logger = logging.getLogger(log_name)
    logger.handlers = []
    logger.propagate = False
    handler = AnsiColorHandler()
    logger.addHandler(handler)
    if log_level is None:
        log_level = logging.INFO

        if CONFIG.DEBUG:
            log_level = logging.DEBUG

    logger.setLevel(log_level)
    return logger
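
For example, a script or module can create its own coloured logger like this (the LOGGER attribute exposed by this module is presumably built the same way; the logger name below is arbitrary):

import logging

from pydatalab.logger import setup_log

log = setup_log("my_ingest_script", log_level=logging.DEBUG)
log.info("Starting ingest")
log.warning("Remote directory is empty")
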
logged_route(fn: Callable)

A decorator that enables logging of inputs (arguments and request body) and outputs (server response) when debug mode is enabled.

Parameters:

Name Type Description Default
fn Callable

The function to wrap.

required
Source code in pydatalab/logger.py
def logged_route(fn: Callable):
    """A decorator that enables logging of inputs (arguments
    and request body) and outputs (server response) when debug
    mode is enabled.

    Args:
        fn: The function to wrap.

    """

    @wraps(fn)
    def wrapped_logged_route(*args, **kwargs):
        from flask import request

        start = time.monotonic_ns()
        try:
            LOGGER.debug(
                "Calling %s with request: %s, JSON payload with keys %s",
                fn.__name__,
                request,
                request.get_json().keys() if request.get_json() else "null",
            )
        except Exception:
            pass
        try:
            result = fn(*args, **kwargs)

            LOGGER.debug(
                "%s returned in %s seconds with %s",
                fn.__name__,
                (time.monotonic_ns() - start) / 1e9,
                result,
            )
            return result
        except Exception as exc:
            import traceback

            LOGGER.error(
                "%s errored in %s seconds with %s %s %s",
                fn.__name__,
                (time.monotonic_ns() - start) / 1e9,
                exc.__class__.__name__,
                exc,
                traceback.print_tb(exc.__traceback__),
            )
            raise exc

    return wrapped_logged_route
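
A sketch of decorating a hypothetical Flask view with this decorator; with CONFIG.DEBUG enabled, the incoming request, the return value and the elapsed time are all emitted through LOGGER:

from flask import jsonify

from pydatalab.logger import logged_route

# Hypothetical view function, not one of the real pydatalab endpoints.
@logged_route
def healthcheck():
    return jsonify({"status": "success"})
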

login

This module implements functionality around the Flask-login manager, for retrieving the authenticated user for a session and their identities.

Attributes
LOGIN_MANAGER: LoginManager

The global login manager for the app.

Classes
LoginUser (UserMixin)

A wrapper class around Person to allow flask-login to track the session of the current user and get their details from the database.

(See https://flask-login.readthedocs.io/en/latest/#your-user-class)

Source code in pydatalab/login.py
class LoginUser(UserMixin):
    """A wrapper class around `Person` to allow flask-login to track
    the session of the current user and get their details
    from the database.

    (See https://flask-login.readthedocs.io/en/latest/#your-user-class)

    """

    id: str
    person: Person
    role: UserRole

    def __init__(
        self,
        _id: str,
        data: Person,
        role: UserRole,
    ):
        """Construct the logged in user from a given ID and user data.

        Parameters:
            _id: The ID of the person in the database.
            data: The relevant metadata for this user, e.g., their identities, contact
                details, for use by the app.

        """
        self.id = _id
        self.person = data
        self.role = role

    @property
    def display_name(self) -> Optional[str]:
        """Returns the top-level display name for the user, if set."""
        return self.person.display_name

    @property
    def identities(self) -> List[Identity]:
        """Returns the list of identities of the user."""
        return self.person.identities

    @property
    def identity_types(self) -> List[IdentityType]:
        """Returns a list of the identity types associated with the user."""
        return [_.identity_type for _ in self.person.identities]

    def refresh(self) -> None:
        """Reconstruct the user object from their database entry, to be used when,
        e.g., a new identity has been associated with them.
        """
        user = get_by_id(self.id)
        if user:
            self.person = user.person
            self.role = user.role
Attributes
display_name: Optional[str] property readonly

Returns the top-level display name for the user, if set.

identities: List[pydatalab.models.people.Identity] property readonly

Returns the list of identities of the user.

identity_types: List[pydatalab.models.people.IdentityType] property readonly

Returns a list of the identity types associated with the user.

Methods
__init__(self, _id: str, data: Person, role: UserRole) special

Construct the logged in user from a given ID and user data.

Parameters:

Name Type Description Default
_id str

The ID of the person in the database.

required
data Person

The relevant metadata for this user, e.g., their identities, contact details, for use by the app.

required
Source code in pydatalab/login.py
def __init__(
    self,
    _id: str,
    data: Person,
    role: UserRole,
):
    """Construct the logged in user from a given ID and user data.

    Parameters:
        _id: The ID of the person in the database.
        data: The relevant metadata for this user, e.g., their identities, contact
            details, for use by the app.

    """
    self.id = _id
    self.person = data
    self.role = role
refresh(self) -> None

Reconstruct the user object from their database entry, to be used when, e.g., a new identity has been associated with them.

Source code in pydatalab/login.py
def refresh(self) -> None:
    """Reconstruct the user object from their database entry, to be used when,
    e.g., a new identity has been associated with them.
    """
    user = get_by_id(self.id)
    if user:
        self.person = user.person
        self.role = user.role
Functions
get_by_id_cached(user_id)

Cached version of get_by_id.

Source code in pydatalab/login.py
@lru_cache(maxsize=128)
def get_by_id_cached(user_id):
    """Cached version of get_by_id."""
    return get_by_id(user_id)
get_by_id(user_id: str) -> Optional[pydatalab.login.LoginUser]

Lookup the user database ID and create a new LoginUser with the relevant metadata.

Parameters:

Name Type Description Default
user_id str

The user's ID in the database, either as a string, an ObjectID, or a JSON {'$oid': <id>} dictionary.

required

Exceptions:

Type Description
ValueError

if the user could not be found.

Source code in pydatalab/login.py
def get_by_id(user_id: str) -> Optional[LoginUser]:
    """Lookup the user database ID and create a new `LoginUser`
    with the relevant metadata.

    Parameters:
        user_id: The user's ID in the database, either as a string,
            an ObjectID, or a JSON `{'$oid': <id>}` dictionary.

    Raises:
        ValueError: if the user could not be found.

    """

    user = flask_mongo.db.users.find_one({"_id": ObjectId(user_id)})
    if not user:
        return None

    role = flask_mongo.db.roles.find_one({"_id": ObjectId(user_id)})
    if not role:
        role = "user"
    else:
        role = role["role"]

    return LoginUser(_id=user_id, data=Person(**user), role=UserRole(role))
get_by_api_key(key: str)

Checks if the hashed version of the key is in the keys collection and, if so, returns the authenticated user.

Source code in pydatalab/login.py
def get_by_api_key(key: str):
    """Checks if the hashed version of the key is in the keys collection,
    if so, return the authenticated user.

    """

    hash = sha512(key.encode("utf-8")).hexdigest()
    user = flask_mongo.db.api_keys.find_one({"hash": hash}, projection={"hash": 0})
    if user:
        return get_by_id_cached(str(user["_id"]))
load_user(user_id: str) -> Optional[pydatalab.login.LoginUser]

Looks up the currently authenticated user and returns a LoginUser model.

Source code in pydatalab/login.py
@LOGIN_MANAGER.user_loader
def load_user(user_id: str) -> Optional[LoginUser]:
    """Looks up the currently authenticated user and returns a `LoginUser` model."""
    return get_by_id_cached(str(user_id))
request_loader(request) -> Optional[pydatalab.login.LoginUser]
Source code in pydatalab/login.py
@LOGIN_MANAGER.request_loader
def request_loader(request) -> Optional[LoginUser]:

    api_key = request.headers.get("DATALAB-API-KEY", None)
    if api_key:
        return get_by_api_key(str(api_key))
    return None
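
From a client's perspective, request_loader means that any request carrying a valid DATALAB-API-KEY header is authenticated without a browser session. A hypothetical client call (the URL, path and key are placeholders):

import requests

# The header name matches request_loader above; everything else is illustrative.
response = requests.get(
    "https://datalab.example.com/samples",
    headers={"DATALAB-API-KEY": "<your-api-key>"},
)
response.raise_for_status()
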

main

compress
Functions
create_app(config_override: Dict[str, Any] = None) -> Flask

Create the main Flask app with the given config.

Parameters:

Name Type Description Default
config_override Dict[str, Any]

Config value overrides to use within the Flask app.

None

Returns:

Type Description
Flask

The Flask app with all associated endpoints.

Source code in pydatalab/main.py
def create_app(config_override: Dict[str, Any] = None) -> Flask:
    """Create the main `Flask` app with the given config.

    Parameters:
        config_override: Config value overrides to use
            within the `Flask` app.

    Returns:
        The `Flask` app with all associated endpoints.

    """
    setup_log("werkzeug", log_level=logging.INFO)
    setup_log("", log_level=logging.INFO)

    app = Flask(__name__, instance_relative_config=True)

    if config_override:
        CONFIG.update(config_override)

    app.config.update(CONFIG.dict())
    app.config.update(dotenv_values())

    LOGGER.info("Starting app with Flask app.config: %s", app.config)
    LOGGER.info("Datalab config: %s", CONFIG.dict())

    if CONFIG.BEHIND_REVERSE_PROXY:
        # Fix headers for reverse proxied app:
        # https://flask.palletsprojects.com/en/2.2.x/deploying/proxy_fix/
        app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)  # type: ignore

    CORS(
        app,
        resources={r"/*": {"origins": "*"}},
        supports_credentials=True,
    )

    app.json_encoder = CustomJSONEncoder

    # Must use the full path so that this object can be mocked for testing
    flask_mongo = pydatalab.mongo.flask_mongo
    flask_mongo.init_app(app, connectTimeoutMS=100, serverSelectionTimeoutMS=100)

    register_endpoints(app)

    LOGIN_MANAGER.init_app(app)

    pydatalab.mongo.create_default_indices()

    compress.init_app(app)

    @app.route("/logout")
    def logout():
        """Logs out the local user from the current session."""
        logout_user()
        return redirect(request.environ.get("HTTP_REFERER", "/"))

    @app.before_first_request  # runs before FIRST request (only once)
    def make_session_permanent():
        """Make the session permanent so that it doesn't expire on browser close, but instead adds a lifetime."""
        session.permanent = True
        app.permanent_session_lifetime = datetime.timedelta(days=1)

    @app.route("/")
    def index():
        """Landing page endpoint that renders a rudimentary welcome page based on the currently
        authenticated user.

        Warning:
            Does not use a Jinja template, so care must be taken in validating
            the embedded inputs.

        """
        from pydatalab.routes import (  # pylint: disable=import-outside-toplevel
            ENDPOINTS,
            auth,
        )

        OAUTH_PROXIES = auth.OAUTH_PROXIES

        connected = True
        try:
            pydatalab.mongo.check_mongo_connection()
        except RuntimeError:
            connected = False

        if connected:
            database_string = (
                '<p style="color: DarkSeaGreen">✅ Connected to underlying database</p>'
            )
        else:
            database_string = (
                '<p style="color: FireBrick">❎ Unable to connect to underlying database</p>'
            )

        if connected:
            if current_user.is_authenticated:
                welcome_string = f"""
                    <h2>Hello, {current_user.display_name}!</h2>
                    <h3>Connected identities:</h3>
                    <ul>
                """

                for identity in current_user.identities:
                    if identity.identity_type == "github":
                        welcome_string += f"""
                            <li>
                                <a href="https://github.com/{identity.name}">
                                    <i class="fa fa-github"></i>
                                    {identity.name}
                                </a>
                            </li>
                        """

                    elif identity.identity_type == "orcid":
                        welcome_string += f"""
                            <li>
                                <a href="https://orcid.org/{identity.name}">
                                    <img alt="ORCID logo" style="vertical-align: middle;", src="https://info.orcid.org/wp-content/uploads/2019/11/orcid_16x16.png" width="16" height="16" />
                                    {identity.name}
                                </a>
                            </li>
                        """

                welcome_string += "</ul>"

            else:
                welcome_string = (
                    """<h2>Welcome!</h2><h4>Please connect an OAuth account to continue:</h4>"""
                )

            connect_buttons = {
                "github": f"""
                    <a href={url_for('github.login')}>
                        <i class="fa fa-github"></i>
                        Connect GitHub
                    </a></br>
                """,
                "orcid": f"""
                    <a href={url_for("orcid.login")}>
                        <img alt="ORCID logo" style="vertical-align: middle;", src="https://info.orcid.org/wp-content/uploads/2019/11/orcid_16x16.png" width="16" height="16" />
                        Connect ORCID
                    </a></br>
                """,
            }

            auth_string = "<ul>"
            logout_string = ""

            if current_user.is_authenticated:
                for k in OAUTH_PROXIES:
                    if k not in current_user.identity_types:
                        auth_string += f"<li>{connect_buttons[k]}</li>"
                logout_string += f'<a href={url_for("logout")}>Log out</a>'

            else:
                for k in OAUTH_PROXIES:
                    auth_string += f'<li>{connect_buttons[k].replace("Connect", "Login via")}</li>'

            auth_string += "</ul>"

            endpoints_string = "\n".join(
                [
                    f'<li><a href="{endp[0]}"><pre>{endp[0]}</pre></a></li>'
                    for endp in ENDPOINTS.items()
                ]
            )
            endpoints_string = f"""<h3>Available endpoints:</h3><ul>{endpoints_string}</ul>"""

        else:
            auth_string = ""
            logout_string = ""
            welcome_string = ""
            endpoints_string = ""

        return f"""<head>
            <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css">
            </head>
            <h2><p style="color: CornflowerBlue">Welcome to pydatalab</p></h2>
<p>{welcome_string}</p>
<p>{auth_string}</p>
<p>{logout_string}</p>
<h3>API status:</h3>
<h4>{database_string}</h4>
{endpoints_string}
"""

    return app
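
A minimal sketch of launching the app for local development; DEBUG is a config key used elsewhere in the codebase, while the port is an arbitrary choice for this example:

from pydatalab.main import create_app

app = create_app(config_override={"DEBUG": True})

if __name__ == "__main__":
    # Development server only; use a WSGI server such as gunicorn in production.
    app.run(port=5001)
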
register_endpoints(app: Flask)

Loops through the implemented endpoints, blueprints and error handlers and adds them to the app.

Source code in pydatalab/main.py
def register_endpoints(app: Flask):
    """Loops through the implemented endpoints, blueprints and error handlers adds them to the app."""
    from pydatalab.errors import ERROR_HANDLERS
    from pydatalab.routes import BLUEPRINTS, ENDPOINTS, __api_version__, auth

    OAUTH_BLUEPRINTS = auth.OAUTH_BLUEPRINTS

    major, minor, patch = __api_version__.split(".")
    versions = ["", f"/v{major}", f"/v{major}.{minor}", f"/v{major}.{minor}.{patch}"]

    for rule, func in ENDPOINTS.items():
        for ver in versions:
            app.add_url_rule(
                f"{ver}{rule}",
                f"{ver}{rule}",
                logged_route(func),
            )

    for bp in BLUEPRINTS:
        for ver in versions:
            app.register_blueprint(bp, url_prefix=f"{ver}", name=f"{ver}/{bp.name}")

    for bp in OAUTH_BLUEPRINTS:  # type: ignore
        app.register_blueprint(OAUTH_BLUEPRINTS[bp], url_prefix="/login")  # type: ignore

    for exception_type, handler in ERROR_HANDLERS:
        app.register_error_handler(exception_type, handler)
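
To make the versioning scheme concrete, this small sketch reproduces the prefix computation above for a hypothetical __api_version__ of "0.1.0"; each rule is then registered once unversioned and once per versioned prefix:

# Hypothetical illustration of the version prefixes computed in register_endpoints.
major, minor, patch = "0.1.0".split(".")
versions = ["", f"/v{major}", f"/v{major}.{minor}", f"/v{major}.{minor}.{patch}"]
print(versions)  # ['', '/v0', '/v0.1', '/v0.1.0']
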

models special

ITEM_MODELS: Dict[str, Type[pydantic.main.BaseModel]]
Modules
cells
Classes
CellComponent (Constituent) pydantic-model
Source code in pydatalab/models/cells.py
class CellComponent(Constituent):
    ...
CellFormat (str, Enum)

An enumeration.

Source code in pydatalab/models/cells.py
class CellFormat(str, Enum):

    coin = "coin"
    pouch = "pouch"
    in_situ_xrd = "in situ (XRD)"
    in_situ_nmr = "in situ (NMR)"
    in_situ_squid = "in situ (SQUID)"
    swagelok = "swagelok"
    cylindrical = "cylindrical"
    other = "other"
coin
cylindrical
in_situ_nmr
in_situ_squid
in_situ_xrd
other
pouch
swagelok
Cell (Item) pydantic-model

A model for representing electrochemical cells.

Source code in pydatalab/models/cells.py
class Cell(Item):
    """A model for representing electrochemical cells."""

    type: str = Field("cells", const="cells", pattern="^cells$")

    cell_format: Optional[CellFormat] = Field(
        description="The form factor of the cell, e.g., coin, pouch, in situ or otherwise.",
    )

    cell_format_description: Optional[str] = Field(
        description="Additional human-readable description of the cell form factor, e.g., 18650, AMPIX, CAMPIX"
    )

    cell_preparation_description: Optional[str] = Field()

    characteristic_mass: Optional[float] = Field(
        description="The characteristic mass of the cell in milligrams. Can be used to normalize capacities."
    )

    characteristic_chemical_formula: Optional[str] = Field(
        description="The chemical formula of the active material. Can be used to calculated molar mass in g/mol for normalizing capacities."
    )

    characteristic_molar_mass: Optional[float] = Field(
        description="The molar mass of the active material, in g/mol. Will be inferred from the chemical formula, or can be supplied if it cannot be supplied"
    )

    positive_electrode: List[CellComponent] = Field([])

    negative_electrode: List[CellComponent] = Field([])

    electrolyte: List[CellComponent] = Field([])

    active_ion_charge: float = Field(1)

    @validator("characteristic_molar_mass", always=True, pre=True)
    def set_molar_mass(cls, v, values):
        from periodictable import formula

        if not v:
            chemical_formula = values.get("characteristic_chemical_formula")

            if chemical_formula:
                try:
                    return formula(chemical_formula).mass
                except Exception:
                    return None

        return v

    @root_validator
    def add_missing_electrode_relationships(cls, values):
        """Add any missing sample synthesis constituents to parent relationships"""
        from pydatalab.models.relationships import RelationshipType, TypedRelationship

        existing_parthood_relationship_ids = set()
        if values.get("relationships") is not None:
            existing_parthood_relationship_ids = set(
                relationship.item_id
                for relationship in values["relationships"]
                if relationship.relation == RelationshipType.PARTHOOD
            )
        else:
            values["relationships"] = []

        for component in ("positive_electrode", "negative_electrode", "electrolyte"):
            for constituent in values.get(component, []):
                if (
                    isinstance(constituent.item, EntryReference)
                    and constituent.item.item_id not in existing_parthood_relationship_ids
                ):
                    relationship = TypedRelationship(
                        relation=RelationshipType.PARTHOOD,
                        item_id=constituent.item.item_id,
                        type=constituent.item.type,
                        description="Is a constituent of",
                    )
                    values["relationships"].append(relationship)

        return values
Attributes
__slots__ special
cell_format: CellFormat pydantic-field

The form factor of the cell, e.g., coin, pouch, in situ or otherwise.

cell_format_description: str pydantic-field

Additional human-readable description of the cell form factor, e.g., 18650, AMPIX, CAMPIX

cell_preparation_description: str pydantic-field
characteristic_mass: float pydantic-field

The characteristic mass of the cell in milligrams. Can be used to normalize capacities.

characteristic_chemical_formula: str pydantic-field

The chemical formula of the active material. Can be used to calculate the molar mass in g/mol for normalizing capacities.

characteristic_molar_mass: float pydantic-field

The molar mass of the active material, in g/mol. Will be inferred from the chemical formula, or can be supplied directly if it cannot be inferred.

positive_electrode: List[pydatalab.models.cells.CellComponent] pydantic-field
negative_electrode: List[pydatalab.models.cells.CellComponent] pydantic-field
electrolyte: List[pydatalab.models.cells.CellComponent] pydantic-field
active_ion_charge: float pydantic-field
Methods
set_molar_mass(v, values) classmethod
Source code in pydatalab/models/cells.py
@validator("characteristic_molar_mass", always=True, pre=True)
def set_molar_mass(cls, v, values):
    from periodictable import formula

    if not v:
        chemical_formula = values.get("characteristic_chemical_formula")

        if chemical_formula:
            try:
                return formula(chemical_formula).mass
            except Exception:
                return None

    return v
add_missing_electrode_relationships(values) classmethod

Add any missing electrode constituents as parthood relationships.

Source code in pydatalab/models/cells.py
@root_validator
def add_missing_electrode_relationships(cls, values):
    """Add any missing sample synthesis constituents to parent relationships"""
    from pydatalab.models.relationships import RelationshipType, TypedRelationship

    existing_parthood_relationship_ids = set()
    if values.get("relationships") is not None:
        existing_parthood_relationship_ids = set(
            relationship.item_id
            for relationship in values["relationships"]
            if relationship.relation == RelationshipType.PARTHOOD
        )
    else:
        values["relationships"] = []

    for component in ("positive_electrode", "negative_electrode", "electrolyte"):
        for constituent in values.get(component, []):
            if (
                isinstance(constituent.item, EntryReference)
                and constituent.item.item_id not in existing_parthood_relationship_ids
            ):
                relationship = TypedRelationship(
                    relation=RelationshipType.PARTHOOD,
                    item_id=constituent.item.item_id,
                    type=constituent.item.type,
                    description="Is a constituent of",
                )
                values["relationships"].append(relationship)

    return values
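
A sketch of constructing a Cell directly; the field values are illustrative, and any further fields required by a particular deployment (e.g. owner metadata from HasOwner) are omitted:

from pydatalab.models.cells import Cell

# Illustrative values only; item_id comes from the parent Item model.
cell = Cell(
    item_id="cell_2023_001",
    cell_format="coin",
    characteristic_chemical_formula="LiNiO2",
    # characteristic_molar_mass is left unset so that set_molar_mass
    # infers it from the formula via periodictable.
)
print(cell.characteristic_molar_mass)
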
collections
Collection (Entry, HasOwner, HasBlocks) pydantic-model
Source code in pydatalab/models/collections.py
class Collection(Entry, HasOwner, HasBlocks):

    type: str = Field("collections", const="collections", pattern="^collections$")

    collection_id: HumanReadableIdentifier = Field(None)
    """A short human-readable/usable name for the collection."""

    title: Optional[str]
    """A descriptive title for the collection."""

    description: Optional[str]
    """A description of the collection, either in plain-text or a markup language."""

    num_items: Optional[int] = Field(None)
    """Inlined number of items associated with this collection."""

    @root_validator
    def check_ids(cls, values):
        if not any(values.get(k) is not None for k in ("collection_id", "immutable_id")):
            raise ValueError("Collection must have at least collection_id or immutable_id")

        return values
__slots__ special
collection_id: HumanReadableIdentifier pydantic-field
title: str pydantic-field
description: str pydantic-field
num_items: int pydantic-field
check_ids(values) classmethod
Source code in pydatalab/models/collections.py
@root_validator
def check_ids(cls, values):
    if not any(values.get(k) is not None for k in ("collection_id", "immutable_id")):
        raise ValueError("Collection must have at least collection_id or immutable_id")

    return values
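
A sketch of creating a Collection; per check_ids, at least one of collection_id or immutable_id must be provided, and other fields a deployment may require (e.g. owner information) are omitted here:

from pydatalab.models.collections import Collection

collection = Collection(
    collection_id="battery-screening-2023",
    title="Battery screening 2023",
    description="Cells cycled during the 2023 screening campaign.",
)
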
entries
Classes
Entry (BaseModel, ABC) pydantic-model

An Entry is an abstract base class for any model that can be deserialized and stored in the database.

Source code in pydatalab/models/entries.py
class Entry(BaseModel, abc.ABC):
    """An Entry is an abstract base class for any model that can be
    deserialized and stored in the database.

    """

    type: str
    """The resource type of the entry."""

    immutable_id: PyObjectId = Field(
        None,
        title="Immutable ID",
        alias="_id",
    )
    """The immutable database ID of the entry."""

    last_modified: Optional[IsoformatDateTime] = None
    """The timestamp at which the entry was last modified."""

    relationships: Optional[List[TypedRelationship]] = None
    """A list of related entries and their types."""

    @root_validator(pre=True)
    def check_id_names(cls, values):
        """Slightly upsetting hack: this case *should* be covered by the pydantic setting for
        populating fields by alias names.
        """
        if "_id" in values:
            values["immutable_id"] = values.pop("_id")

        return values

    def to_reference(self, additional_fields: Optional[List[str]] = None) -> "EntryReference":
        """Populate an EntryReference model from this entry, selecting additional fields to inline.

        Parameters:
            additional_fields: A list of fields to inline in the reference.

        """
        if additional_fields is None:
            additional_fields = []

        data = {
            "type": self.type,
            "item_id": getattr(self, "item_id", None),
            "immutable_id": getattr(self, "immutable_id", None),
        }
        data.update({field: getattr(self, field, None) for field in additional_fields})

        return EntryReference(**data)

    class Config:
        allow_population_by_field_name = True
        json_encoders = JSON_ENCODERS
        extra = "ignore"
__slots__ special
type: str pydantic-field required
immutable_id: PyObjectId pydantic-field
last_modified: IsoformatDateTime pydantic-field
relationships: List[pydatalab.models.relationships.TypedRelationship] pydantic-field
Methods
check_id_names(values) classmethod

Slightly upsetting hack: this case should be covered by the pydantic setting for populating fields by alias names.

Source code in pydatalab/models/entries.py
@root_validator(pre=True)
def check_id_names(cls, values):
    """Slightly upsetting hack: this case *should* be covered by the pydantic setting for
    populating fields by alias names.
    """
    if "_id" in values:
        values["immutable_id"] = values.pop("_id")

    return values
to_reference(self, additional_fields: Optional[List[str]] = None) -> EntryReference

Populate an EntryReference model from this entry, selecting additional fields to inline.

Parameters:

Name Type Description Default
additional_fields Optional[List[str]]

A list of fields to inline in the reference.

None
Source code in pydatalab/models/entries.py
def to_reference(self, additional_fields: Optional[List[str]] = None) -> "EntryReference":
    """Populate an EntryReference model from this entry, selecting additional fields to inline.

    Parameters:
        additional_fields: A list of fields to inline in the reference.

    """
    if additional_fields is None:
        additional_fields = []

    data = {
        "type": self.type,
        "item_id": getattr(self, "item_id", None),
        "immutable_id": getattr(self, "immutable_id", None),
    }
    data.update({field: getattr(self, field, None) for field in additional_fields})

    return EntryReference(**data)
files
Classes
File (Entry) pydantic-model

A model for representing a file that has been tracked or uploaded to datalab.

Source code in pydatalab/models/files.py
class File(Entry):
    """A model for representing a file that has been tracked or uploaded to datalab."""

    type: str = Field("files", const="files", pattern="^files$")

    size: Optional[int] = Field(description="The size of the file on disk in bytes.")

    last_modified_remote: Optional[IsoformatDateTime] = Field(
        description="The last date/time at which the remote file was modified."
    )

    item_ids: List[str] = Field(description="A list of item IDs associated with this file.")

    blocks: List[str] = Field(description="A list of block IDs associated with this file.")

    name: str = Field(description="The filename on disk.")

    extension: str = Field(description="The file extension that the file was uploaded with.")

    original_name: Optional[str] = Field(description="The raw filename as uploaded.")

    location: Optional[str] = Field(description="The location of the file on disk.")

    url_path: Optional[str] = Field(description="The path to a remote file.")

    source: Optional[str] = Field(
        description="The source of the file, e.g. 'remote' or 'uploaded'."
    )

    time_added: datetime.datetime = Field(description="The timestamp for the original file upload.")

    metadata: Optional[Dict[Any, Any]] = Field(description="Any additional metadata.")

    representation: Optional[Any] = Field()

    source_server_name: Optional[str] = Field(
        description="The server name at which the file is stored."
    )

    source_path: Optional[str] = Field(description="The path to the file on the remote resource.")

    is_live: bool = Field(
        description="Whether or not the file should be watched for future updates."
    )
Attributes
__slots__ special
size: int pydantic-field

The size of the file on disk in bytes.

last_modified_remote: IsoformatDateTime pydantic-field

The last date/time at which the remote file was modified.

item_ids: List[str] pydantic-field required

A list of item IDs associated with this file.

blocks: List[str] pydantic-field required

A list of block IDs associated with this file.

name: str pydantic-field required

The filename on disk.

extension: str pydantic-field required

The file extension that the file was uploaded with.

original_name: str pydantic-field

The raw filename as uploaded.

location: str pydantic-field

The location of the file on disk.

url_path: str pydantic-field

The path to a remote file.

source: str pydantic-field

The source of the file, e.g. 'remote' or 'uploaded'.

time_added: datetime pydantic-field required

The timestamp for the original file upload.

metadata: Dict[Any, Any] pydantic-field

Any additional metadata.

representation: Any pydantic-field
source_server_name: str pydantic-field

The server name at which the file is stored.

source_path: str pydantic-field

The path to the file on the remote resource.

is_live: bool pydantic-field required

Whether or not the file should be watched for future updates.

items
Classes
Item (Entry, HasOwner, HasRevisionControl, IsCollectable, HasBlocks, ABC) pydantic-model

The generic model for data types that will be exposed with their own named endpoints.

Source code in pydatalab/models/items.py
class Item(Entry, HasOwner, HasRevisionControl, IsCollectable, HasBlocks, abc.ABC):
    """The generic model for data types that will be exposed with their own named endpoints."""

    refcode: Refcode = None  # type: ignore
    """A globally unique immutable ID comprised of the deployment prefix (e.g., `grey`)
    and a locally unique string, ideally created with some consistent scheme.
    """

    item_id: HumanReadableIdentifier
    """A locally unique, human-readable identifier for the entry. This ID is mutable."""

    description: Optional[str]
    """A description of the item, either in plain-text or a markup language."""

    date: Optional[IsoformatDateTime]
    """A relevant 'creation' timestamp for the entry (e.g., purchase date, synthesis date)."""

    name: Optional[str]
    """An optional human-readable/usable name for the entry."""

    files: Optional[List[File]]
    """Any files attached to this sample."""

    file_ObjectIds: List[PyObjectId] = Field([])
    """Links to object IDs of files stored within the database."""

    @validator("refcode", pre=True, always=True)
    def refcode_validator(cls, v):
        """Generate a refcode if not provided; check that the refcode has the correct prefix if provided."""

        from pydatalab.config import CONFIG

        if v and not v.startswith(f"{CONFIG.IDENTIFIER_PREFIX}:"):
            raise ValueError(f"refcode missing prefix {CONFIG.IDENTIFIER_PREFIX!r}")

        return v
__slots__ special
refcode: Refcode pydantic-field
item_id: HumanReadableIdentifier pydantic-field required
description: str pydantic-field
date: IsoformatDateTime pydantic-field
name: str pydantic-field
files: List[pydatalab.models.files.File] pydantic-field
file_ObjectIds: List[pydatalab.models.utils.PyObjectId] pydantic-field
Methods
refcode_validator(v) classmethod

Generate a refcode if not provided; check that the refcode has the correct prefix if provided.

Source code in pydatalab/models/items.py
@validator("refcode", pre=True, always=True)
def refcode_validator(cls, v):
    """Generate a refcode if not provided; check that the refcode has the correct prefix if provided."""

    from pydatalab.config import CONFIG

    if v and not v.startswith(f"{CONFIG.IDENTIFIER_PREFIX}:"):
        raise ValueError(f"refcode missing prefix {CONFIG.IDENTIFIER_PREFIX!r}")

    return v
people
Classes
IdentityType (str, Enum)

A string enum representing the supported verifiable identity types.

Source code in pydatalab/models/people.py
class IdentityType(str, Enum):
    """A string enum representing the supported verifiable identity types."""

    EMAIL = "email"
    ORCID = "orcid"
    GITHUB = "github"
EMAIL
GITHUB
ORCID
Identity (BaseModel) pydantic-model

A model for identities that can be provided by external systems and associated with a given user.

Source code in pydatalab/models/people.py
class Identity(BaseModel):
    """A model for identities that can be provided by external systems
    and associated with a given user.

    """

    identity_type: IdentityType
    """The type or provider of the identity."""

    identifier: str
    """The identifier for the identity, e.g., an email address, an ORCID, a GitHub user ID."""

    name: str
    """The name associated with the identity to be exposed in free-text searches over people, e.g., an institutional username, a GitHub username."""

    verified: bool = Field(False)
    """Whether the identity has been verified (by some means, e.g., OAuth2 or email)"""

    display_name: Optional[str]
    """The user's display name associated with the identity, also to be exposed in free text searches."""

    @validator("name", pre=True, always=True)
    def add_missing_name(cls, v, values):
        """If the identity is created without a free-text 'name', then
        for certain providers, populate this field so that it can appear
        in the free text index, e.g., an ORCID, or an institutional username
        from an email address.

        """
        if v is None:
            if values["identity_type"] == IdentityType.ORCID:
                return values["identifier"]
            if values["identity_type"] == IdentityType.EMAIL:
                return values["identifier"].split("@")[0]

        return v

    @validator("verified", pre=True, always=True)
    def add_missing_verification(cls, v):
        """Fills in missing value for `verified` if not given."""
        if not v:
            v = False
        return v
identity_type: IdentityType pydantic-field required
identifier: str pydantic-field required
name: str pydantic-field required
verified: bool pydantic-field
display_name: str pydantic-field
Methods
add_missing_name(v, values) classmethod

If the identity is created without a free-text 'name', then for certain providers, populate this field so that it can appear in the free text index, e.g., an ORCID, or an institutional username from an email address.

Source code in pydatalab/models/people.py
@validator("name", pre=True, always=True)
def add_missing_name(cls, v, values):
    """If the identity is created without a free-text 'name', then
    for certain providers, populate this field so that it can appear
    in the free text index, e.g., an ORCID, or an institutional username
    from an email address.

    """
    if v is None:
        if values["identity_type"] == IdentityType.ORCID:
            return values["identifier"]
        if values["identity_type"] == IdentityType.EMAIL:
            return values["identifier"].split("@")[0]

    return v
add_missing_verification(v) classmethod

Fills in missing value for verified if not given.

Source code in pydatalab/models/people.py
@validator("verified", pre=True, always=True)
def add_missing_verification(cls, v):
    """Fills in missing value for `verified` if not given."""
    if not v:
        v = False
    return v
Person (Entry) pydantic-model

A model that describes an individual and their digital identities.

Source code in pydatalab/models/people.py
class Person(Entry):
    """A model that describes an individual and their digital identities."""

    type: str = Field("people", const=True)
    """The entry type as a string."""

    identities: List[Identity] = Field(default_factory=list)
    """A list of identities attached to this person, e.g., email addresses, OAuth accounts."""

    display_name: Optional[str]
    """The user-chosen display name."""

    contact_email: Optional[EmailStr]
    """In the case of multiple *verified* email identities, this email will be used as the primary contact."""

    @validator("type", pre=True, always=True)
    def add_missing_type(cls, v):
        """Fill in missing `type` field if not provided."""
        if v is None:
            v = "people"
        return v

    @validator("type", pre=True)
    def set_default_type(cls, _):
        return "people"

    @staticmethod
    def new_user_from_identity(
        identity: Identity, use_display_name: bool = True, use_contact_email: bool = True
    ) -> "Person":
        """Create a new `Person` object with the given identity.

        Arguments:
            identity: The identity to populate the `identities` field with.
            use_display_name: Whether to set the top-level `display_name`
                field with any display name present in the identity.
            use_contact_email: If the identity provided is an email address,
                this argument decides whether to populate the top-level
                `contact_email` field with the address of this identity.

        Returns:
            A `Person` object with only the provided identity.

        """
        user_id = bson.ObjectId()

        display_name = None
        if use_display_name:
            display_name = identity.display_name

        contact_email = None
        if use_contact_email and identity.identity_type is IdentityType.EMAIL:
            contact_email = identity.identifier

        return Person(
            immutable_id=user_id,
            identities=[identity],
            display_name=display_name,
            contact_email=contact_email,
        )
__slots__ special
identities: List[pydatalab.models.people.Identity] pydantic-field
display_name: str pydantic-field
contact_email: EmailStr pydantic-field
Methods
add_missing_type(v) classmethod

Fill in missing type field if not provided.

Source code in pydatalab/models/people.py
@validator("type", pre=True, always=True)
def add_missing_type(cls, v):
    """Fill in missing `type` field if not provided."""
    if v is None:
        v = "people"
    return v
set_default_type(_) classmethod
Source code in pydatalab/models/people.py
@validator("type", pre=True)
def set_default_type(cls, _):
    return "people"
new_user_from_identity(identity: Identity, use_display_name: bool = True, use_contact_email: bool = True) -> Person staticmethod

Create a new Person object with the given identity.

Parameters:

Name Type Description Default
identity Identity

The identity to populate the identities field with.

required
use_display_name bool

Whether to set the top-level display_name field with any display name present in the identity.

True
use_contact_email bool

If the identity provided is an email address, this argument decides whether to populate the top-level contact_email field with the address of this identity.

True

Returns:

Type Description
Person

A Person object with only the provided identity.

Source code in pydatalab/models/people.py
@staticmethod
def new_user_from_identity(
    identity: Identity, use_display_name: bool = True, use_contact_email: bool = True
) -> "Person":
    """Create a new `Person` object with the given identity.

    Arguments:
        identity: The identity to populate the `identities` field with.
        use_display_name: Whether to set the top-level `display_name`
            field with any display name present in the identity.
        use_contact_email: If the identity provided is an email address,
            this argument decides whether to populate the top-level
            `contact_email` field with the address of this identity.

    Returns:
        A `Person` object with only the provided identity.

    """
    user_id = bson.ObjectId()

    display_name = None
    if use_display_name:
        display_name = identity.display_name

    contact_email = None
    if use_contact_email and identity.identity_type is IdentityType.EMAIL:
        contact_email = identity.identifier

    return Person(
        immutable_id=user_id,
        identities=[identity],
        display_name=display_name,
        contact_email=contact_email,
    )
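
A sketch of bootstrapping a new user record from a verified email identity; all identity values are illustrative:

from pydatalab.models.people import Identity, IdentityType, Person

identity = Identity(
    identity_type=IdentityType.EMAIL,
    identifier="a.researcher@example.ac.uk",
    verified=True,
    display_name="A. Researcher",
    # `name` is filled in by add_missing_name (the local part of the email).
)

person = Person.new_user_from_identity(identity)
print(person.contact_email)  # a.researcher@example.ac.uk
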
relationships
Classes
RelationshipType (str, Enum)

An enumeration of the possible types of relationship between two entries.

classDiagram
class entryC
entryC --|> entryA: parent
entryC ..|> entryD
entryA <..> entryD: sibling
entryA --|> entryB : child
Source code in pydatalab/models/relationships.py
class RelationshipType(str, Enum):
    """An enumeration of the possible types of relationship between two entries.

    ```mermaid
    classDiagram
    class entryC
    entryC --|> entryA: parent
    entryC ..|> entryD
    entryA <..> entryD: sibling
    entryA --|> entryB : child
    ```

    """

    PARENT = "parent"
    CHILD = "child"
    SIBLING = "sibling"
    PARTHOOD = "is_part_of"
    OTHER = "other"
CHILD
OTHER
PARENT
PARTHOOD
SIBLING
TypedRelationship (BaseModel) pydantic-model
Source code in pydatalab/models/relationships.py
class TypedRelationship(BaseModel):

    description: Optional[str] = Field(
        None,
        description="A description of the relationship.",
    )

    relation: Optional[RelationshipType] = Field(
        None,
        description="The type of relationship between the two items. If the type is 'other', then a human-readable description should be provided.",
    )

    type: KnownType = Field(description="The type of the related resource.")

    immutable_id: Optional[PyObjectId] = Field(
        description="The immutable ID of the entry that is related to this entry."
    )

    item_id: Optional[HumanReadableIdentifier] = Field(
        description="The ID of the entry that is related to this entry."
    )

    refcode: Optional[Refcode] = Field(
        description="The refcode of the entry that is related to this entry."
    )

    @validator("relation")
    def check_for_description(cls, v, values):
        if v == RelationshipType.OTHER and values.get("description") is None:
            raise ValueError(
                f"A description must be provided if the relationship type is {RelationshipType.OTHER.value!r}."
            )

        return v

    @root_validator
    def check_id_fields(cls, values):
        """Check that only one of the possible identifier fields is provided."""
        id_fields = ("immutable_id", "item_id", "refcode")
        if all(values[f] is None for f in id_fields):
            raise ValueError(f"Must provide at least one of {id_fields!r}")
        if sum(1 for f in id_fields if values[f] is not None) > 1:
            raise ValueError(f"Must provide only one of {id_fields!r}")

        return values
Attributes
description: str pydantic-field

A description of the relationship.

relation: RelationshipType pydantic-field

The type of relationship between the two items. If the type is 'other', then a human-readable description should be provided.

type: KnownType pydantic-field required

The type of the related resource.

immutable_id: PyObjectId pydantic-field

The immutable ID of the entry that is related to this entry.

item_id: HumanReadableIdentifier pydantic-field

The ID of the entry that is related to this entry.

refcode: Refcode pydantic-field

The refcode of the entry that is related to this entry.

Methods
check_for_description(v, values) classmethod
Source code in pydatalab/models/relationships.py
@validator("relation")
def check_for_description(cls, v, values):
    if v == RelationshipType.OTHER and values.get("description") is None:
        raise ValueError(
            f"A description must be provided if the relationship type is {RelationshipType.OTHER.value!r}."
        )

    return v
check_id_fields(values) classmethod

Check that only one of the possible identifier fields is provided.

Source code in pydatalab/models/relationships.py
@root_validator
def check_id_fields(cls, values):
    """Check that only one of the possible identifier fields is provided."""
    id_fields = ("immutable_id", "item_id", "refcode")
    if all(values[f] is None for f in id_fields):
        raise ValueError(f"Must provide at least one of {id_fields!r}")
    if sum(1 for f in id_fields if values[f] is not None) > 1:
        raise ValueError(f"Must provide only one of {id_fields!r}")

    return values
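
A sketch of declaring a relationship by item_id alone; per check_id_fields, exactly one of immutable_id, item_id or refcode may be set, and the type string below is assumed to be a valid KnownType value:

from pydatalab.models.relationships import RelationshipType, TypedRelationship

rel = TypedRelationship(
    relation=RelationshipType.PARENT,
    item_id="starting_material_4",
    type="starting_materials",
    description="Precursor used in the synthesis",
)
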
samples
Classes
Sample (Item) pydantic-model

A model for representing an experimental sample.

Source code in pydatalab/models/samples.py
class Sample(Item):
    """A model for representing an experimental sample."""

    type: str = Field("samples", const="samples", pattern="^samples$")

    chemform: Optional[str] = Field(example=["Na3P", "LiNiO2@C"])
    """A string representation of the chemical formula or composition associated with this sample."""

    synthesis_constituents: List[Constituent] = Field([])
    """A list of references to constituent materials giving the amount and relevant inlined details of consituent items."""

    synthesis_description: Optional[str]
    """Free-text details of the procedure applied to synthesise the sample"""

    @root_validator
    def add_missing_synthesis_relationships(cls, values):
        """Add any missing sample synthesis constituents to parent relationships"""
        from pydatalab.models.relationships import RelationshipType, TypedRelationship

        constituents_set = set()
        if values.get("synthesis_constituents") is not None:

            existing_parent_relationship_ids = set()
            if values.get("relationships") is not None:
                existing_parent_relationship_ids = set(
                    relationship.item_id or relationship.refcode
                    for relationship in values["relationships"]
                    if relationship.relation == RelationshipType.PARENT
                )
            else:
                values["relationships"] = []

            for constituent in values.get("synthesis_constituents", []):
                # If this is an inline relationship, just skip it
                if isinstance(constituent.item, InlineSubstance):
                    continue
                if (
                    constituent.item.item_id not in existing_parent_relationship_ids
                    and constituent.item.refcode not in existing_parent_relationship_ids
                ):
                    relationship = TypedRelationship(
                        relation=RelationshipType.PARENT,
                        item_id=constituent.item.item_id,
                        type=constituent.item.type,
                        description="Is a constituent of",
                    )
                    values["relationships"].append(relationship)

                # Accumulate all constituent IDs in a set to filter those that have been deleted
                constituents_set.add(constituent.item.item_id)

        # Finally, filter out any parent relationships with items that were removed
        # from the synthesis constituents
        values["relationships"] = [
            rel
            for rel in values["relationships"]
            if not (
                rel.item_id not in constituents_set
                and rel.relation == RelationshipType.PARENT
                and rel.type in ("samples", "starting_materials")
            )
        ]

        return values
__slots__ special
chemform: str pydantic-field
synthesis_constituents: List[pydatalab.models.utils.Constituent] pydantic-field
synthesis_description: str pydantic-field
Methods
add_missing_synthesis_relationships(values) classmethod

Add any missing sample synthesis constituents to parent relationships

Source code in pydatalab/models/samples.py
@root_validator
def add_missing_synthesis_relationships(cls, values):
    """Add any missing sample synthesis constituents to parent relationships"""
    from pydatalab.models.relationships import RelationshipType, TypedRelationship

    constituents_set = set()
    if values.get("synthesis_constituents") is not None:

        existing_parent_relationship_ids = set()
        if values.get("relationships") is not None:
            existing_parent_relationship_ids = set(
                relationship.item_id or relationship.refcode
                for relationship in values["relationships"]
                if relationship.relation == RelationshipType.PARENT
            )
        else:
            values["relationships"] = []

        for constituent in values.get("synthesis_constituents", []):
            # If this is an inline relationship, just skip it
            if isinstance(constituent.item, InlineSubstance):
                continue
            if (
                constituent.item.item_id not in existing_parent_relationship_ids
                and constituent.item.refcode not in existing_parent_relationship_ids
            ):
                relationship = TypedRelationship(
                    relation=RelationshipType.PARENT,
                    item_id=constituent.item.item_id,
                    type=constituent.item.type,
                    description="Is a constituent of",
                )
                values["relationships"].append(relationship)

            # Accumulate all constituent IDs in a set to filter those that have been deleted
            constituents_set.add(constituent.item.item_id)

    # Finally, filter out any parent relationships with items that were removed
    # from the synthesis constituents
    values["relationships"] = [
        rel
        for rel in values["relationships"]
        if not (
            rel.item_id not in constituents_set
            and rel.relation == RelationshipType.PARENT
            and rel.type in ("samples", "starting_materials")
        )
    ]

    return values
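
An illustrative sketch of the validator above (not part of the source; it assumes an item_id alone is enough to construct a Sample outside of the database, and that the referenced starting material does not need to exist for validation to pass):

from pydatalab.models.samples import Sample

sample = Sample(
    item_id="sample-1",
    chemform="Na3P",
    synthesis_constituents=[
        {
            "item": {"item_id": "na-metal", "type": "starting_materials"},
            "quantity": 1.0,
            "unit": "g",
        }
    ],
)

# The root validator has added a PARENT relationship for the constituent item.
print([(rel.relation, str(rel.item_id)) for rel in sample.relationships])
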
starting_materials
Classes
StartingMaterial (Item) pydantic-model

A model for representing a starting material.

Source code in pydatalab/models/starting_materials.py
class StartingMaterial(Item):
    """A model for representing an experimental sample."""

    type: str = Field(
        "starting_materials", const="starting_materials", pattern="^starting_materials$"
    )

    barcode: Optional[str] = Field(
        alias="Barcode", description="A unique barcode from ChemInventory"
    )

    date_acquired: Optional[IsoformatDateTime] = Field(
        alias="Date Acquired", description="The date the item was acquired"
    )

    date_opened: Optional[IsoformatDateTime] = Field(
        alias="Date opened", description="The date the container was opened"
    )

    CAS: Optional[str] = Field(alias="Substance CAS", description="CAS Registry Number")

    chemical_purity: Optional[str] = Field(alias="Chemical purity")

    full_percent: Optional[str] = Field(alias="Full %")

    name: str = Field(alias="Container Name", description="name of the chemical")

    size: Optional[str] = Field(
        alias="Container Size", description="size of the container (see 'size_unit' for the units)"
    )

    size_unit: Optional[str] = Field(alias="Unit", description="units for the 'size' field.")

    chemform: Optional[str] = Field(
        alias="Molecular Formula",
        description="A string representation of the chemical formula associated with this sample.",
    )

    molar_mass: Optional[float] = Field(
        alias="Molecular Weight", description="Mass per formula unit, in g/mol"
    )

    smiles_representation: Optional[str] = Field(
        alias="SMILES", description="Chemical structure in SMILES notation"
    )

    supplier: Optional[str] = Field(alias="Supplier", description="Manufacturer of the chemical")

    location: Optional[str] = Field(
        alias="Location", description="Location where chemical is stored"
    )

    comment: Optional[str] = Field(alias="Comments")

    @validator("molar_mass")
    def add_molar_mass(cls, v, values):
        from periodictable import formula

        if v is None and values.get("chemform"):
            return formula(values.get("chemform")).mass

        return v
Attributes
__slots__ special
barcode: str pydantic-field

A unique barcode from ChemInventory

date_acquired: IsoformatDateTime pydantic-field

The date the item was acquired

date_opened: IsoformatDateTime pydantic-field

The date the container was opened

CAS: str pydantic-field

CAS Registry Number

chemical_purity: str pydantic-field
full_percent: str pydantic-field
size: str pydantic-field

size of the container (see 'size_unit' for the units)

size_unit: str pydantic-field

units for the 'size' field.

chemform: str pydantic-field

A string representation of the chemical formula associated with this sample.

molar_mass: float pydantic-field

Mass per formula unit, in g/mol

smiles_representation: str pydantic-field

Chemical structure in SMILES notation

supplier: str pydantic-field

Manufacturer of the chemical

location: str pydantic-field

Location where chemical is stored

comment: str pydantic-field
add_molar_mass(v, values) classmethod
Source code in pydatalab/models/starting_materials.py
@validator("molar_mass")
def add_molar_mass(cls, v, values):
    from periodictable import formula

    if v is None and values.get("chemform"):
        return formula(values.get("chemform")).mass

    return v
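
A short sketch of the add_molar_mass behaviour (illustrative only; it assumes the model config allows population by field name instead of the ChemInventory aliases such as "Container Name", and that the periodictable package is installed). Since the validator is not declared with always=True, molar_mass is passed explicitly as None so that it runs:

from pydatalab.models.starting_materials import StartingMaterial

nacl = StartingMaterial(
    item_id="nacl-bottle-1",
    name="Sodium chloride",
    chemform="NaCl",
    molar_mass=None,  # filled in from `chemform` by the validator above
)
print(nacl.molar_mass)  # ~58.44 g/mol, computed via periodictable
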
traits
Classes
HasOwner (BaseModel) pydantic-model
Source code in pydatalab/models/traits.py
class HasOwner(BaseModel):

    creator_ids: List[PyObjectId] = Field([])
    """The database IDs of the user(s) who created the item."""

    creators: Optional[List[Person]] = Field(None)
    """Inlined info for the people associated with this item."""
creator_ids: List[pydatalab.models.utils.PyObjectId] pydantic-field
creators: List[pydatalab.models.people.Person] pydantic-field
HasRevisionControl (BaseModel) pydantic-model
Source code in pydatalab/models/traits.py
class HasRevisionControl(BaseModel):

    revision: int = 1
    """The revision number of the entry."""

    revisions: Optional[Dict[int, Any]] = None
    """An optional mapping from old revision numbers to the model state at that revision."""
revision: int pydantic-field
revisions: Dict[int, Any] pydantic-field
HasBlocks (BaseModel) pydantic-model
Source code in pydatalab/models/traits.py
class HasBlocks(BaseModel):

    blocks_obj: Dict[str, Any] = Field({})
    """A mapping from block ID to block data."""

    display_order: List[str] = Field([])
    """The order in which to display block data in the UI."""
blocks_obj: Dict[str, Any] pydantic-field
display_order: List[str] pydantic-field
IsCollectable (BaseModel) pydantic-model

Trait mixin for models that can be added to collections.

Source code in pydatalab/models/traits.py
class IsCollectable(BaseModel):
    """Trait mixin for models that can be
    added to collections.
    """

    from pydatalab.models.collections import Collection

    collections: List[Collection] = Field([])
    """Inlined info for the collections associated with this item."""

    @root_validator
    def add_missing_collection_relationships(cls, values):
        from pydatalab.models.relationships import TypedRelationship

        if values.get("collections") is not None:

            new_ids = set(coll.immutable_id for coll in values["collections"])
            existing_collection_relationship_ids = set()
            if values.get("relationships") is not None:
                existing_collection_relationship_ids = set(
                    relationship.immutable_id
                    for relationship in values["relationships"]
                    if relationship.type == "collections"
                )
            else:
                values["relationships"] = []

            for collection in values.get("collections", []):
                if collection.immutable_id not in existing_collection_relationship_ids:
                    relationship = TypedRelationship(
                        relation=None,
                        immutable_id=collection.immutable_id,
                        type="collections",
                        description="Is a member of",
                    )
                    values["relationships"].append(relationship)

            values["relationships"] = [
                d
                for d in values.get("relationships", [])
                if d.type != "collections" or d.immutable_id in new_ids
            ]

        if len([d for d in values.get("relationships", []) if d.type == "collections"]) != len(
            values.get("collections", [])
        ):
            raise RuntimeError("Relationships and collections mismatch")

        return values
collections: List[pydatalab.models.collections.Collection] pydantic-field
Collection (Entry, HasOwner, HasBlocks) pydantic-model
Source code in pydatalab/models/traits.py
class Collection(Entry, HasOwner, HasBlocks):

    type: str = Field("collections", const="collections", pattern="^collections$")

    collection_id: HumanReadableIdentifier = Field(None)
    """A short human-readable/usable name for the collection."""

    title: Optional[str]
    """A descriptive title for the collection."""

    description: Optional[str]
    """A description of the collection, either in plain-text or a markup language."""

    num_items: Optional[int] = Field(None)
    """Inlined number of items associated with this collection."""

    @root_validator
    def check_ids(cls, values):
        if not any(values.get(k) is not None for k in ("collection_id", "immutable_id")):
            raise ValueError("Collection must have at least collection_id or immutable_id")

        return values
__slots__ special
collection_id: HumanReadableIdentifier pydantic-field
title: str pydantic-field
description: str pydantic-field
num_items: int pydantic-field
check_ids(values) classmethod
Source code in pydatalab/models/traits.py
@root_validator
def check_ids(cls, values):
    if not any(values.get(k) is not None for k in ("collection_id", "immutable_id")):
        raise ValueError("Collection must have at least collection_id or immutable_id")

    return values
add_missing_collection_relationships(values) classmethod
Source code in pydatalab/models/traits.py
@root_validator
def add_missing_collection_relationships(cls, values):
    from pydatalab.models.relationships import TypedRelationship

    if values.get("collections") is not None:

        new_ids = set(coll.immutable_id for coll in values["collections"])
        existing_collection_relationship_ids = set()
        if values.get("relationships") is not None:
            existing_collection_relationship_ids = set(
                relationship.immutable_id
                for relationship in values["relationships"]
                if relationship.type == "collections"
            )
        else:
            values["relationships"] = []

        for collection in values.get("collections", []):
            if collection.immutable_id not in existing_collection_relationship_ids:
                relationship = TypedRelationship(
                    relation=None,
                    immutable_id=collection.immutable_id,
                    type="collections",
                    description="Is a member of",
                )
                values["relationships"].append(relationship)

        values["relationships"] = [
            d
            for d in values.get("relationships", [])
            if d.type != "collections" or d.immutable_id in new_ids
        ]

    if len([d for d in values.get("relationships", []) if d.type == "collections"]) != len(
        values.get("collections", [])
    ):
        raise RuntimeError("Relationships and collections mismatch")

    return values
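
A brief sketch of the check_ids constraint on Collection (illustrative, not from the source; it assumes Collection is importable from pydatalab.models.collections, as in the IsCollectable import above):

from pydatalab.models.collections import Collection

collection = Collection(collection_id="battery-project", title="Battery project")

try:
    Collection(title="No identifier supplied")
except ValueError as exc:
    print(exc)  # "Collection must have at least collection_id or immutable_id"
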
utils
Attributes
IDENTIFIER_REGEX

A regex that matches identifiers that are url-safe and do not contain leading or trailing punctuation.

JSON_ENCODERS
Mass: typing_extensions.TypeAlias
Volume: typing_extensions.TypeAlias
Classes
ItemType (str, Enum)

An enumeration of the types of items known by this implementation, should be made dynamic in the future.

Source code in pydatalab/models/utils.py
class ItemType(str, Enum):
    """An enumeration of the types of items known by this implementation, should be made dynamic in the future."""

    SAMPLES = "samples"
    STARTING_MATERIALS = "starting_materials"
SAMPLES
STARTING_MATERIALS
KnownType (str, Enum)

An enumeration of the types of entry known by this implementation, should be made dynamic in the future.

Source code in pydatalab/models/utils.py
class KnownType(str, Enum):
    """An enumeration of the types of entry known by this implementation, should be made dynamic in the future."""

    SAMPLES = "samples"
    STARTING_MATERIALS = "starting_materials"
    BLOCKS = "blocks"
    FILES = "files"
    PEOPLE = "people"
    COLLECTIONS = "collections"
BLOCKS
COLLECTIONS
FILES
PEOPLE
SAMPLES
STARTING_MATERIALS
HumanReadableIdentifier (ConstrainedStr)

Used to constrain human-readable and URL-safe identifiers for items.

Source code in pydatalab/models/utils.py
class HumanReadableIdentifier(ConstrainedStr):
    """Used to constrain human-readable and URL-safe identifiers for items."""

    min_length = 1
    max_length = 40
    strip_whitespace = True
    to_lower = False
    strict = False
    regex = IDENTIFIER_REGEX

    def __init__(self, value):
        self.value = parse_obj_as(type(self), value)

    def __str__(self):
        return self.value

    def __repr__(self):
        return self.value

    def __bool__(self):
        return bool(self.value)
max_length
min_length
regex
strict
strip_whitespace
to_lower
__init__(self, value) special
Source code in pydatalab/models/utils.py
def __init__(self, value):
    self.value = parse_obj_as(type(self), value)
__str__(self) special
Source code in pydatalab/models/utils.py
def __str__(self):
    return self.value
__repr__(self) special
Source code in pydatalab/models/utils.py
def __repr__(self):
    return self.value
__bool__(self) special
Source code in pydatalab/models/utils.py
def __bool__(self):
    return bool(self.value)
Refcode (HumanReadableIdentifier)
Source code in pydatalab/models/utils.py
class Refcode(HumanReadableIdentifier):

    regex = r"^[a-z]{2,10}:" + IDENTIFIER_REGEX[1:]
    """A regex to match refcodes that have a lower-case prefix between 2-10 chars, followed by a colon,
    and then the normal rules for an ID (url-safe etc.).

    """

    @property
    def prefix(self):
        return self.value.split(":")[0]

    @property
    def identifier(self):
        return self.value.split(":")[1]
Attributes
regex

A regex to match refcodes that have a lower-case prefix between 2-10 chars, followed by a colon, and then the normal rules for an ID (url-safe etc.).

prefix property readonly
identifier property readonly
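
An illustrative sketch of the prefix/identifier split (not part of the source):

from pydatalab.models.utils import Refcode

refcode = Refcode("grey:ABCDEF")
print(refcode.prefix)      # "grey"
print(refcode.identifier)  # "ABCDEF"
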
UserRole (str, Enum)

An enumeration.

Source code in pydatalab/models/utils.py
class UserRole(str, Enum):
    USER = "user"
    ADMIN = "admin"
    MANAGER = "manager"
ADMIN
MANAGER
USER
PintType (str)

A WIP attempt to create a custom pydantic field type for Pint quantities. The idea would eventually be to use TypeAlias to create physical/dimensionful pydantic fields.

Source code in pydatalab/models/utils.py
class PintType(str):
    """A WIP attempt to create a custom pydantic field type for Pint quantities.
    The idea would eventually be to use TypeAlias to create physical/dimensionful pydantic fields.

    """

    Q = pint.Quantity

    def __init__(self, dimensions: str):
        self._dimensions = dimensions

    @classmethod
    def __get_validators__(self):
        yield self.validate

    @classmethod
    def validate(self, v):
        q = self.Q(v)
        if not q.check(self._dimensions):
            raise ValueError("Value {v} must have dimensions of mass, not {v.dimensions}")
        return q

    @classmethod
    def __modify_schema__(cls, field_schema):
        field_schema.update(type="string")
Q (DaskQuantity, NumpyQuantity, MeasurementQuantity, FormattingQuantity, NonMultiplicativeQuantity, PlainQuantity)
__init__(self, dimensions: str) special
Source code in pydatalab/models/utils.py
def __init__(self, dimensions: str):
    self._dimensions = dimensions
__get_validators__() classmethod special
Source code in pydatalab/models/utils.py
@classmethod
def __get_validators__(self):
    yield self.validate
validate(v) classmethod
Source code in pydatalab/models/utils.py
@classmethod
def validate(self, v):
    q = self.Q(v)
    if not q.check(self._dimensions):
        raise ValueError("Value {v} must have dimensions of mass, not {v.dimensions}")
    return q
__modify_schema__(field_schema) classmethod special
Source code in pydatalab/models/utils.py
@classmethod
def __modify_schema__(cls, field_schema):
    field_schema.update(type="string")
PyObjectId (ObjectId)

A wrapper class for a BSON ObjectId that can be used as a Pydantic field type.

Modified from "Getting started iwth MongoDB and FastAPI": https://www.mongodb.com/developer/languages/python/python-quickstart-fastapi/.

Source code in pydatalab/models/utils.py
class PyObjectId(ObjectId):
    """A wrapper class for a BSON ObjectId that can be used as a Pydantic field type.

    Modified from "Getting started iwth MongoDB and FastAPI":
    https://www.mongodb.com/developer/languages/python/python-quickstart-fastapi/.

    """

    @classmethod
    def __get_validators__(cls):
        yield cls.validate

    @classmethod
    def validate(cls, v):
        if isinstance(v, dict) and "$oid" in v:
            v = v["$oid"]

        if not ObjectId.is_valid(v):
            raise ValueError("Invalid ObjectId")

        return ObjectId(v)

    @classmethod
    def __modify_schema__(cls, field_schema):
        field_schema.update(type="string")
__get_validators__() classmethod special
Source code in pydatalab/models/utils.py
@classmethod
def __get_validators__(cls):
    yield cls.validate
validate(v) classmethod
Source code in pydatalab/models/utils.py
@classmethod
def validate(cls, v):
    if isinstance(v, dict) and "$oid" in v:
        v = v["$oid"]

    if not ObjectId.is_valid(v):
        raise ValueError("Invalid ObjectId")

    return ObjectId(v)
__modify_schema__(field_schema) classmethod special
Source code in pydatalab/models/utils.py
@classmethod
def __modify_schema__(cls, field_schema):
    field_schema.update(type="string")
IsoformatDateTime (datetime)

A datetime container that is more flexible than the pydantic default.

Source code in pydatalab/models/utils.py
class IsoformatDateTime(datetime.datetime):
    """A datetime container that is more flexible than the pydantic default."""

    @classmethod
    def __get_validators__(cls):
        yield cls.validate

    @classmethod
    def validate(cls, v):
        if isinstance(v, str):
            if v in ["0", " "]:
                return None
            return datetime.datetime.fromisoformat(v)

        return v

    @classmethod
    def __modify_schema__(cls, field_schema):
        field_schema.update(type="date")
__get_validators__() classmethod special
Source code in pydatalab/models/utils.py
@classmethod
def __get_validators__(cls):
    yield cls.validate
validate(v) classmethod
Source code in pydatalab/models/utils.py
@classmethod
def validate(cls, v):
    if isinstance(v, str):
        if v in ["0", " "]:
            return None
        return datetime.datetime.fromisoformat(v)

    return v
__modify_schema__(field_schema) classmethod special
Source code in pydatalab/models/utils.py
@classmethod
def __modify_schema__(cls, field_schema):
    field_schema.update(type="date")
RefCodeFactory
Source code in pydatalab/models/utils.py
class RefCodeFactory:

    refcode_generator: Callable

    @classmethod
    def generate(self):
        from pydatalab.config import CONFIG

        return f"{CONFIG.IDENTIFIER_PREFIX}:{self.refcode_generator()}"
generate() classmethod
Source code in pydatalab/models/utils.py
@classmethod
def generate(self):
    from pydatalab.config import CONFIG

    return f"{CONFIG.IDENTIFIER_PREFIX}:{self.refcode_generator()}"
RandomAlphabeticalRefcodeFactory (RefCodeFactory)
Source code in pydatalab/models/utils.py
class RandomAlphabeticalRefcodeFactory(RefCodeFactory):

    refcode_generator = partial(random_uppercase, length=6)
refcode_generator: Callable
InlineSubstance (BaseModel) pydantic-model
Source code in pydatalab/models/utils.py
class InlineSubstance(BaseModel):
    name: str
    chemform: Optional[str]
name: str pydantic-field required
chemform: str pydantic-field
EntryReference (BaseModel) pydantic-model

A reference to a database entry by ID and type.

Can include additional arbitrary metadata useful for inlining the item data.

Source code in pydatalab/models/utils.py
class EntryReference(BaseModel):
    """A reference to a database entry by ID and type.

    Can include additional arbitrary metadata useful for
    inlining the item data.

    """

    type: str
    name: Optional[str]
    immutable_id: Optional[PyObjectId]
    item_id: Optional[HumanReadableIdentifier]
    refcode: Optional[Refcode]

    @root_validator
    def check_id_fields(cls, values):
        """Check that only one of the possible identifier fields is provided."""
        id_fields = ("immutable_id", "item_id", "refcode")

        # Temporarily remove refcodes from the list of fields to check
        # until it is fully implemented
        if values.get("refcode") is not None:
            values["refcode"] = None
        if all(values.get(f) is None for f in id_fields):
            raise ValueError(f"Must provide at least one of {id_fields!r}")

        if sum(1 for f in id_fields if values.get(f) is not None) > 1:
            raise ValueError("Must provide only one of {id_fields!r}")

        return values

    class Config:
        extra = "allow"
type: str pydantic-field required
name: str pydantic-field
immutable_id: PyObjectId pydantic-field
item_id: HumanReadableIdentifier pydantic-field
refcode: Refcode pydantic-field
Methods
check_id_fields(values) classmethod

Check that only one of the possible identifier fields is provided.

Source code in pydatalab/models/utils.py
@root_validator
def check_id_fields(cls, values):
    """Check that only one of the possible identifier fields is provided."""
    id_fields = ("immutable_id", "item_id", "refcode")

    # Temporarily remove refcodes from the list of fields to check
    # until it is fully implemented
    if values.get("refcode") is not None:
        values["refcode"] = None
    if all(values.get(f) is None for f in id_fields):
        raise ValueError(f"Must provide at least one of {id_fields!r}")

    if sum(1 for f in id_fields if values.get(f) is not None) > 1:
        raise ValueError("Must provide only one of {id_fields!r}")

    return values
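
A short sketch of check_id_fields in action (illustrative only; note that, as the comment above explains, refcodes are currently discarded before the check):

from pydatalab.models.utils import EntryReference

reference = EntryReference(type="samples", item_id="sample-1")

try:
    EntryReference(
        type="samples",
        item_id="sample-1",
        immutable_id="507f1f77bcf86cd799439011",
    )
except ValueError as exc:
    print(exc)  # only one identifier field may be provided
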
Constituent (BaseModel) pydantic-model

A constituent of a sample.

Source code in pydatalab/models/utils.py
class Constituent(BaseModel):
    """A constituent of a sample."""

    item: Union[EntryReference, InlineSubstance]
    """A reference to item (sample or starting material) entry for the constituent substance."""

    quantity: Optional[float] = Field(..., ge=0)
    """The amount of the constituent material used to create the sample."""

    unit: str = Field("g")
    """The unit symbol for the value provided in `quantity`, default is mass
    in grams (g) but could also refer to volumes (mL, L, etc.) or moles (mol).
    """

    @validator("item")
    def check_itemhood(cls, v):
        """Check that the reference within the constituent is to an item type."""
        if "type" in (v.value for v in ItemType):
            raise ValueError(f"`type` must be one of {ItemType!r}")

        return v

    @validator("item", pre=True, always=True)
    def coerce_reference(cls, v):
        if isinstance(v, dict):
            id = v.pop("item_id", None)
            if id:
                return EntryReference(item_id=id, **v)
            else:
                name = v.pop("name", "")
                chemform = v.pop("chemform", None)
                if not name:
                    raise ValueError("Inline substance must have a name!")
                return InlineSubstance(name=name, chemform=chemform)
        return v
item: Union[pydatalab.models.utils.EntryReference, pydatalab.models.utils.InlineSubstance] pydantic-field required
quantity: ConstrainedFloatValue pydantic-field required
unit: str pydantic-field
Methods
check_itemhood(v) classmethod

Check that the reference within the constituent is to an item type.

Source code in pydatalab/models/utils.py
@validator("item")
def check_itemhood(cls, v):
    """Check that the reference within the constituent is to an item type."""
    if "type" in (v.value for v in ItemType):
        raise ValueError(f"`type` must be one of {ItemType!r}")

    return v
coerce_reference(v) classmethod
Source code in pydatalab/models/utils.py
@validator("item", pre=True, always=True)
def coerce_reference(cls, v):
    if isinstance(v, dict):
        id = v.pop("item_id", None)
        if id:
            return EntryReference(item_id=id, **v)
        else:
            name = v.pop("name", "")
            chemform = v.pop("chemform", None)
            if not name:
                raise ValueError("Inline substance must have a name!")
            return InlineSubstance(name=name, chemform=chemform)
    return v
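
An illustrative sketch of coerce_reference (not part of the source): plain dictionaries are coerced to an EntryReference when an item_id is present, and to an InlineSubstance otherwise.

from pydatalab.models.utils import Constituent, EntryReference, InlineSubstance

by_reference = Constituent(
    item={"item_id": "na-metal", "type": "starting_materials"}, quantity=0.5
)
inline = Constituent(item={"name": "sodium metal", "chemform": "Na"}, quantity=0.5)

print(isinstance(by_reference.item, EntryReference))  # True
print(isinstance(inline.item, InlineSubstance))       # True
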
Functions
random_uppercase(length: int = 6)
Source code in pydatalab/models/utils.py
def random_uppercase(length: int = 6):
    return "".join(random.choices(string.ascii_uppercase, k=length))
generate_unique_refcode()

Generates a unique refcode for an item using the configured convention.

Source code in pydatalab/models/utils.py
def generate_unique_refcode():
    """Generates a unique refcode for an item using the configured convention."""
    from pydatalab.config import CONFIG
    from pydatalab.mongo import get_database

    refcode = f"{CONFIG.REFCODE_GENERATOR.generate()}"
    try:
        while get_database().items.find_one({"refcode": refcode}):
            refcode = f"{CONFIG.IDENTIFIER_PREFIX}:{CONFIG.REFCODE_GENERATOR.generate()}"
    except Exception as exc:
        raise RuntimeError(f"Cannot check refcode for uniqueness: {exc}")

    return refcode

mongo

Attributes
flask_mongo

This is the primary database interface used by the Flask app.

Functions
insert_pydantic_model_fork_safe(model: BaseModel, collection: str) -> str

Inserts a Pydantic model into the chosen collection, returning the inserted ID.

Source code in pydatalab/mongo.py
def insert_pydantic_model_fork_safe(model: BaseModel, collection: str) -> str:
    """Inserts a Pydantic model into chosen collection, returning the inserted ID."""
    return (
        get_database()[collection]
        .insert_one(model.dict(by_alias=True, exclude_none=True))
        .inserted_id
    )
get_database() -> Database

Returns the configured database.

Source code in pydatalab/mongo.py
def get_database() -> pymongo.database.Database:
    """Returns the configured database."""
    return _get_active_mongo_client().get_database()
check_mongo_connection() -> None

Checks that the configured MongoDB is available by connecting to it with a pymongo.MongoClient for the configured MONGO_URI.

Source code in pydatalab/mongo.py
def check_mongo_connection() -> None:
    """Checks that the configured MongoDB is available and returns a
    `pymongo.MongoClient` for the configured `MONGO_URI`.

    Raises:
        RuntimeError:
            If the configured MongoDB is not available.

    """
    try:
        cli = _get_active_mongo_client()
        cli.list_database_names()
    except Exception as exc:
        raise RuntimeError from exc
create_default_indices(client: Optional[pymongo.mongo_client.MongoClient] = None, background: bool = False) -> List[str]

Creates indices for the configured or passed MongoClient.

Indexes created are:

- A text index over all string fields in item models,
- An index over item type,
- A unique index over item_id and refcode,
- A text index over user names and identities.

Parameters:

Name Type Description Default
background bool

If true, indexes will be created as background jobs.

False

Returns:

Type Description
List[str]

A list of messages returned by each create_index call.

Source code in pydatalab/mongo.py
def create_default_indices(
    client: Optional[pymongo.MongoClient] = None,
    background: bool = False,
) -> List[str]:
    """Creates indices for the configured or passed MongoClient.

    Indexes created are:
        - A text index over all string fields in item models,
        - An index over item type,
        - A unique index over `item_id` and `refcode`.
        - A text index over user names and identities.

    Parameters:
        background: If true, indexes will be created as background jobs.

    Returns:
        A list of messages returned by each `create_index` call.

    """
    from pydatalab.logger import LOGGER
    from pydatalab.models import ITEM_MODELS

    if client is None:
        client = _get_active_mongo_client()
    db = client.get_database()

    item_fts_fields = set()
    for model in ITEM_MODELS:
        schema = ITEM_MODELS[model].schema()
        for f in schema["properties"]:
            if schema["properties"][f].get("type") == "string":
                item_fts_fields.add(f)

    def create_or_recreate_text_index(collection, fields, weights):

        fts_index_name = f"{collection.name} full-text search"

        def create_fts():
            return collection.create_index(
                [(k, pymongo.TEXT) for k in fields],
                name=fts_index_name,
                weights=weights,
            )

        try:
            return create_fts()
        except pymongo.errors.OperationFailure:
            collection.drop_index(fts_index_name)
            return create_fts()

    ret = []

    ret += create_or_recreate_text_index(
        db.items,
        item_fts_fields,
        weights={"refcode": 3, "item_id": 3, "name": 3, "chemform": 3},
    )

    ret += create_or_recreate_text_index(
        db.collections,
        ["collection_id", "title", "description"],
        weights={"collection_id": 3, "title": 3, "description": 3},
    )

    ret += db.items.create_index("type", name="item type", background=background)
    ret += db.items.create_index(
        "item_id", unique=True, name="unique item ID", background=background
    )
    ret += db.items.create_index(
        "refcode", unique=True, name="unique refcode", background=background
    )
    ret += db.items.create_index("last_modified", name="last modified", background=background)

    user_fts_fields = {"identities.name", "display_name"}

    ret += db.users.create_index(
        [
            ("identities.identifier", pymongo.ASCENDING),
            ("identities.identity_type", pymongo.ASCENDING),
        ],
        unique=True,
        name="unique user identifiers",
        background=background,
    )
    try:
        ret += db.users.create_index(
            [(k, pymongo.TEXT) for k in user_fts_fields],
            name="user identities full-text search",
            background=background,
        )
    except Exception as exc:
        LOGGER.warning("Failed to create text index: %s", exc)

    return ret
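
A sketch of how this might be called from a maintenance script (illustrative; it assumes a reachable MongoDB configured via MONGO_URI):

from pydatalab.mongo import check_mongo_connection, create_default_indices

check_mongo_connection()
messages = create_default_indices(background=True)
print(messages)
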

nmr_utils

Functions
read_bruker_1d(data, process_number = 1, verbose = True, sample_mass_mg = None)

Read a 1D bruker nmr spectrum and return it as a df.

data: The directory of the full bruker data file. You may also supply a df as this argument. In this case, the df is returned as is.
process_number: The process number of the processed data you want to plot [default 1]
verbose: Whether to print information such as the spectrum title to stdout (default True)
sample_mass_mg: The (optional) sample mass. If provided, the resulting DataFrame will have a "intensity_per_scan_per_gram" column.

Source code in pydatalab/nmr_utils.py
def read_bruker_1d(data, process_number=1, verbose=True, sample_mass_mg=None):
    """Read a 1D bruker nmr spectrum and return it as a df.

    arguments:

    data: The directory of the full bruker data file. You may also supply a df as this argument. In this case, the df is returned as is.
    process_number: The process number of the processed data you want to plot [default 1]
    verbose: Whether to print information such as the spectrum title to stdout (default True)
    sample_mass_mg: The (optional) sample mass. If provided, the resulting DataFrame will have a "intensity_per_scan_per_gram" column.
    """

    # if df is provided, just return it as-is. This functionality is provided to make functions calling read_bruker_1d flexible by default.
    # Either the data directory or the already-processed df can always be provided with equivalent results.

    if isinstance(data, pd.DataFrame):
        if verbose:
            print("data frame provided to read_bruker_1d(). Returning it as is.")
        return data
    else:
        data_dir = data

    processed_data_dir = os.path.join(data_dir, "pdata", str(process_number))

    a_dic, a_data = ng.fileio.bruker.read(data_dir)  # aquisition_data
    p_dic, p_data = ng.fileio.bruker.read_pdata(processed_data_dir)  # processing data

    try:
        with open(os.path.join(processed_data_dir, "title"), "r") as f:
            topspin_title = f.read()
    except FileNotFoundError:
        topspin_title = None

    if len(p_data.shape) > 1:
        print("data is more than one dimensional - read failed")
        return None, a_dic, topspin_title, p_data.shape

    nscans = a_dic["acqus"]["NS"]

    # create a unit convertor to get the x-axis in ppm units
    udic = ng.bruker.guess_udic(p_dic, p_data)
    uc = ng.fileiobase.uc_from_udic(udic)

    ppm_scale = uc.ppm_scale()
    hz_scale = uc.hz_scale()

    df = pd.DataFrame(
        {
            "ppm": ppm_scale,
            "hz": hz_scale,
            "intensity": p_data,
            "intensity_per_scan": p_data / nscans,
        }
    )
    if sample_mass_mg:
        df["intensity_per_scan_per_gram"] = df["intensity_per_scan"] / sample_mass_mg * 1000.0

    if verbose:
        print(f"reading bruker data file. {udic[0]['label']} 1D spectrum, {nscans} scans.")
        if sample_mass_mg:
            print(
                f'sample mass was provided: {sample_mass_mg:f} mg. "intensity_per_scan_per_gram" column included. '
            )
        if topspin_title:
            print("\nTitle:\n")
            print(topspin_title)
        else:
            print("No title found in scan")

    return df, a_dic, topspin_title, a_data.shape
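
Illustrative usage (the data directory and sample mass below are placeholders, not taken from the source):

from pydatalab.nmr_utils import read_bruker_1d

df, acquisition_params, title, shape = read_bruker_1d(
    "/data/nmr/my_experiment/1",  # hypothetical Bruker experiment directory
    process_number=1,
    sample_mass_mg=10.3,
)
print(df[["ppm", "intensity_per_scan_per_gram"]].head())
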
read_topspin_txt(filename, sample_mass_mg = None, nscans = None)
Source code in pydatalab/nmr_utils.py
def read_topspin_txt(filename, sample_mass_mg=None, nscans=None):

    MAX_HEADER_LINES = 10
    LEFTRIGHT_REGEX = r"# LEFT = (-?\d+\.\d+) ppm. RIGHT = (-?\d+\.\d+) ppm\."
    SIZE_REGEX = r"SIZE = (\d+)"

    with open(filename, "r") as f:
        header = "".join(itertools.islice(f, MAX_HEADER_LINES))  # read the first 10 lines
    # print(header)

    leftright_match = re.search(LEFTRIGHT_REGEX, header)
    if not leftright_match:
        raise ValueError("Header improperly formatted. Could not find LEFT and/or RIGHT values")
    left = float(leftright_match.group(1))
    right = float(leftright_match.group(2))

    size_match = re.search(SIZE_REGEX, header)
    if not size_match:
        raise ValueError("Header improperly formatter. Could not find SIZE value")
    size = int(size_match.group(1))

    intensity = np.genfromtxt(filename, comments="#")
    assert len(intensity) == size, "length of intensities does not match SIZE value in header"

    data = {
        "ppm": np.linspace(left, right, size),
        "intensity": intensity,
        "I_norm": (intensity - intensity.min()) / (intensity.max() - intensity.min()),
    }

    if sample_mass_mg and nscans:
        data["I_per_g_per_scan"] = intensity / float(sample_mass_mg) / float(nscans) * 1000

    df = pd.DataFrame(data)
    return df
integrate_1d(data, process_number = 1, sample_mass_mg = None, left = None, right = None, plot = False, verbose = False)
Source code in pydatalab/nmr_utils.py
def integrate_1d(
    data,
    process_number=1,
    sample_mass_mg=None,
    left=None,
    right=None,
    plot=False,
    verbose=False,
):
    intensity_cols = ["intensity", "intensity_per_scan", "intensity_per_scan_per_gram"]
    df = read_bruker_1d(
        data, process_number=process_number, sample_mass_mg=sample_mass_mg, verbose=verbose
    )
    if left:
        df = df[df.ppm >= left]
    if right:
        df = df[df.ppm <= right]

    if plot:
        plt.plot(df.ppm, df.intensity, "-")
        plt.plot([left, right], [0, 0], "k-", zorder=-1)
        plt.xlim(left, right)
        plt.show()

    integrated_intensities = pd.Series()
    for c in intensity_cols:
        if c not in df:
            integrated_intensities[c] = None
            continue
        integrated_intensities[c] = -1 * integrate.trapz(df[c], df.ppm)

    return integrated_intensities
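
Illustrative usage (placeholder path and values; note that the function keeps rows with ppm >= left and ppm <= right, so left is the lower chemical-shift bound here):

from pydatalab.nmr_utils import integrate_1d

integrals = integrate_1d(
    "/data/nmr/my_experiment/1",  # hypothetical Bruker experiment directory
    sample_mass_mg=10.3,
    left=-50,
    right=50,
)
print(integrals["intensity_per_scan_per_gram"])
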

remote_filesystems

Functions
get_directory_structures(directories: List[Dict[str, str]], invalidate_cache: Optional[bool] = None, parallel: bool = True) -> List[Dict[str, Any]]

For all registered top-level directories, call tree either locally or remotely to get their directory structures, or access the cached data for that directory, if it is available and fresh.

Parameters:

Name Type Description Default
directories List[Dict[str, str]]

The directories to scan.

required
invalidate_cache Optional[bool]

If true, then the cached directory structure will be reset, provided the cache was not updated very recently. If False, the cache will not be reset, even if it is older than the maximum configured age.

None
parallel bool

If true, run each remote scraper in a new process.

True

Returns:

Type Description
List[Dict[str, Any]]

A list of dictionaries, one for each specified top-level directory.

Source code in pydatalab/remote_filesystems.py
def get_directory_structures(
    directories: List[Dict[str, str]],
    invalidate_cache: Optional[bool] = None,
    parallel: bool = True,
) -> List[Dict[str, Any]]:
    """For all registered top-level directories, call tree either
    locally or remotely to get their directory structures, or access
    the cached data for that directory, if it is available and fresh.

    Args:
        directories: The directories to scan.
        invalidate_cache: If true, then the cached directory structure will
            be reset, provided the cache was not updated very recently. If `False`,
            the cache will not be reset, even if it is older than the maximum configured
            age.
        parallel: If true, run each remote scraper in a new process.

    Returns:
        A list of dictionaries, one for each specified top-level directory.

    """
    if not directories:
        return []

    if parallel:
        return multiprocessing.Pool(max(min(len(directories), 8), 1)).map(
            functools.partial(
                get_directory_structure,
                invalidate_cache=invalidate_cache,
            ),
            directories,
        )
    else:
        return [get_directory_structure(d, invalidate_cache=invalidate_cache) for d in directories]
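
Illustrative usage with a hypothetical directory configuration (names, paths and hostname are placeholders):

from pydatalab.remote_filesystems import get_directory_structures

directories = [
    {"name": "Group NMR share", "path": "/mnt/nmr-data", "hostname": "nmr.example.com"},
    {"name": "Local archive", "path": "/data/archive"},
]
structures = get_directory_structures(directories, invalidate_cache=False, parallel=False)
for entry in structures:
    print(entry["name"], entry["last_updated"])
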
get_directory_structure(directory: Dict[str, str], invalidate_cache: Optional[bool] = False) -> Dict[str, Any]

For the given remote directory, either reconstruct the directory structure in full, or access the cached version if it is recent enough.

Any errors will be returned in the contents key for a given directory.

Parameters:

Name Type Description Default
directory Dict[str, str]

A dictionary describing the directory to scan, with keys 'name', 'path' and optionally 'hostname'.

required
invalidate_cache Optional[bool]

If True, then the cached directory structure will be reset, provided the cache was not updated very recently. If False, the cache will not be reset, even if it is older than the maximum configured age.

False

Returns:

Type Description
Dict[str, Any]

A dictionary with keys "name", "type" and "contents" for the top-level directory.

Source code in pydatalab/remote_filesystems.py
def get_directory_structure(
    directory: Dict[str, str],
    invalidate_cache: Optional[bool] = False,
) -> Dict[str, Any]:
    """For the given remote directory, either reconstruct the directory
    structure in full, or access the cached version if it is recent
    enough.

    Any errors will be returned in the `contents` key for a given
    directory.

    Args:
        directory: A dictionary describing the directory to scan, with keys
            `'name'`, `'path'` and optionally `'hostname'`.
        invalidate_cache: If `True`, then the cached directory structure will
            be reset, provided the cache was not updated very recently. If `False`,
            the cache will not be reset, even if it is older than the maximum configured
            age.

    Returns:
        A dictionary with keys "name", "type" and "contents" for the
        top-level directory.

    """

    LOGGER.debug(f"Accessing directory structure of {directory}")

    try:
        cached_dir_structure = _get_cached_directory_structure(directory)
        cache_last_updated = None
        if cached_dir_structure:
            cache_last_updated = cached_dir_structure["last_updated"]
            cache_age = datetime.datetime.now() - cached_dir_structure["last_updated"]
            if invalidate_cache and cache_age < datetime.timedelta(
                minutes=CONFIG.REMOTE_CACHE_MIN_AGE
            ):
                LOGGER.debug(
                    f"Not invalidating cache as its age ({cache_age=}) is less than the configured {CONFIG.REMOTE_CACHE_MIN_AGE=}."
                )

        # If either:
        #     1) no cache for this directory,
        #     2) the cache is older than the max cache age and
        #        `invalidate_cache` has not been explicitly set to false,
        #     3) the `invalidate_cache` parameter is true, and the cache
        #        is older than the min age,
        # then rebuild the cache.
        if (
            (not cached_dir_structure)
            or (
                invalidate_cache is not False
                and cache_age > datetime.timedelta(minutes=CONFIG.REMOTE_CACHE_MAX_AGE)
            )
            or (
                invalidate_cache
                and cache_age > datetime.timedelta(minutes=CONFIG.REMOTE_CACHE_MIN_AGE)
            )
        ):
            dir_structure = _get_latest_directory_structure(
                directory["path"], directory.get("hostname")
            )
            last_updated = _save_directory_structure(
                directory,
                dir_structure,
            )
            LOGGER.debug(
                "Remote filesystems cache miss for '%s': last updated %s",
                directory["name"],
                cache_last_updated,
            )

        else:
            last_updated = cached_dir_structure["last_updated"]
            dir_structure = cached_dir_structure["contents"]
            LOGGER.debug(
                "Remote filesystems cache hit for '%s': last updated %s",
                directory["name"],
                last_updated,
            )

    except RuntimeError as exc:
        dir_structure = [{"type": "error", "details": str(exc)}]
        last_updated = datetime.datetime.now()

    return {
        "name": directory["name"],
        "type": "toplevel",
        "contents": dir_structure,
        "last_updated": last_updated,
    }

routes special

Modules
utils
Functions
get_default_permissions(user_only: bool = True) -> Dict[str, Any]

Return the MongoDB query terms corresponding to the current user.

Will return open permissions if a) the CONFIG.TESTING parameter is True, or b) the current user is registered as an admin.

Parameters:

Name Type Description Default
user_only bool

Whether to exclude items that also have no attached user (False), i.e., public items. This should be set to False when reading (and wanting to return public items), but left as True when modifying or removing items.

True
Source code in pydatalab/routes/utils.py
def get_default_permissions(user_only: bool = True) -> Dict[str, Any]:
    """Return the MongoDB query terms corresponding to the current user.

    Will return open permissions if a) the `CONFIG.TESTING` parameter is `True`,
    or b) the current user is registered as an admin.

    Parameters:
        user_only: Whether to exclude items that also have no attached user (`False`),
            i.e., public items. This should be set to `False` when reading (and wanting
            to return public items), but left as `True` when modifying or removing items.

    """

    if CONFIG.TESTING:
        return {}

    if (
        current_user.is_authenticated
        and current_user.person is not None
        and current_user.role == UserRole.ADMIN
    ):
        return {}

    null_perm = {"creator_ids": {"$size": 0}}
    if current_user.is_authenticated and current_user.person is not None:
        user_perm = {"creator_ids": {"$in": [current_user.person.immutable_id]}}
        if user_only:
            return user_perm
        return {"$or": [user_perm, null_perm]}

    elif user_only:
        return {"_id": -1}

    return null_perm
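
A sketch of the intended usage pattern inside a route handler (illustrative; it mirrors the calls made elsewhere in the routes, requires an active Flask request and login context, and uses placeholder item and field names):

from pydatalab.mongo import flask_mongo
from pydatalab.routes.utils import get_default_permissions

# Reading: include public items by passing user_only=False.
doc = flask_mongo.db.items.find_one(
    {"item_id": "sample-1", **get_default_permissions(user_only=False)}
)

# Modifying: restrict the query to the current user's own items.
result = flask_mongo.db.items.update_one(
    {"item_id": "sample-1", **get_default_permissions(user_only=True)},
    {"$set": {"description": "Updated description"}},
)
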
v0_1 special
BLUEPRINTS
ENDPOINTS: Dict[str, Callable]
Modules
auth

This module implements functionality for authenticating users via OAuth2 providers, and associating these OAuth2 identities with their local accounts.

Attributes
ENDPOINTS: Dict[str, Callable]
KEY_LENGTH: int
OAUTH_BLUEPRINTS: Dict[pydatalab.models.people.IdentityType, flask.blueprints.Blueprint]

A dictionary of Flask blueprints corresponding to the supported OAuth2 providers.

OAUTH_PROXIES: Dict[pydatalab.models.people.IdentityType, werkzeug.local.LocalProxy]

A dictionary of proxy objects (cf. Flask context locals) corresponding to the supported OAuth2 providers, which can be used to make further authenticated requests out to the providers.

Functions
wrapped_login_user(*args, **kwargs)
Source code in pydatalab/routes/v0_1/auth.py
def wrapped_login_user(*args, **kwargs):
    LOGGER.warning("Logging in user %s with role %s", args[0].display_name, args[0].role)
    login_user(*args, **kwargs)
find_create_or_modify_user(identifier: str, identity_type: Union[str, pydatalab.models.people.IdentityType], identity_name: str, display_name: Optional[str] = None, verified: bool = False, create_account: bool = False) -> None

Search for a user account with the given identifier and identity type, creating or connecting one if it does not exist.

1. Find any user with the given identity, if found, return it.
2. If no user exists, check if there is currently a user logged in:
    - If so, attach the identity to the current user.
    - If not, create an entry in the user database with this identity.
3. Log in as the user for this session.
Source code in pydatalab/routes/v0_1/auth.py
def find_create_or_modify_user(
    identifier: str,
    identity_type: Union[str, IdentityType],
    identity_name: str,
    display_name: Optional[str] = None,
    verified: bool = False,
    create_account: bool = False,
) -> None:
    """Search for a user account with the given identifier and identity type, creating
    or connecting one if it does not exist.

        1. Find any user with the given identity, if found, return it.
        2. If no user exists, check if there is currently a user logged in:
            - If so, attach the identity to the current user.
            - If not, create an entry in the user database with this identity.
        3. Log in as the user for this session.

    """

    def find_user_with_identity(
        identifier: str,
        identity_type: Union[str, IdentityType],
    ) -> Optional[Person]:
        """Look up the given identity in the users database."""
        user = flask_mongo.db.users.find_one(
            {"identities.identifier": identifier, "identities.identity_type": identity_type},
        )
        if user:
            person = Person(**user)
            identity_indices: list[int] = [
                ind
                for ind, _ in enumerate(person.identities)
                if (_.identity_type == identity_type and _.identifier == identifier)
            ]
            if len(identity_indices) != 1:
                raise RuntimeError(
                    "Unexpected error: multiple or no identities matched the OAuth token."
                )

            identity_index = identity_indices[0]

            if not person.identities[identity_index].verified:
                flask_mongo.db.users.update_one(
                    {"_id": person.immutable_id},
                    {"$set": {f"identities.{identity_index}.verified": True}},
                )

            return person

        return None

    def attach_identity_to_user(
        user_id: str,
        identity: Identity,
        use_display_name: bool = False,
        use_contact_email: bool = False,
    ) -> None:
        """Associates an OAuth ID with a user entry in the database.

        This function is currently brittle and would need to be updated
        if the corresponding `Person` schema changes due to the hard-coded
        field names.

        Parameters:
            user_id: The database ID of the user as a string.
            identity: The identity to associate.
            use_display_name: Whether to set the user's top-level display name with a
                display name provided by this identity.
            use_contact_email: Whether to set the user's top-level contact email with
                an email address provided by this identity.

        Raises:
            RuntimeError: If the update was unsuccessful.

        """
        update = {"$push": {"identities": identity.dict()}}
        if use_display_name and identity.display_name:
            update["$set"] = {"display_name": identity.display_name}

        if use_contact_email and identity.identity_type is IdentityType.EMAIL and identity.verified:
            update["$set"] = {"contact_email": identity.identifier}

        result = flask_mongo.db.users.update_one(
            {"_id": ObjectId(user_id)},
            update,
        )

        if result.matched_count != 1:
            raise RuntimeError(
                f"Attempted to modify user {user_id} but performed {result.matched_count} updates. Results:\n{result.raw_result}"
            )

    user = find_user_with_identity(identifier, identity_type)

    # If no user was found in the database with the OAuth ID, make or modify one:
    if not user:
        identity = Identity(
            identifier=identifier,
            identity_type=identity_type,
            name=identity_name,
            display_name=display_name,
            verified=verified,
        )

        # If there is currently a user logged in who has gone through OAuth with a new identity,
        # then update the user database with the identity
        if current_user.is_authenticated:
            attach_identity_to_user(
                current_user.id,
                identity,
                use_display_name=True if current_user.display_name is None else False,
            )
            current_user.refresh()
            user = current_user.person

        # If there is no current authenticated user, make one with the current OAuth identity
        else:
            if not create_account:
                raise UserRegistrationForbidden

            user = Person.new_user_from_identity(identity, use_display_name=True)
            wrapped_login_user(get_by_id_cached(str(user.immutable_id)))
            LOGGER.debug("Inserting new user model %s into database", user)
            insert_pydantic_model_fork_safe(user, "users")

    # Log the user into the session with this identity
    if user is not None:
        wrapped_login_user(get_by_id_cached(str(user.immutable_id)))
github_logged_in(blueprint, token)

This Flask signal hooks into any attempt to use the GitHub blueprint, and will make a user account with this identity if not already present in the database.

Makes one authorized request to the GitHub API to get the user's GitHub ID, username and display name, without storing the OAuth token.

Source code in pydatalab/routes/v0_1/auth.py
@oauth_authorized.connect_via(OAUTH_BLUEPRINTS[IdentityType.GITHUB])
def github_logged_in(blueprint, token):
    """This Flask signal hooks into any attempt to use the GitHub blueprint, and will
    make a user account with this identity if not already present in the database.

    Makes one authorized request to the GitHub API to get the user's GitHub ID,
    username and display name, without storing the OAuth token.

    """
    if not token:
        return False

    resp = blueprint.session.get("/user")
    if not resp.ok:
        return False

    github_info = resp.json()
    github_user_id = str(github_info["id"])
    username = str(github_info["login"])
    name = str(github_info["name"])

    org_membership = blueprint.session.get(f"/users/{username}/orgs").json()
    if CONFIG.GITHUB_ORG_ALLOW_LIST:
        create_account = any(
            str(org["id"]) in CONFIG.GITHUB_ORG_ALLOW_LIST for org in org_membership
        )
    else:
        create_account = False

    find_create_or_modify_user(
        github_user_id,
        IdentityType.GITHUB,
        username,
        display_name=name,
        verified=True,
        create_account=create_account,
    )

    # Return false to prevent Flask-dance from trying to store the token elsewhere
    return False
orcid_logged_in(_, token)

This signal hooks into any attempt to use the ORCID blueprint, and will associate a user account with this identity if not already present in the database.

The OAuth token is not stored alongside the user.

Source code in pydatalab/routes/v0_1/auth.py
@oauth_authorized.connect_via(OAUTH_BLUEPRINTS[IdentityType.ORCID])
def orcid_logged_in(_, token):
    """This signal hooks into any attempt to use the ORCID blueprint, and will
    associate a user account with this identity if not already present in the database.

    The OAuth token is not stored alongside the user.

    """
    if not token:
        return False

    find_create_or_modify_user(
        token["orcid"],
        IdentityType.ORCID,
        token["orcid"],
        display_name=token["name"],
        verified=True,
    )

    # Return false to prevent Flask-dance from trying to store the token elsewhere
    return False
redirect_to_ui(blueprint, token)

Intercepts the default Flask-Dance and redirects to the referring page.

Source code in pydatalab/routes/v0_1/auth.py
@oauth_authorized.connect
def redirect_to_ui(blueprint, token):  # pylint: disable=unused-argument
    """Intercepts the default Flask-Dance and redirects to the referring page."""
    from flask import request

    referer = request.headers.get("Referer", "/")
    return redirect(referer)
get_authenticated_user_info()

Returns metadata associated with the currently authenticated user.

Source code in pydatalab/routes/v0_1/auth.py
def get_authenticated_user_info():
    """Returns metadata associated with the currently authenticated user."""
    if current_user.is_authenticated:
        return jsonify(json.loads(current_user.person.json())), 200
    else:
        return jsonify({"status": "failure", "message": "User must be authenticated."}), 401
generate_user_api_key()

Returns metadata associated with the currently authenticated user.

Source code in pydatalab/routes/v0_1/auth.py
def generate_user_api_key():
    """Returns metadata associated with the currently authenticated user."""
    if current_user.is_authenticated and current_user.role == "admin":
        new_key = "".join(random.choices(ascii_letters, k=KEY_LENGTH))
        flask_mongo.db.api_keys.update_one(
            {"_id": ObjectId(current_user.id)},
            {"$set": {"hash": sha512(new_key.encode("utf-8")).hexdigest()}},
            upsert=True,
        )
        return jsonify({"key": new_key}), 200
    else:
        return (
            jsonify(
                {
                    "status": "failure",
                    "message": "User must be an authenticated admin to request an API key.",
                }
            ),
            401,
        )
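Only the SHA-512 hash of the generated key is stored. A sketch of how a presented key could later be checked against that stored hexdigest; the surrounding lookup flow and header handling are assumptions, not shown in this module:

from hashlib import sha512

def key_matches(presented_key: str, stored_hash: str) -> bool:
    # Hash the presented key the same way generate_user_api_key() does,
    # then compare against the hexdigest stored in the api_keys collection.
    return sha512(presented_key.encode("utf-8")).hexdigest() == stored_hash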
blocks
ENDPOINTS: Dict[str, Callable]
Functions
add_data_block()

Call with AJAX to add a block to the sample

Source code in pydatalab/routes/v0_1/blocks.py
def add_data_block():
    """Call with AJAX to add a block to the sample"""

    request_json = request.get_json()

    # pull out required arguments from json
    block_type = request_json["block_type"]
    item_id = request_json["item_id"]
    insert_index = request_json["index"]

    if block_type not in BLOCK_TYPES:
        return jsonify(status="error", message="Invalid block type"), 400

    block = BLOCK_TYPES[block_type](item_id=item_id)

    data = block.to_db()

    # currently, adding to both blocks and blocks_obj to maintain compatibility with
    # the old site. The new site only uses blocks_obj
    if insert_index:
        display_order_update = {
            "$each": [block.block_id],
            "$position": insert_index,
        }
    else:
        display_order_update = block.block_id

    result = flask_mongo.db.items.update_one(
        {"item_id": item_id, **get_default_permissions(user_only=True)},
        {
            "$push": {"blocks": data, "display_order": display_order_update},
            "$set": {f"blocks_obj.{block.block_id}": data},
        },
    )

    if result.modified_count < 1:
        return (
            jsonify(
                status="error",
                message=f"Update failed. {item_id=} is probably incorrect.",
            ),
            400,
        )

    # get the new display_order:
    display_order_result = flask_mongo.db.items.find_one(
        {"item_id": item_id, **get_default_permissions(user_only=True)}, {"display_order": 1}
    )

    return jsonify(
        status="success",
        new_block_obj=block.to_web(),
        new_block_insert_index=insert_index
        if insert_index is None
        else len(display_order_result["display_order"]) - 1,
        new_display_order=display_order_result["display_order"],
    )
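As a usage sketch, a client request to this endpoint supplies the three JSON fields pulled out above. The deployment URL, route path and session cookie below are placeholders, not values defined in this module:

import requests

session_cookies = {"session": "<session cookie obtained after login>"}  # placeholder
resp = requests.post(
    "https://datalab.example.com/add-data-block/",  # hypothetical deployment URL and route
    json={"block_type": "chat", "item_id": "sample_1", "index": 0},
    cookies=session_cookies,
)
print(resp.json()["new_block_obj"])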
add_collection_data_block()

Call with AJAX to add a block to the collection.

Source code in pydatalab/routes/v0_1/blocks.py
def add_collection_data_block():
    """Call with AJAX to add a block to the collection."""

    request_json = request.get_json()

    # pull out required arguments from json
    block_type = request_json["block_type"]
    collection_id = request_json["collection_id"]
    insert_index = request_json["index"]

    if block_type not in BLOCK_TYPES:
        return jsonify(status="error", message="Invalid block type"), 400

    block = BLOCK_TYPES[block_type](collection_id=collection_id)

    data = block.to_db()

    # currently, adding to both blocks and blocks_obj to maintain compatibility with
    # the old site. The new site only uses blocks_obj
    if insert_index:
        display_order_update = {
            "$each": [block.block_id],
            "$position": insert_index,
        }
    else:
        display_order_update = block.block_id

    result = flask_mongo.db.collections.update_one(
        {"collection_id": collection_id, **get_default_permissions(user_only=True)},
        {
            "$push": {"blocks": data, "display_order": display_order_update},
            "$set": {f"blocks_obj.{block.block_id}": data},
        },
    )

    if result.modified_count < 1:
        return (
            jsonify(
                status="error",
                message=f"Update failed. {collection_id=} is probably incorrect.",
            ),
            400,
        )

    # get the new display_order:
    display_order_result = flask_mongo.db.collections.find_one(
        {"collection_id": collection_id, **get_default_permissions(user_only=True)},
        {"display_order": 1},
    )

    return jsonify(
        status="success",
        new_block_obj=block.to_web(),
        new_block_insert_index=insert_index
        if insert_index is None
        else len(display_order_result["display_order"]) - 1,
        new_display_order=display_order_result["display_order"],
    )
update_block()

Take in json block data from site, process, and spit out updated data. May be used, for example, when the user changes plot parameters and the server needs to generate a new plot.

Source code in pydatalab/routes/v0_1/blocks.py
def update_block():
    """Take in json block data from site, process, and spit
    out updated data. May be used, for example, when the user
    changes plot parameters and the server needs to generate a new
    plot.
    """

    request_json = request.get_json()
    block_data = request_json["block_data"]
    blocktype = block_data["blocktype"]
    save_to_db = request_json.get("save_to_db", False)

    block = BLOCK_TYPES[blocktype].from_web(block_data)

    saved_successfully = False
    if save_to_db:
        saved_successfully = _save_block_to_db(block)

    return (
        jsonify(
            status="success", saved_successfully=saved_successfully, new_block_data=block.to_web()
        ),
        200,
    )
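A sketch of the corresponding request body: the client sends back the full serialized block under block_data, and save_to_db controls whether _save_block_to_db is called. The URL, route and session values are placeholders:

import requests

session_cookies = {"session": "<session cookie>"}  # placeholder
block_data = {"blocktype": "chat", "block_id": "abc123", "item_id": "sample_1"}  # illustrative only
resp = requests.post(
    "https://datalab.example.com/update-block/",  # hypothetical route
    json={"block_data": block_data, "save_to_db": False},
    cookies=session_cookies,
)
print(resp.json()["saved_successfully"], resp.json()["new_block_data"])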
delete_block()

Completely delete a data block from the database. In the future, we may consider preserving data by moving it to a different array, or simply making it invisible

Source code in pydatalab/routes/v0_1/blocks.py
def delete_block():
    """Completely delete a data block from the database. In the future,
    we may consider preserving data by moving it to a different array,
    or simply making it invisible"""
    request_json = request.get_json()
    item_id = request_json["item_id"]
    block_id = request_json["block_id"]

    result = flask_mongo.db.items.update_one(
        {"item_id": item_id, **get_default_permissions(user_only=True)},
        {
            "$pull": {
                "blocks": {"block_id": block_id},
                "display_order": block_id,
            },
            "$unset": {f"blocks_obj.{block_id}": ""},
        },
    )

    if result.modified_count < 1:
        return (
            jsonify(
                {
                    "status": "error",
                    "message": f"Update failed. The item_id probably incorrect: {item_id}",
                }
            ),
            400,
        )
    return (
        jsonify({"status": "success"}),
        200,
    )  # could try to switch to http 204 is "No Content" success with no json
delete_collection_block()

Completely delete a data block from the database that is currently attached to a collection.

In the future, we may consider preserving data by moving it to a different array, or simply making it invisible

Source code in pydatalab/routes/v0_1/blocks.py
def delete_collection_block():
    """Completely delete a data block from the database that is currently
    attached to a collection.

    In the future, we may consider preserving data by moving it to a different array,
    or simply making it invisible"""
    request_json = request.get_json()
    collection_id = request_json["collection_id"]
    block_id = request_json["block_id"]

    result = flask_mongo.db.collections.update_one(
        {"collection_id": collection_id, **get_default_permissions(user_only=True)},
        {
            "$pull": {
                "blocks": {"block_id": block_id},
                "display_order": block_id,
            },
            "$unset": {f"blocks_obj.{block_id}": ""},
        },
    )

    if result.modified_count < 1:
        return (
            jsonify(
                {
                    "status": "error",
                    "message": f"Update failed. The collection_id probably incorrect: {collection_id}",
                }
            ),
            400,
        )
    return (
        jsonify({"status": "success"}),
        200,
    )
collections
collection
get_collections()
Source code in pydatalab/routes/v0_1/collections.py
@collection.route("/collections/")
def get_collections():

    collections = flask_mongo.db.collections.aggregate(
        [
            {"$match": get_default_permissions(user_only=True)},
            {"$lookup": creators_lookup()},
            {"$project": {"_id": 0}},
            {"$sort": {"_id": -1}},
        ]
    )

    return jsonify({"status": "success", "data": list(collections)})
get_collection(collection_id)
Source code in pydatalab/routes/v0_1/collections.py
@collection.route("/collections/<collection_id>", methods=["GET"])
def get_collection(collection_id):

    cursor = flask_mongo.db.collections.aggregate(
        [
            {
                "$match": {
                    "collection_id": collection_id,
                    **get_default_permissions(user_only=True),
                }
            },
            {"$lookup": creators_lookup()},
            {"$sort": {"_id": -1}},
        ]
    )

    try:
        doc = list(cursor)[0]
    except IndexError:
        doc = None

    if not doc or (not current_user.is_authenticated and not CONFIG.TESTING):
        return (
            jsonify(
                {
                    "status": "error",
                    "message": f"No matching collection {collection_id=} with current authorization.",
                }
            ),
            404,
        )

    collection = Collection(**doc)

    samples = list(
        get_samples_summary(
            match={
                "relationships.type": "collections",
                "relationships.immutable_id": collection.immutable_id,
            },
            project={"collections": 0},
        )
    )

    collection.num_items = len(samples)

    return jsonify(
        {
            "status": "success",
            "collection_id": collection_id,
            "data": json.loads(collection.json(exclude_unset=True)),
            "child_items": list(samples),
        }
    )
create_collection()
Source code in pydatalab/routes/v0_1/collections.py
@collection.route("/collections/", methods=["PUT"])
def create_collection():
    request_json = request.get_json()  # noqa: F821 pylint: disable=undefined-variable
    data = request_json.get("data", {})
    copy_from_id = request_json.get("copy_from_collection_id", None)
    starting_members = data.get("starting_members", [])

    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (
            dict(
                status="error",
                message="Unable to create new collection without user authentication.",
                collection_id=data.get("collection_id"),
            ),
            401,
        )

    if copy_from_id:
        raise NotImplementedError("Copying collections is not yet implemented.")

    if CONFIG.TESTING:
        data["creator_ids"] = [24 * "0"]
        data["creators"] = [
            {"display_name": "Public testing user", "contact_email": "datalab@odbx.science"}
        ]
    else:
        data["creator_ids"] = [current_user.person.immutable_id]
        data["creators"] = [
            {
                "display_name": current_user.person.display_name,
                "contact_email": current_user.person.contact_email,
            }
        ]

    # check to make sure that item_id isn't taken already
    if flask_mongo.db.collections.find_one({"collection_id": data["collection_id"]}):
        return (
            dict(
                status="error",
                message=f"collection_id_validation_error: {data['collection_id']!r} already exists in database.",
                collection_id=data["collection_id"],
            ),
            409,  # 409: Conflict
        )

    data["date"] = data.get("date", datetime.datetime.now())

    try:
        data_model = Collection(**data)

    except ValidationError as error:
        return (
            dict(
                status="error",
                message=f"Unable to create new collection with ID {data['collection_id']}.",
                item_id=data["collection_id"],
                output=str(error),
            ),
            400,
        )

    result: InsertOneResult = flask_mongo.db.collections.insert_one(
        data_model.dict(exclude={"creators"})
    )
    if not result.acknowledged:
        return (
            dict(
                status="error",
                message=f"Failed to add new collection {data['collection_id']!r} to database.",
                collection_id=data["collection_id"],
                output=result.raw_result,
            ),
            400,
        )

    immutable_id = result.inserted_id

    errors = []
    if starting_members:
        item_ids = set(d.get("item_id") for d in starting_members)
        if None in item_ids:
            item_ids.remove(None)

        results: UpdateResult = flask_mongo.db.items.update_many(
            {
                "item_id": {"$in": list(item_ids)},
                **get_default_permissions(user_only=True),
            },
            {"$push": {"relationships": {"type": "collections", "immutable_id": immutable_id}}},
        )

        data_model.num_items = results.modified_count

        if results.modified_count < len(starting_members):
            errors = [
                item_id
                for item_id in starting_members
                if item_id not in results.raw_result.get("upserted", [])
            ]

    else:
        data_model.num_items = 0

    response = {
        "status": "success",
        "data": json.loads(data_model.json()),
    }

    if errors:
        response["warnings"] = [
            f"Unable to register {errors} to new collection {data_model.collection_id}"
        ]

    return (
        jsonify(response),
        201,  # 201: Created
    )
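A sketch of creating a collection with two starting members, matching the fields read by the handler above (the base URL and session cookie are placeholders):

import requests

session_cookies = {"session": "<session cookie>"}  # placeholder
payload = {
    "data": {
        "collection_id": "test_collection",
        "title": "Test collection",
        "starting_members": [{"item_id": "sample_1"}, {"item_id": "sample_2"}],
    }
}
resp = requests.put("https://datalab.example.com/collections/", json=payload, cookies=session_cookies)
print(resp.status_code)  # 201 on success, 409 if the collection_id already exists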
save_collection(collection_id)
Source code in pydatalab/routes/v0_1/collections.py
@collection.route("/collections/<collection_id>", methods=["PATCH"])
@logged_route
def save_collection(collection_id):

    request_json = request.get_json()  # noqa: F821 pylint: disable=undefined-variable
    updated_data = request_json.get("data")

    if not updated_data:
        return (
            jsonify(
                status="error",
                message=f"Unable to find any data in request to update {collection_id=} with.",
            ),
            204,  # 204: No content
        )

    # These keys should not be updated here and cannot be modified by the user through this endpoint
    for k in ("_id", "file_ObjectIds", "creators", "creator_ids", "collection_id"):
        if k in updated_data:
            del updated_data[k]

    updated_data["last_modified"] = datetime.datetime.now().isoformat()

    collection = flask_mongo.db.collections.find_one(
        {"collection_id": collection_id, **get_default_permissions(user_only=True)}
    )

    if not collection:
        return (
            jsonify(
                status="error",
                message=f"Unable to find item with appropriate permissions and {collection_id=}.",
            ),
            400,
        )

    collection.update(updated_data)

    try:
        collection = Collection(**collection).dict()
    except ValidationError as exc:
        return (
            jsonify(
                status="error",
                message=f"Unable to update item {collection_id=} with new data {updated_data}",
                output=str(exc),
            ),
            400,
        )

    result: UpdateResult = flask_mongo.db.collections.update_one(
        {"collection_id": collection_id},
        {"$set": collection},
    )

    if result.modified_count != 1:
        return (
            jsonify(
                status="error",
                message=f"Unable to update item {collection_id=} with new data {updated_data}",
                output=result.raw_result,
            ),
            400,
        )

    return jsonify(status="success"), 200
delete_collection(collection_id: str)
Source code in pydatalab/routes/v0_1/collections.py
@collection.route("/collections/<collection_id>", methods=["DELETE"])
def delete_collection(collection_id: str):

    result = flask_mongo.db.collections.delete_one(
        {"collection_id": collection_id, **get_default_permissions(user_only=True)}
    )

    if result.deleted_count != 1:
        return (
            jsonify(
                {
                    "status": "error",
                    "message": f"Authorization required to attempt to delete collection with {collection_id=} from the database.",
                }
            ),
            401,
        )
    return (
        jsonify(
            {
                "status": "success",
            }
        ),
        200,
    )
search_collections()
Source code in pydatalab/routes/v0_1/collections.py
@collection.route("/search-collections/", methods=["GET"])
def search_collections():
    query = request.args.get("query", type=str)
    nresults = request.args.get("nresults", default=100, type=int)

    match_obj = {"$text": {"$search": query}, **get_default_permissions(user_only=True)}

    cursor = [
        json.loads(Collection(**doc).json(exclude_unset=True))
        for doc in flask_mongo.db.collections.aggregate(
            [
                {"$match": match_obj},
                {"$sort": {"score": {"$meta": "textScore"}}},
                {"$limit": nresults},
                {
                    "$project": {
                        "collection_id": 1,
                        "title": 1,
                    }
                },
            ]
        )
    ]

    return jsonify({"status": "success", "data": list(cursor)}), 200
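Example text-index query (route from the decorator above; the base URL and session cookie are placeholders):

import requests

session_cookies = {"session": "<session cookie>"}  # placeholder
resp = requests.get(
    "https://datalab.example.com/search-collections/",
    params={"query": "cathode", "nresults": 10},
    cookies=session_cookies,
)
print(resp.json()["data"])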
files
ENDPOINTS: Dict[str, Callable]
Functions
get_file(file_id, filename)
Source code in pydatalab/routes/v0_1/files.py
def get_file(file_id, filename):
    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (
            jsonify(
                {
                    "status": "error",
                    "title": "Not Authorized",
                    "detail": "File access requires login.",
                }
            ),
            401,
        )
    path = os.path.join(CONFIG.FILE_DIRECTORY, secure_filename(file_id))
    return send_from_directory(path, filename)
upload()

Method to upload files to the server. TODO: think more about security, size limits, and about nested folders.

Source code in pydatalab/routes/v0_1/files.py
def upload():
    """method to upload files to the server
    todo: think more about security, size limits, and about nested folders
    """

    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (
            jsonify(
                {
                    "status": "error",
                    "title": "Not Authorized",
                    "detail": "File upload requires login.",
                }
            ),
            401,
        )

    if len(request.files) == 0:
        return jsonify(error="No file in request"), 400
    if "item_id" not in request.form:
        return jsonify(error="No item id provided in form"), 400
    item_id = request.form["item_id"]
    replace_file_id = request.form["replace_file"]

    is_update = replace_file_id and replace_file_id != "null"
    for filekey in request.files:  # pretty sure there is just 1 per request
        file = request.files[
            filekey
        ]  # just a weird thing about the request that comes from uppy. The key is "files[]"
        if is_update:
            file_information = file_utils.update_uploaded_file(file, ObjectId(replace_file_id))
        else:
            file_information = file_utils.save_uploaded_file(file, item_ids=[item_id])

    return (
        jsonify(
            {
                "status": "success",
                "file_id": str(file_information["_id"]),
                "file_information": file_information,
                "is_update": is_update,  # true if update, false if new file
            }
        ),
        201,
    )
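A matching client request sends one file per call, with item_id and replace_file as form fields; "null" for replace_file means a new upload rather than an update. The route path, base URL and session cookie below are placeholders:

import requests

session_cookies = {"session": "<session cookie>"}  # placeholder
with open("data.csv", "rb") as f:
    resp = requests.post(
        "https://datalab.example.com/upload-file/",  # hypothetical route
        files={"files[]": f},  # uppy-style key, as noted in the handler above
        data={"item_id": "sample_1", "replace_file": "null"},
        cookies=session_cookies,
    )
print(resp.json()["file_id"])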
add_remote_file_to_sample()
Source code in pydatalab/routes/v0_1/files.py
def add_remote_file_to_sample():

    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (
            jsonify(
                {
                    "status": "error",
                    "title": "Not Authorized",
                    "detail": "Adding a file to a sample requires login.",
                }
            ),
            401,
        )

    request_json = request.get_json()
    item_id = request_json["item_id"]
    file_entry = request_json["file_entry"]

    updated_file_entry = file_utils.add_file_from_remote_directory(file_entry, item_id)

    return (
        jsonify(
            {
                "status": "success",
                "file_id": str(updated_file_entry["_id"]),
                "file_information": updated_file_entry,
            }
        ),
        201,
    )
delete_file_from_sample()

Remove a file from a sample, but don't delete the actual file (for now)

Source code in pydatalab/routes/v0_1/files.py
def delete_file_from_sample():
    """Remove a file from a sample, but don't delete the actual file (for now)"""

    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (
            jsonify(
                {
                    "status": "error",
                    "title": "Not Authorized",
                    "detail": "Adding a file to a sample requires login.",
                }
            ),
            401,
        )

    request_json = request.get_json()

    item_id = request_json["item_id"]
    file_id = ObjectId(request_json["file_id"])
    result = pydatalab.mongo.flask_mongo.db.items.update_one(
        {"item_id": item_id, **get_default_permissions(user_only=True)},
        {"$pull": {"file_ObjectIds": file_id}},
    )
    if result.modified_count != 1:
        return (
            jsonify(
                status="error",
                message=f"Not authorized to perform file removal from sample {item_id=}",
                output=result.raw_result,
            ),
            401,
        )
    updated_file_entry = pydatalab.mongo.flask_mongo.db.files.find_one_and_update(
        {"_id": file_id},
        {"$pull": {"item_ids": item_id}},
        return_document=ReturnDocument.AFTER,
    )

    if not updated_file_entry:
        return (
            jsonify(
                status="error",
                message=f"{item_id} {file_id} delete failed. Something went wrong with the db call to remove sample from file",
            ),
            400,
        )

    return (
        jsonify(
            {
                "status": "success",
                "new_file_obj": {request_json["file_id"]: updated_file_entry},
            }
        ),
        200,
    )
delete_file()

delete a data file from the uploads/item_id folder

Source code in pydatalab/routes/v0_1/files.py
def delete_file():
    """delete a data file from the uploads/item_id folder"""

    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (
            jsonify(
                {
                    "status": "error",
                    "title": "Not Authorized",
                    "detail": "Adding a file to a sample requires login.",
                }
            ),
            401,
        )

    request_json = request.get_json()

    item_id = request_json["item_id"]
    filename = request_json["filename"]

    secure_item_id = secure_filename(item_id)
    secure_fname = secure_filename(filename)

    path = os.path.join(CONFIG.FILE_DIRECTORY, secure_item_id, secure_fname)

    if not os.path.isfile(path):
        return (
            jsonify(
                status="error",
                message="Delete failed. file not found: {}".format(path),
            ),
            400,
        )

    result = pydatalab.mongo.flask_mongo.db.items.update_one(
        {"item_id": item_id, **get_default_permissions(user_only=True)},
        {"$pull": {"files": filename}},
    )
    if result.matched_count != 1:
        return (
            jsonify(
                status="error",
                message=f"{item_id} {filename} delete failed. Something went wrong with the db call. File not deleted.",
                output=result.raw_result,
            ),
            400,
        )
    os.remove(path)

    return jsonify({"status": "success"}), 200
graphs
ENDPOINTS: Dict[str, Callable]
get_graph_cy_format(item_id: Optional[str] = None)
Source code in pydatalab/routes/v0_1/graphs.py
def get_graph_cy_format(item_id: Optional[str] = None):

    if item_id is None:
        all_documents = flask_mongo.db.items.find(
            get_default_permissions(user_only=False),
            projection={"item_id": 1, "name": 1, "type": 1, "relationships": 1},
        )
        node_ids = {document["item_id"] for document in all_documents}
        all_documents.rewind()

    else:
        all_documents = list(
            flask_mongo.db.items.find(
                {
                    "$or": [{"item_id": item_id}, {"relationships.item_id": item_id}],
                    **get_default_permissions(user_only=False),
                },
                projection={"item_id": 1, "name": 1, "type": 1, "relationships": 1},
            )
        )

        node_ids = {document["item_id"] for document in all_documents}
        if len(node_ids) > 1:
            next_shell = flask_mongo.db.items.find(
                {
                    "$or": [
                        *[{"item_id": id} for id in node_ids if id != item_id],
                        *[{"relationships.item_id": id} for id in node_ids if id != item_id],
                    ],
                    **get_default_permissions(user_only=False),
                },
                projection={"item_id": 1, "name": 1, "type": 1, "relationships": 1},
            )

            node_ids = node_ids | {document["item_id"] for document in next_shell}
            all_documents.extend(next_shell)

    nodes = []
    edges = []
    for document in all_documents:

        nodes.append(
            {
                "data": {
                    "id": document["item_id"],
                    "name": document["name"],
                    "type": document["type"],
                    "special": document["item_id"] == item_id,
                }
            }
        )

        if not document.get("relationships"):
            continue

        for relationship in document["relationships"]:
            # only considering child-parent relationships:
            if relationship["relation"] not in ("parent", "is_part_of"):
                continue

            target = document["item_id"]
            source = relationship["item_id"]
            if source not in node_ids:
                continue
            edges.append(
                {
                    "data": {
                        "id": f"{source}->{target}",
                        "source": source,
                        "target": target,
                        "value": 1,
                    }
                }
            )

    # We want to filter out all the starting materials that don't have relationships since there are so many of them:
    whitelist = {edge["data"]["source"] for edge in edges}

    nodes = [
        node
        for node in nodes
        if node["data"]["type"] in ("samples", "cells") or node["data"]["id"] in whitelist
    ]

    return (jsonify(status="success", nodes=nodes, edges=edges), 200)
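The response is shaped for direct use by a Cytoscape-style graph view: each node and edge is wrapped in a data object, as built in the loops above. A representative (hypothetical) payload:

example_response = {
    "status": "success",
    "nodes": [
        {"data": {"id": "sample_1", "name": "My sample", "type": "samples", "special": True}},
        {"data": {"id": "precursor_1", "name": "A precursor", "type": "starting_materials", "special": False}},
    ],
    "edges": [
        {"data": {"id": "precursor_1->sample_1", "source": "precursor_1", "target": "sample_1", "value": 1}},
    ],
}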
healthcheck
ENDPOINTS: Dict[str, Callable]
is_ready()
Source code in pydatalab/routes/v0_1/healthcheck.py
def is_ready():

    from pydatalab.mongo import check_mongo_connection

    try:
        check_mongo_connection()
    except RuntimeError:
        return (
            jsonify(status="error", message="Unable to connect to MongoDB at specified URI."),
            500,
        )
    return (jsonify(status="success", message="Server and database are ready"), 200)
is_alive()
Source code in pydatalab/routes/v0_1/healthcheck.py
def is_alive():
    return (jsonify(status="success", message="Server is alive"), 200)
info

This submodule defines introspective info endpoints of the API.

ENDPOINTS: Dict[str, Callable]
Attributes (BaseModel) pydantic-model
Source code in pydatalab/routes/v0_1/info.py
class Attributes(BaseModel):
    class Config:
        extra = "allow"
Meta (BaseModel) pydantic-model
Source code in pydatalab/routes/v0_1/info.py
class Meta(BaseModel):

    timestamp: datetime = Field(default_factory=datetime.now)
    query: str = ""
    api_version: str = __api_version__
    available_api_versions: List[str] = [__api_version__]
    server_version: str = __version__
    datamodel_version: str = __version__
timestamp: datetime pydantic-field
query: str pydantic-field
api_version: str pydantic-field
available_api_versions: List[str] pydantic-field
server_version: str pydantic-field
datamodel_version: str pydantic-field
Links (BaseModel) pydantic-model
Source code in pydatalab/routes/v0_1/info.py
class Links(BaseModel):

    self: AnyUrl

    class Config:
        extra = "allow"
self: AnyUrl pydantic-field required
Data (BaseModel) pydantic-model
Source code in pydatalab/routes/v0_1/info.py
class Data(BaseModel):
    id: str
    type: str
    attributes: Attributes
id: str pydantic-field required
type: str pydantic-field required
attributes: Attributes pydantic-field required
JSONAPIResponse (BaseModel) pydantic-model
Source code in pydatalab/routes/v0_1/info.py
class JSONAPIResponse(BaseModel):

    data: Union[Data, List[Data]]
    meta: Meta
    links: Links
data: Union[pydatalab.routes.v0_1.info.Data, List[pydatalab.routes.v0_1.info.Data]] pydantic-field required
meta: Meta pydantic-field required
links: Links pydantic-field required
MetaPerson (BaseModel) pydantic-model
Source code in pydatalab/routes/v0_1/info.py
class MetaPerson(BaseModel):
    display_name: Optional[str]
    contact_email: str
display_name: str pydantic-field
contact_email: str pydantic-field required
Info (Attributes, Meta) pydantic-model
Source code in pydatalab/routes/v0_1/info.py
class Info(Attributes, Meta):

    maintainer: Optional[MetaPerson]
    issue_tracker: Optional[AnyUrl]
    homepage: Optional[AnyUrl]
    source_repository: Optional[AnyUrl]

    @validator("maintainer")
    def strip_maintainer_fields(cls, v):
        if isinstance(v, Person):
            return MetaPerson(contact_email=v.contact_email, display_name=v.display_name)
        return v
maintainer: MetaPerson pydantic-field
issue_tracker: AnyUrl pydantic-field
homepage: AnyUrl pydantic-field
source_repository: AnyUrl pydantic-field
strip_maintainer_fields(v) classmethod
Source code in pydatalab/routes/v0_1/info.py
@validator("maintainer")
def strip_maintainer_fields(cls, v):
    if isinstance(v, Person):
        return MetaPerson(contact_email=v.contact_email, display_name=v.display_name)
    return v
get_info()
Source code in pydatalab/routes/v0_1/info.py
def get_info():

    metadata = _get_deployment_metadata_once()

    return (
        jsonify(
            json.loads(
                JSONAPIResponse(
                    data=Data(id="/", type="info", attributes=Info(**metadata)),
                    meta=Meta(query=request.query_string),
                    links=Links(self=request.url),
                ).json()
            )
        ),
        200,
    )
items
ENDPOINTS: Dict[str, Callable]
Functions
reserialize_blocks(display_order: List[str], blocks_obj: Dict[str, Dict]) -> Dict[str, Dict]

Create the corresponding Python objects from JSON block data, then serialize it again as JSON to populate any missing properties.

Parameters:

Name Type Description Default
blocks_obj Dict[str, Dict]

A dictionary containing the JSON block data, keyed by block ID.

required

Returns:

Type Description
Dict[str, Dict]

A dictionary with the re-serialized block data.

Source code in pydatalab/routes/v0_1/items.py
def reserialize_blocks(display_order: List[str], blocks_obj: Dict[str, Dict]) -> Dict[str, Dict]:
    """Create the corresponding Python objects from JSON block data, then
    serialize it again as JSON to populate any missing properties.

    Parameters:
        blocks_obj: A dictionary containing the JSON block data, keyed by block ID.

    Returns:
        A dictionary with the re-serialized block data.

    """
    for block_id in display_order:
        try:
            block_data = blocks_obj[block_id]
        except KeyError:
            LOGGER.warning(f"block_id {block_id} found in display order but not in blocks_obj")
            continue
        blocktype = block_data["blocktype"]
        blocks_obj[block_id] = (
            BLOCK_TYPES.get(blocktype, BLOCK_TYPES["notsupported"]).from_db(block_data).to_web()
        )

    return blocks_obj
dereference_files(file_ids: List[Union[str, bson.objectid.ObjectId]]) -> Dict[str, Dict]

For a list of Object IDs (as strings or otherwise), query the files collection and return a dictionary of the data stored under each ID.

Parameters:

Name Type Description Default
file_ids List[Union[str, bson.objectid.ObjectId]]

The list of IDs of files to return;

required

Returns:

Type Description
Dict[str, Dict]

The dereferenced data as a dictionary with (string) ID keys.

Source code in pydatalab/routes/v0_1/items.py
def dereference_files(file_ids: List[Union[str, ObjectId]]) -> Dict[str, Dict]:
    """For a list of Object IDs (as strings or otherwise), query the files collection
    and return a dictionary of the data stored under each ID.

    Parameters:
        file_ids: The list of IDs of files to return;

    Returns:
        The dereferenced data as a dictionary with (string) ID keys.

    """
    results = {
        str(f["_id"]): f
        for f in flask_mongo.db.files.find(
            {
                "_id": {"$in": [ObjectId(_id) for _id in file_ids]},
            }
        )
    }
    if len(results) != len(file_ids):
        raise RuntimeError(
            "Some file IDs did not have corresponding database entries.\n"
            f"Returned: {list(results.keys())}\n"
            f"Requested: {file_ids}\n"
        )

    return results
get_starting_materials()
Source code in pydatalab/routes/v0_1/items.py
def get_starting_materials():
    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (
            jsonify(
                status="error",
                message="Authorization required to access chemical inventory.",
            ),
            401,
        )

    items = [
        doc
        for doc in flask_mongo.db.items.aggregate(
            [
                {
                    "$match": {
                        "type": "starting_materials",
                        **get_default_permissions(user_only=False),
                    }
                },
                {
                    "$project": {
                        "_id": 0,
                        "item_id": 1,
                        "nblocks": {"$size": "$display_order"},
                        "date_acquired": 1,
                        "chemform": 1,
                        "name": 1,
                        "chemical_purity": 1,
                        "supplier": 1,
                        "location": 1,
                    }
                },
            ]
        )
    ]
    return jsonify({"status": "success", "items": items})
get_samples_summary(match: Optional[Dict] = None, project: Optional[Dict] = None) -> CommandCursor

Return a summary of item entries that match some criteria.

Parameters:

Name Type Description Default
match Optional[Dict]

A MongoDB aggregation match query to filter the results.

None
project Optional[Dict]

A MongoDB aggregation project query to filter the results, relative to the default included below.

None
Source code in pydatalab/routes/v0_1/items.py
def get_samples_summary(
    match: Optional[Dict] = None, project: Optional[Dict] = None
) -> CommandCursor:
    """Return a summary of item entries that match some criteria.

    Parameters:
        match: A MongoDB aggregation match query to filter the results.
        project: A MongoDB aggregation project query to filter the results, relative
            to the default included below.

    """
    if not match:
        match = {}
    match.update(get_default_permissions(user_only=False))
    match["type"] = {"$in": ["samples", "cells"]}

    _project = {
        "_id": 0,
        "creators": {
            "display_name": 1,
            "contact_email": 1,
        },
        "collections": {
            "collection_id": 1,
            "title": 1,
        },
        "item_id": 1,
        "name": 1,
        "chemform": 1,
        "nblocks": {"$size": "$display_order"},
        "characteristic_chemical_formula": 1,
        "type": 1,
        "date": 1,
        "refcode": 1,
    }

    # Cannot mix 0 and 1 keys in MongoDB project so must loop and check
    if project:
        for key in project:
            if project[key] == 0:
                _project.pop(key, None)
            else:
                _project[key] = 1

    return flask_mongo.db.items.aggregate(
        [
            {"$match": match},
            {"$lookup": creators_lookup()},
            {"$lookup": collections_lookup()},
            {"$project": _project},
            {"$sort": {"date": -1}},
        ]
    )
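As a usage sketch mirroring the collections endpoint above, which filters items by their relationship to a given collection (the immutable ID below is a placeholder):

from bson import ObjectId

collection_immutable_id = ObjectId("0123456789ab0123456789ab")  # placeholder
summary = get_samples_summary(
    match={
        "relationships.type": "collections",
        "relationships.immutable_id": collection_immutable_id,
    },
    project={"collections": 0},  # drop the default collections sub-projection
)
for item in summary:
    print(item["item_id"], item["nblocks"])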
creators_lookup() -> Dict
Source code in pydatalab/routes/v0_1/items.py
def creators_lookup() -> Dict:
    return {
        "from": "users",
        "let": {"creator_ids": "$creator_ids"},
        "pipeline": [
            {
                "$match": {
                    "$expr": {
                        "$in": ["$_id", "$$creator_ids"],
                    },
                }
            },
            {"$project": {"_id": 0, "display_name": 1, "contact_email": 1}},
        ],
        "as": "creators",
    }
files_lookup() -> Dict
Source code in pydatalab/routes/v0_1/items.py
def files_lookup() -> Dict:
    return {
        "from": "files",
        "localField": "file_ObjectIds",
        "foreignField": "_id",
        "as": "files",
    }
collections_lookup() -> Dict

Looks inside the relationships of the item, searches for IDs in the collections table and then projects only the collection ID and name for the response.

Source code in pydatalab/routes/v0_1/items.py
def collections_lookup() -> Dict:
    """Looks inside the relationships of the item, searches for IDs in the collections
    table and then projects only the collection ID and name for the response.

    """

    return {
        "from": "collections",
        "let": {"collection_ids": "$relationships.immutable_id"},
        "pipeline": [
            {
                "$match": {
                    "$expr": {
                        "$in": ["$_id", "$$collection_ids"],
                    },
                    "type": "collections",
                }
            },
            {"$project": {"_id": 1, "collection_id": 1}},
        ],
        "as": "collections",
    }
get_samples()
Source code in pydatalab/routes/v0_1/items.py
def get_samples():
    return jsonify({"status": "success", "samples": list(get_samples_summary())})
search_items()

Perform free text search on items and return the top results.

GET parameters:
query: String with the search terms.
nresults: Maximum number of results to return (default 100).
types: If None, search all types of items. Otherwise, a list of strings giving the types to consider (e.g. ["samples","starting_materials"]).

Returns:

Type Description

response list of dictionaries containing the matching items in order of descending match score.

Source code in pydatalab/routes/v0_1/items.py
def search_items():
    """Perform free text search on items and return the top results.
    GET parameters:
        query: String with the search terms.
        nresults: Maximum number of results to return (default 100)
        types: If None, search all types of items. Otherwise, a list of strings
               giving the types to consider. (e.g. ["samples","starting_materials"])

    Returns:
        response list of dictionaries containing the matching items in order of
        descending match score.
    """

    query = request.args.get("query", type=str)
    nresults = request.args.get("nresults", default=100, type=int)
    types = request.args.get("types", default=None)
    if isinstance(types, str):
        types = types.split(",")  # should figure out how to parse as list automatically

    match_obj = {"$text": {"$search": query}, **get_default_permissions(user_only=False)}
    if types is not None:
        match_obj["type"] = {"$in": types}

    cursor = flask_mongo.db.items.aggregate(
        [
            {"$match": match_obj},
            {"$sort": {"score": {"$meta": "textScore"}}},
            {"$limit": nresults},
            {
                "$project": {
                    "_id": 0,
                    "type": 1,
                    "item_id": 1,
                    "name": 1,
                    "chemform": 1,
                    "refcode": 1,
                }
            },
        ]
    )

    return jsonify({"status": "success", "items": list(cursor)}), 200
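An example search restricted to two item types; the query parameters follow the handler above, while the route path, base URL and session cookie are placeholders:

import requests

session_cookies = {"session": "<session cookie>"}  # placeholder
resp = requests.get(
    "https://datalab.example.com/search-items/",  # hypothetical route
    params={"query": "LiCoO2", "nresults": 20, "types": "samples,starting_materials"},
    cookies=session_cookies,
)
print(resp.json()["items"])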
create_sample()
Source code in pydatalab/routes/v0_1/items.py
def create_sample():
    request_json = request.get_json()  # noqa: F821 pylint: disable=undefined-variable
    if "new_sample_data" in request_json:
        response, http_code = _create_sample(
            request_json["new_sample_data"], request_json.get("copy_from_item_id")
        )
    else:
        response, http_code = _create_sample(request_json)

    return jsonify(response), http_code
create_samples()

Attempt to create multiple samples at once. Because each may result in success or failure, a 207 (multi-status) response is returned along with a JSON field (http_codes) containing all the individual HTTP codes.

Source code in pydatalab/routes/v0_1/items.py
def create_samples():
    """attempt to create multiple samples at once.
    Because each may result in success or failure, 207 is returned along with a
    json field containing all the individual http_codes"""

    request_json = request.get_json()  # noqa: F821 pylint: disable=undefined-variable

    sample_jsons = request_json["new_sample_datas"]
    copy_from_item_ids = request_json.get("copy_from_item_ids")

    if copy_from_item_ids is None:
        copy_from_item_ids = [None] * len(sample_jsons)

    outputs = [
        _create_sample(sample_json, copy_from_item_id)
        for sample_json, copy_from_item_id in zip(sample_jsons, copy_from_item_ids)
    ]
    responses, http_codes = zip(*outputs)

    statuses = [response["status"] for response in responses]
    nsuccess = statuses.count("success")
    nerror = statuses.count("error")

    return (
        jsonify(
            nsuccess=nsuccess,
            nerror=nerror,
            responses=responses,
            http_codes=http_codes,
        ),
        207,
    )  # 207: multi-status
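A sketch of the multi-create payload: each entry in new_sample_datas is handled independently and the per-item outcomes come back under http_codes. The route path, base URL and session cookie are placeholders:

import requests

session_cookies = {"session": "<session cookie>"}  # placeholder
payload = {
    "new_sample_datas": [
        {"item_id": "sample_10", "name": "First sample"},
        {"item_id": "sample_11", "name": "Second sample"},
    ],
    # "copy_from_item_ids": ["sample_1", None],  # optional; must match the length of new_sample_datas
}
resp = requests.post("https://datalab.example.com/new-samples/", json=payload, cookies=session_cookies)
print(resp.json()["nsuccess"], resp.json()["nerror"], resp.json()["http_codes"])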
delete_sample()
Source code in pydatalab/routes/v0_1/items.py
def delete_sample():
    request_json = request.get_json()  # noqa: F821 pylint: disable=undefined-variable
    item_id = request_json["item_id"]

    result = flask_mongo.db.items.delete_one(
        {"item_id": item_id, **get_default_permissions(user_only=True)}
    )

    if result.deleted_count != 1:
        return (
            jsonify(
                {
                    "status": "error",
                    "message": f"Authorization required to attempt to delete sample with {item_id=} from the database.",
                }
            ),
            401,
        )
    return (
        jsonify(
            {
                "status": "success",
            }
        ),
        200,
    )
get_item_data(item_id, load_blocks: bool = False)

Generates a JSON response for the item with the given item_id, additionally resolving relationships to files and other items.

Parameters:

Name Type Description Default
load_blocks bool

Whether to regenerate any data blocks associated with this sample (i.e., create the Python object corresponding to the block and call its render function).

False
Source code in pydatalab/routes/v0_1/items.py
def get_item_data(item_id, load_blocks: bool = False):
    """Generates a JSON response for the item with the given `item_id`,
    additionally resolving relationships to files and other items.

    Parameters:
       load_blocks: Whether to regenerate any data blocks associated with this
           sample (i.e., create the Python object corresponding to the block and
           call its render function).

    """

    # retrieve the entry from the database:
    cursor = flask_mongo.db.items.aggregate(
        [
            {"$match": {"item_id": item_id, **get_default_permissions(user_only=False)}},
            {"$lookup": creators_lookup()},
            {"$lookup": collections_lookup()},
            {"$lookup": files_lookup()},
        ],
    )

    try:
        doc = list(cursor)[0]
    except IndexError:
        doc = None

    if not doc or (
        not current_user.is_authenticated
        and not CONFIG.TESTING
        and not doc["type"] == "starting_materials"
    ):
        return (
            jsonify(
                {
                    "status": "error",
                    "message": f"No matching item {item_id=} with current authorization.",
                }
            ),
            404,
        )

    # determine the item type and validate according to the appropriate schema
    try:
        ItemModel = ITEM_MODELS[doc["type"]]
    except KeyError:
        if "type" in doc:
            raise KeyError(f"Item {item_id=} has invalid type: {doc['type']}")
        else:
            raise KeyError(f"Item {item_id=} has no type field in document.")

    doc = ItemModel(**doc)
    if load_blocks:
        doc.blocks_obj = reserialize_blocks(doc.display_order, doc.blocks_obj)

    # find any documents with relationships that mention this document
    relationships_query_results = flask_mongo.db.items.find(
        filter={
            "$or": [
                {"relationships.item_id": doc.item_id},
                {"relationships.refcode": doc.refcode},
                {"relationships.immutable_id": doc.immutable_id},
            ]
        },
        projection={
            "item_id": 1,
            "refcode": 1,
            "relationships": {
                "$elemMatch": {
                    "$or": [
                        {"item_id": doc.item_id},
                        {"refcode": doc.refcode},
                    ],
                },
            },
        },
    )

    # loop over and collect all 'outer' relationships presented by other items
    incoming_relationships: Dict[RelationshipType, Set[str]] = {}
    for d in relationships_query_results:
        for k in d["relationships"]:
            if k["relation"] not in incoming_relationships:
                incoming_relationships[k["relation"]] = set()
            incoming_relationships[k["relation"]].add(
                d["item_id"] or d["refcode"] or d["immutable_id"]
            )

    # loop over and aggregate all 'inner' relationships presented by this item
    inlined_relationships: Dict[RelationshipType, Set[str]] = {}
    if doc.relationships is not None:
        inlined_relationships = {
            relation: {
                d.item_id or d.refcode or d.immutable_id
                for d in doc.relationships
                if d.relation == relation
            }
            for relation in RelationshipType
        }

    # reunite parents and children from both directions of the relationships field
    parents = incoming_relationships.get(RelationshipType.CHILD, set()).union(
        inlined_relationships.get(RelationshipType.PARENT, set())
    )
    children = incoming_relationships.get(RelationshipType.PARENT, set()).union(
        inlined_relationships.get(RelationshipType.CHILD, set())
    )

    # Must be exported to JSON first to apply the custom pydantic JSON encoders
    return_dict = json.loads(doc.json(exclude_unset=True))

    # create the files_data dictionary keyed by file ObjectId
    files_data: Dict[ObjectId, Dict] = dict(
        [(f["immutable_id"], f) for f in return_dict.get("files") or []]
    )

    return jsonify(
        {
            "status": "success",
            "item_id": item_id,
            "item_data": return_dict,
            "files_data": files_data,
            "child_items": sorted(children),
            "parent_items": sorted(parents),
        }
    )
save_item()
Source code in pydatalab/routes/v0_1/items.py
def save_item():
    request_json = request.get_json()  # noqa: F821 pylint: disable=undefined-variable

    item_id = request_json["item_id"]
    updated_data = request_json["data"]

    # These keys should not be updated here and cannot be modified by the user through this endpoint
    for k in ("_id", "file_ObjectIds", "creators", "creator_ids", "item_id", "relationships"):
        if k in updated_data:
            del updated_data[k]

    updated_data["last_modified"] = datetime.datetime.now().isoformat()

    for block_id, block_data in updated_data.get("blocks_obj", {}).items():
        blocktype = block_data["blocktype"]

        block = BLOCK_TYPES.get(blocktype, BLOCK_TYPES["notsupported"]).from_web(block_data)

        updated_data["blocks_obj"][block_id] = block.to_db()

    item = flask_mongo.db.items.find_one(
        {"item_id": item_id, **get_default_permissions(user_only=True)}
    )

    if not item:
        return (
            jsonify(
                status="error",
                message=f"Unable to find item with appropriate permissions and {item_id=}.",
            ),
            400,
        )

    if updated_data.get("collections", []):
        try:
            updated_data["collections"] = _check_collections(updated_data)
        except ValueError as exc:
            return (
                dict(
                    status="error",
                    message=f"Cannot update {item_id!r} with missing collections {updated_data['collections']!r}: {exc}",
                    item_id=item_id,
                ),
                401,
            )

    item_type = item["type"]
    item.update(updated_data)

    try:
        item = ITEM_MODELS[item_type](**item).dict()
    except ValidationError as exc:
        return (
            jsonify(
                status="error",
                message=f"Unable to update item {item_id=} ({item_type=}) with new data {updated_data}",
                output=str(exc),
            ),
            400,
        )

    # remove collections and creators and any other reference fields
    item.pop("collections")
    item.pop("creators")

    result = flask_mongo.db.items.update_one(
        {"item_id": item_id},
        {"$set": item},
    )

    if result.matched_count != 1:
        return (
            jsonify(
                status="error",
                message=f"{item_id} item update failed. no subdocument matched",
                output=result.raw_result,
            ),
            400,
        )

    return jsonify(status="success", last_modified=updated_data["last_modified"]), 200
search_users()

Perform free text search on users and return the top results.

GET parameters:
query: String with the search terms.
nresults: Maximum number of results to return (default 100).

Returns:

Type Description

response list of dictionaries containing the matching items in order of descending match score.

Source code in pydatalab/routes/v0_1/items.py
def search_users():
    """Perform free text search on users and return the top results.
    GET parameters:
        query: String with the search terms.
        nresults: Maximum number of results to return (default 100)

    Returns:
        response list of dictionaries containing the matching items in order of
        descending match score.
    """

    query = request.args.get("query", type=str)
    nresults = request.args.get("nresults", default=100, type=int)
    types = request.args.get("types", default=None)

    match_obj = {"$text": {"$search": query}}
    if types is not None:
        match_obj["type"] = {"$in": types}

    cursor = flask_mongo.db.users.aggregate(
        [
            {"$match": match_obj},
            {"$sort": {"score": {"$meta": "textScore"}}},
            {"$limit": nresults},
            {
                "$project": {
                    "_id": 1,
                    "identities": 1,
                    "display_name": 1,
                }
            },
        ]
    )

    return jsonify({"status": "success", "users": list(cursor)}), 200
remotes
ENDPOINTS: Dict[str, Callable]
Functions
list_remote_directories()

Returns the most recent directory structures from the server.

If the cache is missing or is older than some configured time, then it will be reconstructed.

Source code in pydatalab/routes/v0_1/remotes.py
def list_remote_directories():
    """Returns the most recent directory structures from the server.

    If the cache is missing or is older than some configured time,
    then it will be reconstructed.

    """
    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (
            jsonify(
                {
                    "status": "error",
                    "title": "Not Authorized",
                    "detail": "Listing remote directories requires authentication.",
                }
            ),
            401,
        )

    invalidate_cache = None
    if "invalidate_cache" in request.args:
        invalidate_cache = request.args["invalidate_cache"]
        if invalidate_cache not in ("1", "0"):
            return jsonify({"error": "invalidate_cache must be 0 or 1"}), 400
        invalidate_cache = bool(int(invalidate_cache))

    all_directory_structures = get_directory_structures(
        CONFIG.REMOTE_FILESYSTEMS, invalidate_cache=invalidate_cache
    )

    response = {}
    response["meta"] = {}
    response["meta"]["remotes"] = CONFIG.REMOTE_FILESYSTEMS
    if all_directory_structures:
        oldest_update = min(d["last_updated"] for d in all_directory_structures)
        response["meta"]["oldest_cache_update"] = oldest_update.isoformat()
        response["data"] = all_directory_structures
    return jsonify(response), 200

simple_bokeh_plot

FONTSIZE
TOOLS
TYPEFACE
mytheme
style
simple_bokeh_plot(xy_filename, x_label = None, y_label = None)
Source code in pydatalab/simple_bokeh_plot.py
def simple_bokeh_plot(xy_filename, x_label=None, y_label=None):

    df = pd.read_csv(xy_filename, sep=r"\s+")
    # source = ColumnDataSource(df)
    source = ColumnDataSource(
        {"x_col": df[df.columns[0]], "y_col": df[df.columns[1]]}
    )  # plot the first two columns

    kw = dict()
    p = figure(sizing_mode="scale_width", aspect_ratio=1.5, tools=TOOLS, **kw)

    p.xaxis.axis_label = x_label
    p.yaxis.axis_label = y_label

    # apply a theme. for some reason, this isn't carrying over
    # to components() calls, so use components(theme=mytheme)
    curdoc().theme = mytheme

    p.circle("x_col", "y_col", source=source)
    p.toolbar.logo = "grey"
    p.js_on_event(DoubleTap, CustomJS(args=dict(p=p), code="p.reset.emit()"))
    # show(p)
    return p
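Typical usage, assuming a whitespace-separated two-column data file (the filename is illustrative). As the comment in the source notes, the theme has to be passed explicitly when embedding with components:

from bokeh.embed import components

p = simple_bokeh_plot("diffraction_pattern.xy", x_label="2θ (°)", y_label="Intensity")
script, div = components(p, theme=mytheme)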

utils

This module contains utility functions that can be used anywhere in the package.

Classes
CustomJSONEncoder (JSONEncoder)

Use a JSON encoder that can handle pymongo's bson.

Source code in pydatalab/utils.py
class CustomJSONEncoder(JSONEncoder):
    """Use a JSON encoder that can handle pymongo's bson."""

    def default(self, obj):
        if isinstance(obj, datetime.datetime):
            return datetime.datetime.isoformat(obj)
        return json_util.default(obj)
Methods
default(self, obj)

Convert obj to a JSON-serializable type. See json.JSONEncoder.default. Python does not support overriding how basic types like str or list are serialized; they are handled before this method.

Source code in pydatalab/utils.py
def default(self, obj):
    if isinstance(obj, datetime.datetime):
        return datetime.datetime.isoformat(obj)
    return json_util.default(obj)
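A usage sketch: passing the encoder class to json.dumps so that datetimes and BSON types such as ObjectId serialize cleanly:

import datetime
import json

from bson import ObjectId

doc = {"_id": ObjectId(), "last_modified": datetime.datetime.now()}
print(json.dumps(doc, cls=CustomJSONEncoder))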
Functions
reduce_df_size(df: DataFrame, target_nrows: int, endpoint: bool = True) -> DataFrame

Reduce the dataframe to the number of target rows by applying a stride.

Parameters:

Name Type Description Default
df DataFrame

The dataframe to reduce.

required
target_nrows int

The target number of rows to reduce each column to.

required
endpoint bool

Whether to include the endpoint of the dataframe.

True

Returns:

Type Description
DataFrame

A copy of the input dataframe with the applied stride.

Source code in pydatalab/utils.py
def reduce_df_size(df: pd.DataFrame, target_nrows: int, endpoint: bool = True) -> pd.DataFrame:
    """Reduce the dataframe to the number of target rows by applying a stride.

    Parameters:
        df: The dataframe to reduce.
        target_nrows: The target number of rows to reduce each column to.
        endpoint: Whether to include the endpoint of the dataframe.

    Returns:
        A copy of the input dataframe with the applied stride.

    """
    num_rows = len(df)
    stride = ceil(num_rows / target_nrows)
    if endpoint:
        indices = [0] + list(range(stride, num_rows - 1, stride)) + [num_rows - 1]
    else:
        indices = list(range(0, num_rows, stride))

    return df.iloc[indices].copy()
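For example, downsampling a long time series to roughly 1000 rows while keeping the first and last points:

import numpy as np
import pandas as pd

df = pd.DataFrame({"t": np.arange(100_000), "y": np.random.rand(100_000)})
small = reduce_df_size(df, target_nrows=1000, endpoint=True)
print(len(small))  # ~1000 rows, including the final row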