Skip to content

REST API

pydatalab.routes.v0_1 special

BLUEPRINTS: tuple

admin

ADMIN

_()

Source code in pydatalab/routes/v0_1/admin.py
@ADMIN.before_request
@admin_only
def _(): ...

get_users()

Source code in pydatalab/routes/v0_1/admin.py
@ADMIN.route("/users")
def get_users():
    users = flask_mongo.db.users.aggregate(
        [
            {"$match": get_default_permissions(user_only=True)},
            {
                "$lookup": {
                    "from": "roles",
                    "localField": "_id",
                    "foreignField": "_id",
                    "as": "role",
                }
            },
            {
                "$addFields": {
                    "role": {
                        "$cond": {
                            "if": {"$eq": [{"$size": "$role"}, 0]},
                            "then": "user",
                            "else": {"$arrayElemAt": ["$role.role", 0]},
                        }
                    }
                }
            },
        ]
    )

    return jsonify({"status": "success", "data": list(users)})

save_role(user_id)

Source code in pydatalab/routes/v0_1/admin.py
@ADMIN.route("/roles/<user_id>", methods=["PATCH"])
def save_role(user_id):
    request_json = request.get_json()

    if request_json is not None:
        user_role = request_json

    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (jsonify({"status": "error", "message": "No user authenticated."}), 401)

    if not CONFIG.TESTING and current_user.role != "admin":
        return (
            jsonify({"status": "error", "message": "User not allowed to edit this profile."}),
            403,
        )

    existing_user = flask_mongo.db.users.find_one({"_id": ObjectId(user_id)})

    if not existing_user:
        return (jsonify({"status": "error", "message": "User not found."}), 404)

    existing_role = flask_mongo.db.roles.find_one({"_id": ObjectId(user_id)})

    if not existing_role:
        if not user_role:
            return (jsonify({"status": "error", "message": "Role not provided for new user."}), 400)

        new_user_role = {"_id": ObjectId(user_id), **user_role}
        flask_mongo.db.roles.insert_one(new_user_role)

        return (jsonify({"status": "success", "message": "New user's role created."}), 201)

    update_result = flask_mongo.db.roles.update_one({"_id": ObjectId(user_id)}, {"$set": user_role})

    if update_result.matched_count != 1:
        return (jsonify({"status": "error", "message": "Unable to update user."}), 400)

    if update_result.modified_count != 1:
        return (
            jsonify(
                {
                    "status": "success",
                    "message": "No update was performed",
                }
            ),
            200,
        )

    return (jsonify({"status": "success"}), 200)

auth

This module implements functionality for authenticating users via OAuth2 providers and JWT, and associating these identities with their local accounts.

AUTH

EMAIL_BLUEPRINT

KEY_LENGTH: int

OAUTH: Dict[pydatalab.models.people.IdentityType, flask.blueprints.Blueprint]

A dictionary of Flask blueprints corresponding to the supported OAuth providers.

OAUTH_PROXIES: Dict[pydatalab.models.people.IdentityType, werkzeug.local.LocalProxy]

A dictionary of proxy objects (c.f. Flask context locals) corresponding to the supported OAuth2 providers, and can be used to make further authenticated requests out to the providers.

wrapped_login_user(*args, **kwargs)

Source code in pydatalab/routes/v0_1/auth.py
@logged_route
def wrapped_login_user(*args, **kwargs):
    login_user(*args, **kwargs)

find_user_with_identity(identifier: str, identity_type: Union[str, pydatalab.models.people.IdentityType], verify: bool = False) -> Optional[pydatalab.models.people.Person]

Look up the given identity in the users database.

Parameters:

Name Type Description Default
identifier str

The identifier of the identity to look up.

required
identity_type Union[str, pydatalab.models.people.IdentityType]

The type of the identity to look up.

required
verify bool

Whether to mark the identity as verified if it is found.

False
Source code in pydatalab/routes/v0_1/auth.py
@logged_route
def find_user_with_identity(
    identifier: str,
    identity_type: Union[str, IdentityType],
    verify: bool = False,
) -> Optional[Person]:
    """Look up the given identity in the users database.

    Parameters:
        identifier: The identifier of the identity to look up.
        identity_type: The type of the identity to look up.
        verify: Whether to mark the identity as verified if it is found.

    """
    user = flask_mongo.db.users.find_one(
        {"identities.identifier": identifier, "identities.identity_type": identity_type},
    )
    if user:
        person = Person(**user)
        identity_indices: list[int] = [
            ind
            for ind, _ in enumerate(person.identities)
            if (_.identity_type == identity_type and _.identifier == identifier)
        ]
        if len(identity_indices) != 1:
            raise RuntimeError(
                "Unexpected error: multiple or no identities matched the OAuth token."
            )

        identity_index = identity_indices[0]

        if verify and not person.identities[identity_index].verified:
            flask_mongo.db.users.update_one(
                {"_id": person.immutable_id},
                {"$set": {f"identities.{identity_index}.verified": True}},
            )

        return person

    return None

find_create_or_modify_user(identifier: str, identity_type: Union[str, pydatalab.models.people.IdentityType], identity_name: str, display_name: Optional[str] = None, verified: bool = False, create_account: bool | pydatalab.models.people.AccountStatus = False) -> None

Search for a user account with the given identifier and identity type, creating or connecting one if it does not exist.

1. Find any user with the given identity, if found, return it.
2. If no user exists, check if there is currently a user logged in:
    - If so, attach the identity to the current user.
    - If not, create an entry in the user database with this identity.
3. Log in as the user for this session.
Source code in pydatalab/routes/v0_1/auth.py
def find_create_or_modify_user(
    identifier: str,
    identity_type: Union[str, IdentityType],
    identity_name: str,
    display_name: Optional[str] = None,
    verified: bool = False,
    create_account: bool | AccountStatus = False,
) -> None:
    """Search for a user account with the given identifier and identity type, creating
    or connecting one if it does not exist.

        1. Find any user with the given identity, if found, return it.
        2. If no user exists, check if there is currently a user logged in:
            - If so, attach the identity to the current user.
            - If not, create an entry in the user database with this identity.
        3. Log in as the user for this session.

    """

    @logged_route
    def attach_identity_to_user(
        user_id: str,
        identity: Identity,
        use_display_name: bool = False,
        use_contact_email: bool = False,
    ) -> None:
        """Associates an OAuth ID with a user entry in the database.

        This function is currently brittle and would need to be updated
        if the corresponding `Person` schema changes due to the hard-coded
        field names.

        Parameters:
            user_id: The database ID of the user as a string.
            identity: The identity to associate.
            use_display_name: Whether to set the user's top-level display name with a
                display name provided by this identity.
            use_contact_email: Whether to set the user's top-level contact email with
                an email address provided by this identity.

        Raises:
            RuntimeError: If the update was unsuccessful.

        """
        update = {"$push": {"identities": identity.dict()}}
        if use_display_name and identity and identity.display_name:
            update["$set"] = {"display_name": identity.display_name}

        if use_contact_email and identity.identity_type is IdentityType.EMAIL and identity.verified:
            update["$set"] = {"contact_email": identity.identifier}

        result = flask_mongo.db.users.update_one(
            {"_id": ObjectId(user_id)},
            update,
        )

        if result.matched_count != 1:
            raise RuntimeError(
                f"Attempted to modify user {user_id} but performed {result.matched_count} updates. Results:\n{result.raw_result}"
            )

    user = find_user_with_identity(identifier, identity_type, verify=True)

    # If no user was found in the database with the OAuth ID, make or modify one:
    if not user:
        identity = Identity(
            identifier=identifier,
            identity_type=identity_type,
            name=identity_name,
            display_name=display_name,
            verified=verified,
        )

        # If there is currently a user logged in who has gone through OAuth with a new identity,
        # then update the user database with the identity
        if current_user.is_authenticated:
            attach_identity_to_user(
                current_user.id,
                identity,
                use_display_name=True if current_user.display_name is None else False,
                use_contact_email=True if current_user.contact_email is None else False,
            )
            current_user.refresh()
            user = current_user.person

        # If there is no current authenticated user, make one with the current OAuth identity
        else:
            if not create_account:
                raise UserRegistrationForbidden

            if isinstance(create_account, bool):
                account_status = AccountStatus.UNVERIFIED
            else:
                account_status = create_account

            user = Person.new_user_from_identity(
                identity, use_display_name=True, account_status=account_status
            )
            LOGGER.debug("Inserting new user model %s into database", user)
            insert_pydantic_model_fork_safe(user, "users")
            user_model = get_by_id(str(user.immutable_id))
            if user is None:
                raise RuntimeError("Failed to insert user into database")
            wrapped_login_user(user_model)

    # Log the user into the session with this identity
    if user is not None:
        wrapped_login_user(get_by_id(str(user.immutable_id)))

Generates a JWT-based magic link with which a user can log in, stores it in the database and sends it to the verified email address.

Source code in pydatalab/routes/v0_1/auth.py
@EMAIL_BLUEPRINT.route("/magic-link", methods=["POST"])
def generate_and_share_magic_link():
    """Generates a JWT-based magic link with which a user can log in, stores it
    in the database and sends it to the verified email address.

    """
    request_json = request.get_json()
    email = request_json.get("email")
    referrer = request_json.get("referrer")

    if not email:
        return jsonify({"status": "error", "detail": "No email provided."}), 400

    if not re.match(r"^\S+@\S+.\S+$", email):
        return jsonify({"status": "error", "detail": "Invalid email provided."}), 400

    if not referrer:
        LOGGER.warning("No referrer provided for magic link request: %s", request_json)
        return (
            jsonify(
                {
                    "status": "error",
                    "detail": "Referrer address not provided, please contact the datalab administrator.",
                }
            ),
            400,
        )

    # Generate a JWT for the user with a short expiration; the session itself
    # should persist
    # The key `exp` is a standard part of JWT; pyjwt treats this as an expiration time
    # and will correctly encode the datetime
    token = jwt.encode(
        {"exp": datetime.datetime.now(datetime.timezone.utc) + LINK_EXPIRATION, "email": email},
        CONFIG.SECRET_KEY,
        algorithm="HS256",
    )

    flask_mongo.db.magic_links.insert_one(
        {"jwt": token},
    )

    link = f"{referrer}?token={token}"

    instance_url = referrer.replace("https://", "")

    # See if the user already exists and adjust the email if so
    user = find_user_with_identity(email, IdentityType.EMAIL, verify=False)

    if not user:
        allowed = _check_email_domain(email, CONFIG.EMAIL_DOMAIN_ALLOW_LIST)
        if not allowed:
            LOGGER.info("Did not allow %s to register an account", email)
            return (
                jsonify(
                    {
                        "status": "error",
                        "detail": f"Email address {email} is not allowed to register an account. Please contact the administrator if you believe this is an error.",
                    }
                ),
                403,
            )

    if user is not None:
        subject = "Datalab Sign-in Magic Link"
        body = f"Click the link below to sign-in to the datalab instance at {instance_url}:\n\n{link}\n\nThis link is single-use and will expire in 1 hour."
    else:
        subject = "Datalab Registration Magic Link"
        body = f"Click the link below to register for the datalab instance at {instance_url}:\n\n{link}\n\nThis link is single-use and will expire in 1 hour."

    try:
        send_mail(email, subject, body)
    except Exception as exc:
        LOGGER.warning("Failed to send email to %s: %s", email, exc)
        return jsonify({"status": "error", "detail": "Email not sent successfully."}), 400

    return jsonify({"status": "success", "detail": "Email sent successfully."}), 200

email_logged_in()

Endpoint for handling magic link authentication.

  • Checks the passed token for as valid JWT in the magic_links collection
  • If found, checks if the user with the decoded email exists in the user collection.
  • If not found, make the user account and verify their email address,
  • Authenticate the user for this session.
Source code in pydatalab/routes/v0_1/auth.py
@EMAIL_BLUEPRINT.route("/email")
def email_logged_in():
    """Endpoint for handling magic link authentication.

    - Checks the passed token for as valid JWT in the `magic_links` collection
    - If found, checks if the user with the decoded email exists in the user
    collection.
    - If not found, make the user account and verify their email address,
    - Authenticate the user for this session.

    """
    args = request.args
    token = args.get("token")
    if not token:
        raise ValueError("Token not provided")

    if not flask_mongo.db.magic_links.find_one({"jwt": token}):
        raise ValueError("Token not found, please request a new one.")

    data = jwt.decode(
        token,
        CONFIG.SECRET_KEY,
        algorithms=["HS256"],
    )

    if datetime.datetime.fromtimestamp(
        data["exp"], tz=datetime.timezone.utc
    ) < datetime.datetime.now(tz=datetime.timezone.utc):
        raise ValueError("Token expired, please request a new one.")

    email = data["email"]
    if not email:
        raise RuntimeError("No email found; please request a new token.")

    # If the email domain list is explicitly configured to None, this allows any
    # email address to make an active account, otherwise the email domain must match
    # the list of allowed domains and the admin must verify the user
    allowed = _check_email_domain(email, CONFIG.EMAIL_DOMAIN_ALLOW_LIST)
    if not allowed:
        # If this point is reached, the token is valid but the server settings have
        # changed since the link was generated, so best to fail safe
        raise UserRegistrationForbidden

    create_account = AccountStatus.UNVERIFIED
    if (
        CONFIG.EMAIL_DOMAIN_ALLOW_LIST is None
        or CONFIG.EMAIL_AUTO_ACTIVATE_ACCOUNTS
        or CONFIG.AUTO_ACTIVATE_ACCOUNTS
    ):
        create_account = AccountStatus.ACTIVE

    find_create_or_modify_user(
        email,
        IdentityType.EMAIL,
        email,
        display_name=email,
        verified=True,
        create_account=create_account,
    )

    if CONFIG.APP_URL:
        return redirect(CONFIG.APP_URL, 307)
    referer = request.headers.get("Referer", "/")
    return redirect(referer, 307)

github_logged_in(blueprint, token)

This Flask signal hooks into any attempt to use the GitHub blueprint, and will make a user account with this identity if not already present in the database.

Makes one authorized request to the GitHub API to get the user's GitHub ID, username and display name, without storing the OAuth token.

Source code in pydatalab/routes/v0_1/auth.py
@oauth_authorized.connect_via(OAUTH[IdentityType.GITHUB])
def github_logged_in(blueprint, token):
    """This Flask signal hooks into any attempt to use the GitHub blueprint, and will
    make a user account with this identity if not already present in the database.

    Makes one authorized request to the GitHub API to get the user's GitHub ID,
    username and display name, without storing the OAuth token.

    """
    if not token:
        return False

    resp = blueprint.session.get("/user")
    if not resp.ok:
        return False

    github_info = resp.json()
    github_user_id = str(github_info["id"])
    username = str(github_info["login"])
    name = str(github_info["name"] if github_info["name"] is not None else github_info["login"])

    create_account: bool | AccountStatus = False
    # Use the read:org scope to check if the user is a member of at least one of the allowed orgs
    if CONFIG.GITHUB_ORG_ALLOW_LIST:
        for org in CONFIG.GITHUB_ORG_ALLOW_LIST:
            if str(int(org)) == org:
                org = int(org)
            if blueprint.session.get(f"/orgs/{org}/members/{username}").ok:
                # If this person has a GH account on the org allow list, activate their account
                create_account = AccountStatus.ACTIVE
                break

    elif CONFIG.GITHUB_ORG_ALLOW_LIST is None:
        create_account = True

    if CONFIG.GITHUB_AUTO_ACTIVATE_ACCOUNTS or CONFIG.AUTO_ACTIVATE_ACCOUNTS:
        create_account = AccountStatus.ACTIVE

    find_create_or_modify_user(
        github_user_id,
        IdentityType.GITHUB,
        username,
        display_name=name,
        verified=True,
        create_account=create_account,
    )

    # Return false to prevent Flask-dance from trying to store the token elsewhere
    return False

orcid_logged_in(_, token)

This signal hooks into any attempt to use the ORCID blueprint, and will associate a user account with this identity if not already present in the database.

The OAuth token is not stored alongside the user.

Source code in pydatalab/routes/v0_1/auth.py
@oauth_authorized.connect_via(OAUTH[IdentityType.ORCID])
def orcid_logged_in(_, token):
    """This signal hooks into any attempt to use the ORCID blueprint, and will
    associate a user account with this identity if not already present in the database.

    The OAuth token is not stored alongside the user.

    """
    if not token:
        return False

    # New ORCID accounts must be activated by an admin unless configured otherwise
    create_account = AccountStatus.UNVERIFIED
    if CONFIG.ORCID_AUTO_ACTIVATE_ACCOUNTS or CONFIG.AUTO_ACTIVATE_ACCOUNTS:
        create_account = AccountStatus.ACTIVE

    find_create_or_modify_user(
        token["orcid"],
        IdentityType.ORCID,
        token["orcid"],
        display_name=token.get("name", token["orcid"]),
        verified=True,
        create_account=create_account,
    )

    # Return false to prevent Flask-dance from trying to store the token elsewhere
    return False

redirect_to_ui(blueprint, token)

Intercepts the default Flask-Dance and redirects to the referring page.

Source code in pydatalab/routes/v0_1/auth.py
@oauth_authorized.connect
def redirect_to_ui(blueprint, token):  # pylint: disable=unused-argument
    """Intercepts the default Flask-Dance and redirects to the referring page."""
    if CONFIG.APP_URL:
        return redirect(CONFIG.APP_URL, 307)
    referer = request.headers.get("Referer", "/")
    return redirect(referer, 307)

get_authenticated_user_info()

Returns metadata associated with the currently authenticated user.

Source code in pydatalab/routes/v0_1/auth.py
@AUTH.route("/get-current-user/", methods=["GET"])
def get_authenticated_user_info():
    """Returns metadata associated with the currently authenticated user."""
    if current_user.is_authenticated:
        current_user_response = json.loads(current_user.person.json())
        current_user_response["role"] = current_user.role.value
        return jsonify(current_user_response), 200
    else:
        return jsonify({"status": "failure", "message": "User must be authenticated."}), 401

generate_user_api_key()

Returns metadata associated with the currently authenticated user.

Source code in pydatalab/routes/v0_1/auth.py
@AUTH.route("/get-api-key/", methods=["GET"])
def generate_user_api_key():
    """Returns metadata associated with the currently authenticated user."""
    if current_user.is_authenticated:
        new_key = "".join(random.choices(ascii_letters, k=KEY_LENGTH))
        flask_mongo.db.api_keys.update_one(
            {"_id": ObjectId(current_user.id)},
            {"$set": {"hash": sha512(new_key.encode("utf-8")).hexdigest()}},
            upsert=True,
        )
        return jsonify({"key": new_key}), 200
    else:
        return (
            jsonify(
                {
                    "status": "failure",
                    "message": "User must be an authenticated admin to request an API key.",
                }
            ),
            401,
        )

blocks

BLOCKS

_()

Source code in pydatalab/routes/v0_1/blocks.py
@BLOCKS.before_request
@active_users_or_get_only
def _(): ...

add_data_block()

Call with AJAX to add a block to the sample

Source code in pydatalab/routes/v0_1/blocks.py
@BLOCKS.route("/add-data-block/", methods=["POST"])
def add_data_block():
    """Call with AJAX to add a block to the sample"""

    request_json = request.get_json()

    # pull out required arguments from json
    block_type = request_json["block_type"]
    item_id = request_json["item_id"]
    insert_index = request_json["index"]

    if block_type not in BLOCK_TYPES:
        return jsonify(status="error", message="Invalid block type"), 400

    block = BLOCK_TYPES[block_type](item_id=item_id)

    data = block.to_db()

    # currently, adding to both blocks and blocks_obj to mantain compatibility with
    # the old site. The new site only uses blocks_obj
    if insert_index:
        display_order_update = {
            "$each": [block.block_id],
            "$position": insert_index,
        }
    else:
        display_order_update = block.block_id

    result = flask_mongo.db.items.update_one(
        {"item_id": item_id, **get_default_permissions(user_only=True)},
        {
            "$push": {"blocks": data, "display_order": display_order_update},
            "$set": {f"blocks_obj.{block.block_id}": data},
        },
    )

    if result.modified_count < 1:
        return (
            jsonify(
                status="error",
                message=f"Update failed. {item_id=} is probably incorrect.",
            ),
            400,
        )

    # get the new display_order:
    display_order_result = flask_mongo.db.items.find_one(
        {"item_id": item_id, **get_default_permissions(user_only=True)}, {"display_order": 1}
    )

    return jsonify(
        status="success",
        new_block_obj=block.to_web(),
        new_block_insert_index=insert_index
        if insert_index is None
        else len(display_order_result["display_order"]) - 1,
        new_display_order=display_order_result["display_order"],
    )

add_collection_data_block()

Call with AJAX to add a block to the collection.

Source code in pydatalab/routes/v0_1/blocks.py
@BLOCKS.route("/add-collection-data-block/", methods=["POST"])
def add_collection_data_block():
    """Call with AJAX to add a block to the collection."""

    request_json = request.get_json()

    # pull out required arguments from json
    block_type = request_json["block_type"]
    collection_id = request_json["collection_id"]
    insert_index = request_json["index"]

    if block_type not in BLOCK_TYPES:
        return jsonify(status="error", message="Invalid block type"), 400

    block = BLOCK_TYPES[block_type](collection_id=collection_id)

    data = block.to_db()

    # currently, adding to both blocks and blocks_obj to mantain compatibility with
    # the old site. The new site only uses blocks_obj
    if insert_index:
        display_order_update = {
            "$each": [block.block_id],
            "$position": insert_index,
        }
    else:
        display_order_update = block.block_id

    result = flask_mongo.db.collections.update_one(
        {"collection_id": collection_id, **get_default_permissions(user_only=True)},
        {
            "$push": {"blocks": data, "display_order": display_order_update},
            "$set": {f"blocks_obj.{block.block_id}": data},
        },
    )

    if result.modified_count < 1:
        return (
            jsonify(
                status="error",
                message=f"Update failed. {collection_id=} is probably incorrect.",
            ),
            400,
        )

    # get the new display_order:
    display_order_result = flask_mongo.db.collections.find_one(
        {"collection_id": collection_id, **get_default_permissions(user_only=True)},
        {"display_order": 1},
    )

    return jsonify(
        status="success",
        new_block_obj=block.to_web(),
        new_block_insert_index=insert_index
        if insert_index is None
        else len(display_order_result["display_order"]) - 1,
        new_display_order=display_order_result["display_order"],
    )

update_block()

Take in json block data from site, process, and spit out updated data. May be used, for example, when the user changes plot parameters and the server needs to generate a new plot.

Source code in pydatalab/routes/v0_1/blocks.py
@BLOCKS.route("/update-block/", methods=["POST"])
def update_block():
    """Take in json block data from site, process, and spit
    out updated data. May be used, for example, when the user
    changes plot parameters and the server needs to generate a new
    plot.
    """

    request_json = request.get_json()
    block_data = request_json["block_data"]
    blocktype = block_data["blocktype"]
    save_to_db = request_json.get("save_to_db", False)

    block = BLOCK_TYPES[blocktype].from_web(block_data)

    saved_successfully = False
    if save_to_db:
        saved_successfully = _save_block_to_db(block)

    return (
        jsonify(
            status="success", saved_successfully=saved_successfully, new_block_data=block.to_web()
        ),
        200,
    )

delete_block()

Completely delete a data block from the database. In the future, we may consider preserving data by moving it to a different array, or simply making it invisible

Source code in pydatalab/routes/v0_1/blocks.py
@BLOCKS.route("/delete-block/", methods=["POST"])
def delete_block():
    """Completely delete a data block from the database. In the future,
    we may consider preserving data by moving it to a different array,
    or simply making it invisible"""
    request_json = request.get_json()
    item_id = request_json["item_id"]
    block_id = request_json["block_id"]

    result = flask_mongo.db.items.update_one(
        {"item_id": item_id, **get_default_permissions(user_only=True)},
        {
            "$pull": {
                "blocks": {"block_id": block_id},
                "display_order": block_id,
            },
            "$unset": {f"blocks_obj.{block_id}": ""},
        },
    )

    if result.modified_count < 1:
        return (
            jsonify(
                {
                    "status": "error",
                    "message": f"Update failed. The item_id probably incorrect: {item_id}",
                }
            ),
            400,
        )
    return (
        jsonify({"status": "success"}),
        200,
    )  # could try to switch to http 204 is "No Content" success with no json

delete_collection_block()

Completely delete a data block from the database that is currently attached to a collection.

In the future, we may consider preserving data by moving it to a different array, or simply making it invisible

Source code in pydatalab/routes/v0_1/blocks.py
@BLOCKS.route("/delete-collection-block/", methods=["POST"])
def delete_collection_block():
    """Completely delete a data block from the database that is currently
    attached to a collection.

    In the future, we may consider preserving data by moving it to a different array,
    or simply making it invisible"""
    request_json = request.get_json()
    collection_id = request_json["collection_id"]
    block_id = request_json["block_id"]

    result = flask_mongo.db.collections.update_one(
        {"collection_id": collection_id, **get_default_permissions(user_only=True)},
        {
            "$pull": {
                "blocks": {"block_id": block_id},
                "display_order": block_id,
            },
            "$unset": {f"blocks_obj.{block_id}": ""},
        },
    )

    if result.modified_count < 1:
        return (
            jsonify(
                {
                    "status": "error",
                    "message": f"Update failed. The collection_id probably incorrect: {collection_id}",
                }
            ),
            400,
        )
    return (
        jsonify({"status": "success"}),
        200,
    )

collections

COLLECTIONS

_()

Source code in pydatalab/routes/v0_1/collections.py
@COLLECTIONS.before_request
@active_users_or_get_only
def _(): ...

get_collections()

Source code in pydatalab/routes/v0_1/collections.py
@COLLECTIONS.route("/collections")
def get_collections():
    collections = flask_mongo.db.collections.aggregate(
        [
            {"$match": get_default_permissions(user_only=True)},
            {"$lookup": creators_lookup()},
            {"$project": {"_id": 0}},
            {"$sort": {"_id": -1}},
        ]
    )

    return jsonify({"status": "success", "data": list(collections)})

get_collection(collection_id)

Source code in pydatalab/routes/v0_1/collections.py
@COLLECTIONS.route("/collections/<collection_id>", methods=["GET"])
def get_collection(collection_id):
    cursor = flask_mongo.db.collections.aggregate(
        [
            {
                "$match": {
                    "collection_id": collection_id,
                    **get_default_permissions(user_only=True),
                }
            },
            {"$lookup": creators_lookup()},
            {"$sort": {"_id": -1}},
        ]
    )

    try:
        doc = list(cursor)[0]
    except IndexError:
        doc = None

    if not doc or (not current_user.is_authenticated and not CONFIG.TESTING):
        return (
            jsonify(
                {
                    "status": "error",
                    "message": f"No matching collection {collection_id=} with current authorization.",
                }
            ),
            404,
        )

    collection = Collection(**doc)

    samples = list(
        get_samples_summary(
            match={
                "relationships.type": "collections",
                "relationships.immutable_id": collection.immutable_id,
            },
            project={"collections": 0},
        )
    )

    collection.num_items = len(samples)

    return jsonify(
        {
            "status": "success",
            "collection_id": collection_id,
            "data": json.loads(collection.json(exclude_unset=True)),
            "child_items": list(samples),
        }
    )

create_collection()

Source code in pydatalab/routes/v0_1/collections.py
@COLLECTIONS.route("/collections", methods=["PUT"])
def create_collection():
    request_json = request.get_json()  # noqa: F821 pylint: disable=undefined-variable
    data = request_json.get("data", {})
    copy_from_id = request_json.get("copy_from_collection_id", None)
    starting_members = data.get("starting_members", [])

    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (
            dict(
                status="error",
                message="Unable to create new collection without user authentication.",
                collection_id=data.get("collection_id"),
            ),
            401,
        )

    if copy_from_id:
        raise NotImplementedError("Copying collections is not yet implemented.")

    if CONFIG.TESTING:
        data["creator_ids"] = [24 * "0"]
        data["creators"] = [{"display_name": "Public testing user"}]
    else:
        data["creator_ids"] = [current_user.person.immutable_id]
        data["creators"] = [
            {
                "display_name": current_user.person.display_name,
                "contact_email": current_user.person.contact_email,
            }
        ]

    # check to make sure that item_id isn't taken already
    if flask_mongo.db.collections.find_one({"collection_id": data["collection_id"]}):
        return (
            dict(
                status="error",
                message=f"collection_id_validation_error: {data['collection_id']!r} already exists in database.",
                collection_id=data["collection_id"],
            ),
            409,  # 409: Conflict
        )

    data["last_modified"] = data.get(
        "last_modified", datetime.datetime.now(datetime.timezone.utc).isoformat()
    )

    try:
        data_model = Collection(**data)

    except ValidationError as error:
        return (
            dict(
                status="error",
                message=f"Unable to create new collection with ID {data['collection_id']}.",
                item_id=data["collection_id"],
                output=str(error),
            ),
            400,
        )

    result: InsertOneResult = flask_mongo.db.collections.insert_one(
        data_model.dict(exclude={"creators"})
    )
    if not result.acknowledged:
        return (
            dict(
                status="error",
                message=f"Failed to add new collection {data['collection_id']!r} to database.",
                collection_id=data["collection_id"],
                output=result.raw_result,
            ),
            400,
        )

    data_model.immutable_id = result.inserted_id

    errors = []
    if starting_members:
        item_ids = {d.get("item_id") for d in starting_members}
        if None in item_ids:
            item_ids.remove(None)

        results: UpdateResult = flask_mongo.db.items.update_many(
            {
                "item_id": {"$in": list(item_ids)},
                **get_default_permissions(user_only=True),
            },
            {
                "$push": {
                    "relationships": {
                        "type": "collections",
                        "immutable_id": data_model.immutable_id,
                    }
                }
            },
        )

        data_model.num_items = results.modified_count

        if results.modified_count < len(starting_members):
            errors = [
                item_id
                for item_id in starting_members
                if item_id not in results.raw_result.get("upserted", [])
            ]

    else:
        data_model.num_items = 0

    response = {
        "status": "success",
        "data": json.loads(data_model.json()),
    }

    if errors:
        response["warnings"] = [
            f"Unable to register {errors} to new collection {data_model.collection_id}"
        ]

    return (
        jsonify(response),
        201,  # 201: Created
    )

save_collection(collection_id)

Source code in pydatalab/routes/v0_1/collections.py
@COLLECTIONS.route("/collections/<collection_id>", methods=["PATCH"])
@logged_route
def save_collection(collection_id):
    request_json = request.get_json()  # noqa: F821 pylint: disable=undefined-variable
    updated_data = request_json.get("data")

    if not updated_data:
        return (
            jsonify(
                status="error",
                message=f"Unable to find any data in request to update {collection_id=} with.",
            ),
            204,  # 204: No content
        )

    # These keys should not be updated here and cannot be modified by the user through this endpoint
    for k in ("_id", "file_ObjectIds", "creators", "creator_ids", "collection_id"):
        if k in updated_data:
            del updated_data[k]

    updated_data["last_modified"] = datetime.datetime.now(datetime.timezone.utc).isoformat()

    collection = flask_mongo.db.collections.find_one(
        {"collection_id": collection_id, **get_default_permissions(user_only=True)}
    )

    if not collection:
        return (
            jsonify(
                status="error",
                message=f"Unable to find item with appropriate permissions and {collection_id=}.",
            ),
            400,
        )

    collection.update(updated_data)

    try:
        collection = Collection(**collection).dict()
    except ValidationError as exc:
        return (
            jsonify(
                status="error",
                message=f"Unable to update item {collection_id=} with new data {updated_data}",
                output=str(exc),
            ),
            400,
        )

    result: UpdateResult = flask_mongo.db.collections.update_one(
        {"collection_id": collection_id},
        {"$set": collection},
    )

    if result.modified_count != 1:
        return (
            jsonify(
                status="error",
                message=f"Unable to update item {collection_id=} with new data {updated_data}",
                output=result.raw_result,
            ),
            400,
        )

    return jsonify(status="success"), 200

delete_collection(collection_id: str)

Source code in pydatalab/routes/v0_1/collections.py
@COLLECTIONS.route("/collections/<collection_id>", methods=["DELETE"])
def delete_collection(collection_id: str):
    with flask_mongo.cx.start_session() as session:
        with session.start_transaction():
            collection_immutable_id = flask_mongo.db.collections.find_one(
                {"collection_id": collection_id, **get_default_permissions(user_only=True)},
                projection={"_id": 1},
            )["_id"]
            result = flask_mongo.db.collections.delete_one(
                {
                    "collection_id": collection_id,
                    **get_default_permissions(user_only=True, deleting=True),
                }
            )
            if result.deleted_count != 1:
                return (
                    jsonify(
                        {
                            "status": "error",
                            "message": f"Authorization required to attempt to delete collection with {collection_id=} from the database.",
                        }
                    ),
                    401,
                )

            # If successful, remove collection from all matching items relationships
            flask_mongo.db.items.update_many(
                {
                    "relationships": {
                        "$elemMatch": {
                            "immutable_id": collection_immutable_id,
                            "type": "collections",
                        }
                    }
                },
                {
                    "$pull": {
                        "relationships": {
                            "immutable_id": collection_immutable_id,
                            "type": "collections",
                        }
                    }
                },
            )

    return (
        jsonify(
            {
                "status": "success",
            }
        ),
        200,
    )

search_collections()

Source code in pydatalab/routes/v0_1/collections.py
@COLLECTIONS.route("/search-collections", methods=["GET"])
def search_collections():
    query = request.args.get("query", type=str)
    nresults = request.args.get("nresults", default=100, type=int)

    match_obj = {"$text": {"$search": query}, **get_default_permissions(user_only=True)}

    cursor = [
        json.loads(Collection(**doc).json(exclude_unset=True))
        for doc in flask_mongo.db.collections.aggregate(
            [
                {"$match": match_obj},
                {"$sort": {"score": {"$meta": "textScore"}}},
                {"$limit": nresults},
                {
                    "$project": {
                        "collection_id": 1,
                        "title": 1,
                    }
                },
            ]
        )
    ]

    return jsonify({"status": "success", "data": list(cursor)}), 200

add_items_to_collection(collection_id)

Source code in pydatalab/routes/v0_1/collections.py
@COLLECTIONS.route("/collections/<collection_id>", methods=["POST"])
def add_items_to_collection(collection_id):
    data = request.get_json()
    refcodes = data.get("data", {}).get("refcodes", [])

    collection = flask_mongo.db.collections.find_one(
        {"collection_id": collection_id, **get_default_permissions()}
    )

    if not collection:
        return jsonify({"error": "Collection not found"}), 404

    if not refcodes:
        return jsonify({"error": "No item provided"}), 400

    item_count = flask_mongo.db.items.count_documents(
        {"refcode": {"$in": refcodes}, **get_default_permissions()}
    )

    if item_count == 0:
        return jsonify({"error": "No matching items found"}), 404

    update_result = flask_mongo.db.items.update_many(
        {"refcode": {"$in": refcodes}, **get_default_permissions()},
        {
            "$addToSet": {
                "relationships": {
                    "description": "Is a member of",
                    "relation": None,
                    "type": "collections",
                    "immutable_id": ObjectId(collection["_id"]),
                    "item_id": None,
                    "refcode": None,
                }
            }
        },
    )

    if update_result.matched_count == 0:
        return (jsonify({"status": "error", "message": "Unable to add to collection."}), 400)

    if update_result.modified_count == 0:
        return (
            jsonify(
                {
                    "status": "success",
                    "message": "No update was performed",
                }
            ),
            200,
        )

    return (jsonify({"status": "success"}), 200)

files

FILES

_()

Source code in pydatalab/routes/v0_1/files.py
@FILES.before_request
@active_users_or_get_only
def _(): ...

get_file(file_id: str, filename: str)

Source code in pydatalab/routes/v0_1/files.py
@FILES.route("/files/<string:file_id>/<string:filename>", methods=["GET"])
def get_file(file_id: str, filename: str):
    try:
        _file_id = ObjectId(file_id)
    except InvalidId:
        # If the ID is invalid, then there will be no results in the database anyway,
        # so just 401
        _file_id = file_id
    if not pydatalab.mongo.flask_mongo.db.files.find_one(
        {"_id": _file_id, **get_default_permissions(user_only=False)}
    ):
        return (
            jsonify(
                {
                    "status": "error",
                    "title": "Not Authorized",
                    "detail": "Authorization required to access file",
                }
            ),
            401,
        )
    path = os.path.join(CONFIG.FILE_DIRECTORY, secure_filename(file_id))
    return send_from_directory(path, filename)

upload()

Upload a file to the server and save it to the database.

The file is received via a multipart/form-data request. Each file is a binary octet stream. The file is saved to the server in the configured file directory, under a subdirectory named with the created database ID for the file.

Additional POST parameters are required: - item_id: the ID of the item to which the file is attached - replace_file: the database ID of the file to replace, if any

Source code in pydatalab/routes/v0_1/files.py
@FILES.route("/upload-file/", methods=["POST"])
def upload():
    """Upload a file to the server and save it to the database.

    The file is received via a `multipart/form-data` request. Each
    file is a binary octet stream. The file is saved to the server
    in the configured file directory, under a subdirectory named with
    the created database ID for the file.

    Additional POST parameters are required:
        - `item_id`: the ID of the item to which the file is attached
        - `replace_file`: the database ID of the file to replace, if any

    """

    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (
            jsonify(
                {
                    "status": "error",
                    "title": "Not Authorized",
                    "detail": "File upload requires login.",
                }
            ),
            401,
        )

    if len(request.files) == 0:
        return jsonify(error="No file in request"), 400
    if "item_id" not in request.form:
        return jsonify(error="No item id provided in form"), 400
    item_id = request.form["item_id"]
    replace_file_id = request.form["replace_file"]

    if not CONFIG.TESTING:
        creator_id = current_user.person.immutable_id
    else:
        creator_id = PUBLIC_USER_ID

    is_update = replace_file_id and replace_file_id != "null"
    file = request.files[next(iter(request.files))]
    if is_update:
        file_information = file_utils.update_uploaded_file(file, ObjectId(replace_file_id))
    else:
        file_information = file_utils.save_uploaded_file(
            file, item_ids=[item_id], creator_ids=[creator_id]
        )

    return (
        jsonify(
            {
                "status": "success",
                "file_id": str(file_information["_id"]),
                "file_information": file_information,
                "is_update": is_update,  # true if update, false if new file
            }
        ),
        201,
    )

add_remote_file_to_sample()

Source code in pydatalab/routes/v0_1/files.py
@FILES.route("/add-remote-file-to-sample/", methods=["POST"])
def add_remote_file_to_sample():
    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (
            jsonify(
                {
                    "status": "error",
                    "title": "Not Authorized",
                    "detail": "Adding a file to a sample requires login.",
                }
            ),
            401,
        )

    request_json = request.get_json()
    item_id = request_json["item_id"]
    file_entry = request_json["file_entry"]

    if not CONFIG.TESTING:
        creator_id = current_user.person.immutable_id
    else:
        creator_id = ObjectId(24 * "0")

    updated_file_entry = file_utils.add_file_from_remote_directory(
        file_entry, item_id, creator_ids=[creator_id]
    )

    return (
        jsonify(
            {
                "status": "success",
                "file_id": str(updated_file_entry["_id"]),
                "file_information": updated_file_entry,
            }
        ),
        201,
    )

delete_file_from_sample()

Remove a file from a sample, but don't delete the actual file (for now)

Source code in pydatalab/routes/v0_1/files.py
@FILES.route("/delete-file-from-sample/", methods=["POST"])
def delete_file_from_sample():
    """Remove a file from a sample, but don't delete the actual file (for now)"""

    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (
            jsonify(
                {
                    "status": "error",
                    "title": "Not Authorized",
                    "detail": "Adding a file to a sample requires login.",
                }
            ),
            401,
        )

    request_json = request.get_json()

    item_id = request_json["item_id"]
    file_id = ObjectId(request_json["file_id"])
    result = pydatalab.mongo.flask_mongo.db.items.update_one(
        {"item_id": item_id, **get_default_permissions(user_only=True)},
        {"$pull": {"file_ObjectIds": file_id}},
    )
    if result.modified_count != 1:
        return (
            jsonify(
                status="error",
                message=f"Not authorized to perform file removal from sample {item_id=}",
                output=result.raw_result,
            ),
            401,
        )
    updated_file_entry = pydatalab.mongo.flask_mongo.db.files.find_one_and_update(
        {"_id": file_id},
        {"$pull": {"item_ids": item_id}},
        return_document=ReturnDocument.AFTER,
    )

    if not updated_file_entry:
        return (
            jsonify(
                status="error",
                message=f"{item_id} {file_id} delete failed. Something went wrong with the db call to remove sample from file",
            ),
            400,
        )

    return (
        jsonify(
            {
                "status": "success",
                "new_file_obj": {request_json["file_id"]: updated_file_entry},
            }
        ),
        200,
    )

delete_file()

delete a data file from the uploads/item_id folder

Source code in pydatalab/routes/v0_1/files.py
@FILES.route("/delete-file/", methods=["POST"])
def delete_file():
    """delete a data file from the uploads/item_id folder"""

    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (
            jsonify(
                {
                    "status": "error",
                    "title": "Not Authorized",
                    "detail": "Adding a file to a sample requires login.",
                }
            ),
            401,
        )

    request_json = request.get_json()

    item_id = request_json["item_id"]
    filename = request_json["filename"]

    secure_item_id = secure_filename(item_id)
    secure_fname = secure_filename(filename)

    path = os.path.join(CONFIG.FILE_DIRECTORY, secure_item_id, secure_fname)

    if not os.path.isfile(path):
        return (
            jsonify(
                status="error",
                message="Delete failed. file not found: {}".format(path),
            ),
            400,
        )

    result = pydatalab.mongo.flask_mongo.db.items.update_one(
        {"item_id": item_id, **get_default_permissions(user_only=True)},
        {"$pull": {"files": filename}},
        return_document=ReturnDocument.AFTER,
    )
    if result.matched_count != 1:
        return (
            jsonify(
                status="error",
                message=f"{item_id} {filename} delete failed. Something went wrong with the db call. File not deleted.",
                output=result.raw_result,
            ),
            400,
        )
    os.remove(path)

    return jsonify({"status": "success"}), 200

graphs

GRAPHS

get_graph_cy_format(item_id: Optional[str] = None, collection_id: Optional[str] = None)

Source code in pydatalab/routes/v0_1/graphs.py
@GRAPHS.route("/item-graph", methods=["GET"])
@GRAPHS.route("/item-graph/<item_id>", methods=["GET"])
def get_graph_cy_format(item_id: Optional[str] = None, collection_id: Optional[str] = None):
    collection_id = request.args.get("collection_id", type=str)

    if item_id is None:
        if collection_id is not None:
            collection_immutable_id = flask_mongo.db.collections.find_one(
                {"collection_id": collection_id}, projection={"_id": 1}
            )
            if not collection_immutable_id:
                raise RuntimeError("No collection {collection_id=} found.")
            collection_immutable_id = collection_immutable_id["_id"]
            query = {
                "$and": [
                    {"relationships.immutable_id": collection_immutable_id},
                    {"relationships.type": "collections"},
                ]
            }
        else:
            query = {}
        all_documents = flask_mongo.db.items.find(
            {**query, **get_default_permissions(user_only=False)},
            projection={"item_id": 1, "name": 1, "type": 1, "relationships": 1},
        )
        node_ids: Set[str] = {document["item_id"] for document in all_documents}
        all_documents.rewind()

    else:
        all_documents = list(
            flask_mongo.db.items.find(
                {
                    "$or": [{"item_id": item_id}, {"relationships.item_id": item_id}],
                    **get_default_permissions(user_only=False),
                },
                projection={"item_id": 1, "name": 1, "type": 1, "relationships": 1},
            )
        )

        node_ids = {document["item_id"] for document in all_documents} | {
            relationship.get("item_id")
            for document in all_documents
            for relationship in document.get("relationships", [])
        }
        if len(node_ids) > 1:
            or_query = [{"item_id": id} for id in node_ids if id != item_id]
            next_shell = flask_mongo.db.items.find(
                {
                    "$or": or_query,
                    **get_default_permissions(user_only=False),
                },
                projection={"item_id": 1, "name": 1, "type": 1, "relationships": 1},
            )

            all_documents.extend(next_shell)
            node_ids = node_ids | {document["item_id"] for document in all_documents}

    nodes = []
    edges = []

    # Collect the elements that have already been added to the graph, to avoid duplication
    drawn_elements = set()
    node_collections = set()
    for document in all_documents:
        # for some reason, document["relationships"] is sometimes equal to None, so we
        # need this `or` statement.
        for relationship in document.get("relationships") or []:
            # only considering child-parent relationships
            if relationship.get("type") == "collections" and not collection_id:
                collection_data = flask_mongo.db.collections.find_one(
                    {
                        "_id": relationship["immutable_id"],
                        **get_default_permissions(user_only=False),
                    },
                    projection={"collection_id": 1, "title": 1, "type": 1},
                )
                if collection_data:
                    if relationship["immutable_id"] not in node_collections:
                        _id = f'Collection: {collection_data["collection_id"]}'
                        if _id not in drawn_elements:
                            nodes.append(
                                {
                                    "data": {
                                        "id": _id,
                                        "name": collection_data["title"],
                                        "type": collection_data["type"],
                                        "shape": "triangle",
                                    }
                                }
                            )
                            node_collections.add(relationship["immutable_id"])
                            drawn_elements.add(_id)

                    source = f'Collection: {collection_data["collection_id"]}'
                    target = document.get("item_id")
                    edges.append(
                        {
                            "data": {
                                "id": f"{source}->{target}",
                                "source": source,
                                "target": target,
                                "value": 1,
                            }
                        }
                    )
                continue

        for relationship in document.get("relationships") or []:
            # only considering child-parent relationships:
            if relationship.get("relation") not in ("parent", "is_part_of"):
                continue

            target = document["item_id"]
            source = relationship["item_id"]
            if source not in node_ids:
                continue
            edge_id = f"{source}->{target}"
            if edge_id not in drawn_elements:
                drawn_elements.add(edge_id)
                edges.append(
                    {
                        "data": {
                            "id": edge_id,
                            "source": source,
                            "target": target,
                            "value": 1,
                        }
                    }
                )

        if document["item_id"] not in drawn_elements:
            drawn_elements.add(document["item_id"])
            nodes.append(
                {
                    "data": {
                        "id": document["item_id"],
                        "name": document["name"],
                        "type": document["type"],
                        "special": document["item_id"] == item_id,
                    }
                }
            )

    # We want to filter out all the starting materials that don't have relationships since there are so many of them:
    whitelist = {edge["data"]["source"] for edge in edges}

    nodes = [
        node
        for node in nodes
        if node["data"]["type"] in ("samples", "cells") or node["data"]["id"] in whitelist
    ]

    return (jsonify(status="success", nodes=nodes, edges=edges), 200)

healthcheck

HEALTHCHECK

is_ready()

Source code in pydatalab/routes/v0_1/healthcheck.py
@HEALTHCHECK.route("/healthcheck/is_ready", methods=["GET"])
def is_ready():
    from pydatalab.mongo import check_mongo_connection

    try:
        check_mongo_connection()
    except RuntimeError:
        return (
            jsonify(status="error", message="Unable to connect to MongoDB at specified URI."),
            500,
        )
    return (jsonify(status="success", message="Server and database are ready"), 200)

is_alive()

Source code in pydatalab/routes/v0_1/healthcheck.py
@HEALTHCHECK.route("/healthcheck/is_alive", methods=["GET"])
def is_alive():
    return (jsonify(status="success", message="Server is alive"), 200)

info

This submodule defines introspective info endpoints of the API.

INFO

SCHEMAS

Attributes (BaseModel) pydantic-model

Source code in pydatalab/routes/v0_1/info.py
class Attributes(BaseModel):
    class Config:
        extra = "allow"

Meta (BaseModel) pydantic-model

Source code in pydatalab/routes/v0_1/info.py
class Meta(BaseModel):
    timestamp: datetime = Field(default_factory=datetime.now)
    query: str = ""
    api_version: str = __api_version__
    available_api_versions: List[str] = [__api_version__]
    server_version: str = __version__
    datamodel_version: str = __version__
timestamp: datetime pydantic-field
query: str pydantic-field
api_version: str pydantic-field
available_api_versions: List[str] pydantic-field
server_version: str pydantic-field
datamodel_version: str pydantic-field
Source code in pydatalab/routes/v0_1/info.py
class Links(BaseModel):
    self: AnyUrl

    class Config:
        extra = "allow"
self: AnyUrl pydantic-field required

Data (BaseModel) pydantic-model

Source code in pydatalab/routes/v0_1/info.py
class Data(BaseModel):
    id: str
    type: str
    attributes: Attributes
id: str pydantic-field required
type: str pydantic-field required
attributes: Attributes pydantic-field required

JSONAPIResponse (BaseModel) pydantic-model

Source code in pydatalab/routes/v0_1/info.py
class JSONAPIResponse(BaseModel):
    data: Union[Data, List[Data]]
    meta: Meta
    links: Optional[Links]
data: Union[pydatalab.routes.v0_1.info.Data, List[pydatalab.routes.v0_1.info.Data]] pydantic-field required
meta: Meta pydantic-field required

MetaPerson (BaseModel) pydantic-model

Source code in pydatalab/routes/v0_1/info.py
class MetaPerson(BaseModel):
    dislay_name: Optional[str]
    contact_email: str
dislay_name: str pydantic-field
contact_email: str pydantic-field required

Info (Attributes, Meta) pydantic-model

Source code in pydatalab/routes/v0_1/info.py
class Info(Attributes, Meta):
    maintainer: Optional[MetaPerson]
    issue_tracker: Optional[AnyUrl]
    homepage: Optional[AnyUrl]
    source_repository: Optional[AnyUrl]
    identifier_prefix: str
    features: FeatureFlags = FEATURE_FLAGS

    @validator("maintainer")
    def strip_maintainer_fields(cls, v):
        if isinstance(v, Person):
            return MetaPerson(contact_email=v.contact_email, display_name=v.display_name)
        return v
maintainer: MetaPerson pydantic-field
issue_tracker: AnyUrl pydantic-field
homepage: AnyUrl pydantic-field
source_repository: AnyUrl pydantic-field
identifier_prefix: str pydantic-field required
features: FeatureFlags pydantic-field
strip_maintainer_fields(v) classmethod
Source code in pydatalab/routes/v0_1/info.py
@validator("maintainer")
def strip_maintainer_fields(cls, v):
    if isinstance(v, Person):
        return MetaPerson(contact_email=v.contact_email, display_name=v.display_name)
    return v

get_info()

Returns the runtime metadata for the deployment, e.g., versions, features and so on.

Source code in pydatalab/routes/v0_1/info.py
@INFO.route("/info", methods=["GET"])
def get_info():
    """Returns the runtime metadata for the deployment, e.g.,
    versions, features and so on.

    """
    metadata = _get_deployment_metadata_once()

    return (
        jsonify(
            json.loads(
                JSONAPIResponse(
                    data=Data(id="/", type="info", attributes=Info(**metadata)),
                    meta=Meta(query=request.query_string),
                    links=Links(self=request.url),
                ).json()
            )
        ),
        200,
    )

get_stats()

Returns a dictionary of counts of each entry type in the deployment

Source code in pydatalab/routes/v0_1/info.py
@INFO.route("/info/stats", methods=["GET"])
def get_stats():
    """Returns a dictionary of counts of each entry type in the deployment"""

    user_count = flask_mongo.db.users.count_documents({})
    sample_count = flask_mongo.db.items.count_documents({"type": "samples"})
    cell_count = flask_mongo.db.items.count_documents({"type": "cells"})

    return (
        jsonify({"counts": {"users": user_count, "samples": sample_count, "cells": cell_count}}),
        200,
    )

list_block_types()

Returns a list of all blocks implemented in this server.

Source code in pydatalab/routes/v0_1/info.py
@INFO.route("/info/blocks", methods=["GET"])
def list_block_types():
    """Returns a list of all blocks implemented in this server."""
    return jsonify(
        json.loads(
            JSONAPIResponse(
                data=[
                    Data(
                        id=block_type,
                        type="block_type",
                        attributes={
                            "name": getattr(block, "name", ""),
                            "description": getattr(block, "description", ""),
                            "version": getattr(block, "version", __version__),
                            "accepted_file_extensions": getattr(
                                block, "accepted_file_extensions", []
                            ),
                        },
                    )
                    for block_type, block in BLOCK_TYPES.items()
                ],
                meta=Meta(query=request.query_string),
            ).json()
        )
    )

get_all_items_models()

Source code in pydatalab/routes/v0_1/info.py
def get_all_items_models():
    return Item.__subclasses__()

generate_schemas()

Source code in pydatalab/routes/v0_1/info.py
def generate_schemas():
    schemas: dict[str, dict] = {}

    for model_class in get_all_items_models() + [Collection]:
        model_type = model_class.schema()["properties"]["type"]["default"]

        schemas[model_type] = model_class.schema(by_alias=False)

    return schemas

list_supported_types()

Returns a list of supported schemas.

Source code in pydatalab/routes/v0_1/info.py
@INFO.route("/info/types", methods=["GET"])
def list_supported_types():
    """Returns a list of supported schemas."""

    return jsonify(
        json.loads(
            JSONAPIResponse(
                data=[
                    Data(
                        id=item_type,
                        type="item_type",
                        attributes={
                            "version": __version__,
                            "api_version": __api_version__,
                            "schema": schema,
                        },
                    )
                    for item_type, schema in SCHEMAS.items()
                ],
                meta=Meta(query=request.query_string),
            ).json()
        )
    )

get_schema_type(item_type)

Returns the schema of the given type.

Source code in pydatalab/routes/v0_1/info.py
@INFO.route("/info/types/<string:item_type>", methods=["GET"])
def get_schema_type(item_type):
    """Returns the schema of the given type."""
    if item_type not in SCHEMAS:
        return jsonify(
            {"status": "error", "detail": f"Item type {item_type} not found for this deployment"}
        ), 404

    return jsonify(
        json.loads(
            JSONAPIResponse(
                data=Data(
                    id=item_type,
                    type="item_type",
                    attributes={
                        "version": __version__,
                        "api_version": __api_version__,
                        "schema": SCHEMAS[item_type],
                    },
                ),
                meta=Meta(query=request.query_string),
            ).json()
        )
    )

items

ITEMS

_()

Source code in pydatalab/routes/v0_1/items.py
@ITEMS.before_request
@active_users_or_get_only
def _(): ...

reserialize_blocks(display_order: List[str], blocks_obj: Dict[str, Dict]) -> Dict[str, Dict]

Create the corresponding Python objects from JSON block data, then serialize it again as JSON to populate any missing properties.

Parameters:

Name Type Description Default
blocks_obj Dict[str, Dict]

A dictionary containing the JSON block data, keyed by block ID.

required

Returns:

Type Description
Dict[str, Dict]

A dictionary with the re-serialized block data.

Source code in pydatalab/routes/v0_1/items.py
def reserialize_blocks(display_order: List[str], blocks_obj: Dict[str, Dict]) -> Dict[str, Dict]:
    """Create the corresponding Python objects from JSON block data, then
    serialize it again as JSON to populate any missing properties.

    Parameters:
        blocks_obj: A dictionary containing the JSON block data, keyed by block ID.

    Returns:
        A dictionary with the re-serialized block data.

    """
    for block_id in display_order:
        try:
            block_data = blocks_obj[block_id]
        except KeyError:
            LOGGER.warning(f"block_id {block_id} found in display order but not in blocks_obj")
            continue
        blocktype = block_data["blocktype"]
        blocks_obj[block_id] = (
            BLOCK_TYPES.get(blocktype, BLOCK_TYPES["notsupported"]).from_db(block_data).to_web()
        )

    return blocks_obj

dereference_files(file_ids: List[Union[str, bson.objectid.ObjectId]]) -> Dict[str, Dict]

For a list of Object IDs (as strings or otherwise), query the files collection and return a dictionary of the data stored under each ID.

Parameters:

Name Type Description Default
file_ids List[Union[str, bson.objectid.ObjectId]]

The list of IDs of files to return;

required

Returns:

Type Description
Dict[str, Dict]

The dereferenced data as a dictionary with (string) ID keys.

Source code in pydatalab/routes/v0_1/items.py
def dereference_files(file_ids: List[Union[str, ObjectId]]) -> Dict[str, Dict]:
    """For a list of Object IDs (as strings or otherwise), query the files collection
    and return a dictionary of the data stored under each ID.

    Parameters:
        file_ids: The list of IDs of files to return;

    Returns:
        The dereferenced data as a dictionary with (string) ID keys.

    """
    results = {
        str(f["_id"]): f
        for f in flask_mongo.db.files.find(
            {
                "_id": {"$in": [ObjectId(_id) for _id in file_ids]},
            }
        )
    }
    if len(results) != len(file_ids):
        raise RuntimeError(
            "Some file IDs did not have corresponding database entries.\n"
            f"Returned: {list(results.keys())}\n"
            f"Requested: {file_ids}\n"
        )

    return results

get_equipment_summary()

Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/equipment/", methods=["GET"])
def get_equipment_summary():
    _project = {
        "_id": 0,
        "item_id": 1,
        "name": 1,
        "type": 1,
        "date": 1,
        "refcode": 1,
        "location": 1,
    }

    items = [
        doc
        for doc in flask_mongo.db.items.aggregate(
            [
                {
                    "$match": {
                        "type": "equipment",
                    }
                },
                {"$project": _project},
            ]
        )
    ]
    return jsonify({"status": "success", "items": items})

get_starting_materials()

Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/starting-materials/", methods=["GET"])
def get_starting_materials():
    items = [
        doc
        for doc in flask_mongo.db.items.aggregate(
            [
                {
                    "$match": {
                        "type": "starting_materials",
                        **get_default_permissions(user_only=False),
                    }
                },
                {
                    "$project": {
                        "_id": 0,
                        "item_id": 1,
                        "nblocks": {"$size": "$display_order"},
                        "date": 1,
                        "chemform": 1,
                        "name": 1,
                        "type": 1,
                        "chemical_purity": 1,
                        "supplier": 1,
                        "location": 1,
                    }
                },
            ]
        )
    ]
    return jsonify({"status": "success", "items": items})

get_samples_summary(match: Optional[Dict] = None, project: Optional[Dict] = None) -> CommandCursor

Return a summary of item entries that match some criteria.

Parameters:

Name Type Description Default
match Optional[Dict]

A MongoDB aggregation match query to filter the results.

None
project Optional[Dict]

A MongoDB aggregation project query to filter the results, relative to the default included below.

None
Source code in pydatalab/routes/v0_1/items.py
def get_samples_summary(
    match: Optional[Dict] = None, project: Optional[Dict] = None
) -> CommandCursor:
    """Return a summary of item entries that match some criteria.

    Parameters:
        match: A MongoDB aggregation match query to filter the results.
        project: A MongoDB aggregation project query to filter the results, relative
            to the default included below.

    """
    if not match:
        match = {}
    match.update(get_default_permissions(user_only=False))
    match["type"] = {"$in": ["samples", "cells"]}

    _project = {
        "_id": 0,
        "blocks": {"blocktype": 1, "title": 1},
        "creators": {
            "display_name": 1,
            "contact_email": 1,
        },
        "collections": {
            "collection_id": 1,
            "title": 1,
        },
        "item_id": 1,
        "name": 1,
        "chemform": 1,
        "nblocks": {"$size": "$display_order"},
        "characteristic_chemical_formula": 1,
        "type": 1,
        "date": 1,
        "refcode": 1,
    }

    # Cannot mix 0 and 1 keys in MongoDB project so must loop and check
    if project:
        for key in project:
            if project[key] == 0:
                _project.pop(key, None)
            else:
                _project[key] = 1

    return flask_mongo.db.items.aggregate(
        [
            {"$match": match},
            {"$lookup": creators_lookup()},
            {"$lookup": collections_lookup()},
            {"$project": _project},
            {"$sort": {"date": -1}},
        ]
    )

creators_lookup() -> Dict

Source code in pydatalab/routes/v0_1/items.py
def creators_lookup() -> Dict:
    return {
        "from": "users",
        "let": {"creator_ids": "$creator_ids"},
        "pipeline": [
            {"$match": {"$expr": {"$in": ["$_id", {"$ifNull": ["$$creator_ids", []]}]}}},
            {"$addFields": {"__order": {"$indexOfArray": ["$$creator_ids", "$_id"]}}},
            {"$sort": {"__order": 1}},
            {"$project": {"_id": 1, "display_name": 1, "contact_email": 1}},
        ],
        "as": "creators",
    }

files_lookup() -> Dict

Source code in pydatalab/routes/v0_1/items.py
def files_lookup() -> Dict:
    return {
        "from": "files",
        "localField": "file_ObjectIds",
        "foreignField": "_id",
        "as": "files",
    }

collections_lookup() -> Dict

Looks inside the relationships of the item, searches for IDs in the collections table and then projects only the collection ID and name for the response.

Source code in pydatalab/routes/v0_1/items.py
def collections_lookup() -> Dict:
    """Looks inside the relationships of the item, searches for IDs in the collections
    table and then projects only the collection ID and name for the response.

    """

    return {
        "from": "collections",
        "let": {"collection_ids": "$relationships.immutable_id"},
        "pipeline": [
            {
                "$match": {
                    "$expr": {
                        "$in": ["$_id", {"$ifNull": ["$$collection_ids", []]}],
                    },
                    "type": "collections",
                }
            },
            {"$project": {"_id": 1, "collection_id": 1}},
        ],
        "as": "collections",
    }

get_samples()

Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/samples/", methods=["GET"])
def get_samples():
    return jsonify({"status": "success", "samples": list(get_samples_summary())})

search_items()

Perform free text search on items and return the top results. GET parameters: query: String with the search terms. nresults: Maximum number of (default 100) !!! types "If None, search all types of items. Otherwise, a list of strings" giving the types to consider. (e.g. ["samples","starting_materials"])

Returns:

Type Description

response list of dictionaries containing the matching items in order of descending match score.

Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/search-items/", methods=["GET"])
def search_items():
    """Perform free text search on items and return the top results.
    GET parameters:
        query: String with the search terms.
        nresults: Maximum number of  (default 100)
        types: If None, search all types of items. Otherwise, a list of strings
               giving the types to consider. (e.g. ["samples","starting_materials"])

    Returns:
        response list of dictionaries containing the matching items in order of
        descending match score.
    """

    query = request.args.get("query", type=str)
    nresults = request.args.get("nresults", default=100, type=int)
    types = request.args.get("types", default=None)
    if isinstance(types, str):
        # should figure out how to parse as list automatically
        types = types.split(",")

    match_obj = {
        "$text": {"$search": query},
        **get_default_permissions(user_only=False),
    }
    if types is not None:
        match_obj["type"] = {"$in": types}

    cursor = flask_mongo.db.items.aggregate(
        [
            {"$match": match_obj},
            {"$sort": {"score": {"$meta": "textScore"}}},
            {"$limit": nresults},
            {
                "$project": {
                    "_id": 0,
                    "type": 1,
                    "item_id": 1,
                    "name": 1,
                    "chemform": 1,
                    "refcode": 1,
                }
            },
        ]
    )

    return jsonify({"status": "success", "items": list(cursor)}), 200

create_sample()

Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/new-sample/", methods=["POST"])
def create_sample():
    request_json = request.get_json()  # noqa: F821 pylint: disable=undefined-variable
    if "new_sample_data" in request_json:
        response, http_code = _create_sample(
            sample_dict=request_json["new_sample_data"],
            copy_from_item_id=request_json.get("copy_from_item_id"),
            generate_id_automatically=request_json.get("generate_id_automatically", False),
        )
    else:
        response, http_code = _create_sample(request_json)

    return jsonify(response), http_code

create_samples()

attempt to create multiple samples at once. Because each may result in success or failure, 207 is returned along with a json field containing all the individual http_codes

Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/new-samples/", methods=["POST"])
def create_samples():
    """attempt to create multiple samples at once.
    Because each may result in success or failure, 207 is returned along with a
    json field containing all the individual http_codes"""

    request_json = request.get_json()  # noqa: F821 pylint: disable=undefined-variable

    sample_jsons = request_json["new_sample_datas"]
    copy_from_item_ids = request_json.get("copy_from_item_ids")
    generate_ids_automatically = request_json.get("generate_ids_automatically")

    if copy_from_item_ids is None:
        copy_from_item_ids = [None] * len(sample_jsons)

    outputs = [
        _create_sample(
            sample_dict=sample_json,
            copy_from_item_id=copy_from_item_id,
            generate_id_automatically=generate_ids_automatically,
        )
        for sample_json, copy_from_item_id in zip(sample_jsons, copy_from_item_ids)
    ]
    responses, http_codes = zip(*outputs)

    statuses = [response["status"] for response in responses]
    nsuccess = statuses.count("success")
    nerror = statuses.count("error")

    return (
        jsonify(
            nsuccess=nsuccess,
            nerror=nerror,
            responses=responses,
            http_codes=http_codes,
        ),
        207,
    )  # 207: multi-status

update_item_permissions(refcode: str)

Update the permissions of an item with the given refcode.

Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/items/<refcode>/permissions", methods=["PATCH"])
def update_item_permissions(refcode: str):
    """Update the permissions of an item with the given refcode."""

    request_json = request.get_json()
    creator_ids: list[ObjectId] = []

    if not len(refcode.split(":")) == 2:
        refcode = f"{CONFIG.IDENTIFIER_PREFIX}:{refcode}"

    current_item = flask_mongo.db.items.find_one(
        {"refcode": refcode, **get_default_permissions(user_only=True)},
        {"_id": 1, "creator_ids": 1},
    )  # type: ignore

    if not current_item:
        return (
            jsonify(
                {
                    "status": "error",
                    "message": f"No valid item found with the given {refcode=}.",
                }
            ),
            401,
        )

    current_creator_ids = current_item["creator_ids"]

    if "creators" in request_json:
        creator_ids = [
            ObjectId(creator.get("immutable_id", None))
            for creator in request_json["creators"]
            if creator.get("immutable_id", None) is not None
        ]

    if not creator_ids:
        return (
            jsonify(
                {
                    "status": "error",
                    "message": "No valid creator IDs found in the request.",
                }
            ),
            400,
        )

    # Validate all creator IDs are present in the database
    found_ids = [d for d in flask_mongo.db.users.find({"_id": {"$in": creator_ids}}, {"_id": 1})]  # type: ignore
    if not len(found_ids) == len(creator_ids):
        return (
            jsonify(
                {
                    "status": "error",
                    "message": "One or more creator IDs not found in the database.",
                }
            ),
            400,
        )

    # Make sure a user cannot remove their own access to an item
    current_user_id = current_user.person.immutable_id
    try:
        creator_ids.remove(current_user_id)
    except ValueError:
        pass
    creator_ids.insert(0, current_user_id)

    # The first ID in the creator list takes precedence; always make sure this is included to avoid orphaned items
    if current_creator_ids:
        base_owner = current_creator_ids[0]
        try:
            creator_ids.remove(base_owner)
        except ValueError:
            pass
        creator_ids.insert(0, base_owner)

    if set(creator_ids) == set(current_creator_ids):
        # Short circuit if the creator IDs are the same
        return jsonify({"status": "success"}), 200

    LOGGER.warning("Setting permissions for item %s to %s", refcode, creator_ids)
    result = flask_mongo.db.items.update_one(
        {"refcode": refcode, **get_default_permissions(user_only=True)},
        {"$set": {"creator_ids": creator_ids}},
    )

    if not result.modified_count == 1:
        return jsonify(
            {
                "status": "error",
                "message": "Failed to update permissions: you cannot remove yourself or the base owner as a creator.",
            }
        ), 400

    return jsonify({"status": "success"}), 200

delete_sample()

Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/delete-sample/", methods=["POST"])
def delete_sample():
    request_json = request.get_json()  # noqa: F821 pylint: disable=undefined-variable
    item_id = request_json["item_id"]

    result = flask_mongo.db.items.delete_one(
        {"item_id": item_id, **get_default_permissions(user_only=True, deleting=True)}
    )

    if result.deleted_count != 1:
        return (
            jsonify(
                {
                    "status": "error",
                    "message": f"Authorization required to attempt to delete sample with {item_id=} from the database.",
                }
            ),
            401,
        )
    return (
        jsonify(
            {
                "status": "success",
            }
        ),
        200,
    )

get_item_data(item_id: str | None = None, refcode: str | None = None, load_blocks: bool = False)

Generates a JSON response for the item with the given item_id, or refcode additionally resolving relationships to files and other items.

Parameters:

Name Type Description Default
load_blocks bool

Whether to regenerate any data blocks associated with this sample (i.e., create the Python object corresponding to the block and call its render function).

False
Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/items/<refcode>", methods=["GET"])
@ITEMS.route("/get-item-data/<item_id>", methods=["GET"])
def get_item_data(
    item_id: str | None = None, refcode: str | None = None, load_blocks: bool = False
):
    """Generates a JSON response for the item with the given `item_id`,
    or `refcode` additionally resolving relationships to files and other items.

    Parameters:
       load_blocks: Whether to regenerate any data blocks associated with this
           sample (i.e., create the Python object corresponding to the block and
           call its render function).

    """
    redirect_to_ui = bool(request.args.get("redirect-to-ui", default=False, type=json.loads))
    if refcode and redirect_to_ui and CONFIG.APP_URL:
        return redirect(f"{CONFIG.APP_URL}/items/{refcode}", code=307)

    if item_id:
        match = {"item_id": item_id}
    elif refcode:
        if not len(refcode.split(":")) == 2:
            refcode = f"{CONFIG.IDENTIFIER_PREFIX}:{refcode}"

        match = {"refcode": refcode}
    else:
        return (
            jsonify(
                {
                    "status": "error",
                    "message": "No item_id or refcode provided.",
                }
            ),
            400,
        )

    # retrieve the entry from the database:
    cursor = flask_mongo.db.items.aggregate(
        [
            {
                "$match": {
                    **match,
                    **get_default_permissions(user_only=False),
                }
            },
            {"$lookup": creators_lookup()},
            {"$lookup": collections_lookup()},
            {"$lookup": files_lookup()},
        ],
    )

    try:
        doc = list(cursor)[0]
    except IndexError:
        doc = None

    if not doc or (
        not current_user.is_authenticated
        and not CONFIG.TESTING
        and not doc["type"] == "starting_materials"
    ):
        return (
            jsonify(
                {
                    "status": "error",
                    "message": f"No matching items for {match=} with current authorization.",
                }
            ),
            404,
        )

    # determine the item type and validate according to the appropriate schema
    try:
        ItemModel = ITEM_MODELS[doc["type"]]
    except KeyError:
        if "type" in doc:
            raise KeyError(f"Item {item_id=} has invalid type: {doc['type']}")
        else:
            raise KeyError(f"Item {item_id=} has no type field in document.")

    doc = ItemModel(**doc)
    if load_blocks:
        doc.blocks_obj = reserialize_blocks(doc.display_order, doc.blocks_obj)

    # find any documents with relationships that mention this document
    relationships_query_results = flask_mongo.db.items.find(
        filter={
            "$or": [
                {"relationships.item_id": doc.item_id},
                {"relationships.refcode": doc.refcode},
                {"relationships.immutable_id": doc.immutable_id},
            ]
        },
        projection={
            "item_id": 1,
            "refcode": 1,
            "relationships": {
                "$elemMatch": {
                    "$or": [
                        {"item_id": doc.item_id},
                        {"refcode": doc.refcode},
                    ],
                },
            },
        },
    )

    # loop over and collect all 'outer' relationships presented by other items
    incoming_relationships: Dict[RelationshipType, Set[str]] = {}
    for d in relationships_query_results:
        for k in d["relationships"]:
            if k["relation"] not in incoming_relationships:
                incoming_relationships[k["relation"]] = set()
            incoming_relationships[k["relation"]].add(
                d["item_id"] or d["refcode"] or d["immutable_id"]
            )

    # loop over and aggregate all 'inner' relationships presented by this item
    inlined_relationships: Dict[RelationshipType, Set[str]] = {}
    if doc.relationships is not None:
        inlined_relationships = {
            relation: {
                d.item_id or d.refcode or d.immutable_id
                for d in doc.relationships
                if d.relation == relation
            }
            for relation in RelationshipType
        }

    # reunite parents and children from both directions of the relationships field
    parents = incoming_relationships.get(RelationshipType.CHILD, set()).union(
        inlined_relationships.get(RelationshipType.PARENT, set())
    )
    children = incoming_relationships.get(RelationshipType.PARENT, set()).union(
        inlined_relationships.get(RelationshipType.CHILD, set())
    )

    # Must be exported to JSON first to apply the custom pydantic JSON encoders
    return_dict = json.loads(doc.json(exclude_unset=True))

    if item_id is None:
        item_id = return_dict["item_id"]

    # create the files_data dictionary keyed by file ObjectId
    files_data: Dict[ObjectId, Dict] = {
        f["immutable_id"]: f for f in return_dict.get("files") or []
    }

    return jsonify(
        {
            "status": "success",
            "item_id": item_id,
            "item_data": return_dict,
            "files_data": files_data,
            "child_items": sorted(children),
            "parent_items": sorted(parents),
        }
    )

save_item()

Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/save-item/", methods=["POST"])
def save_item():
    request_json = request.get_json()  # noqa: F821 pylint: disable=undefined-variable

    item_id = request_json["item_id"]
    updated_data = request_json["data"]

    # These keys should not be updated here and cannot be modified by the user through this endpoint
    for k in (
        "_id",
        "file_ObjectIds",
        "creators",
        "creator_ids",
        "item_id",
        "relationships",
    ):
        if k in updated_data:
            del updated_data[k]

    updated_data["last_modified"] = datetime.datetime.now(tz=datetime.timezone.utc).isoformat()

    for block_id, block_data in updated_data.get("blocks_obj", {}).items():
        blocktype = block_data["blocktype"]

        block = BLOCK_TYPES.get(blocktype, BLOCK_TYPES["notsupported"]).from_web(block_data)

        updated_data["blocks_obj"][block_id] = block.to_db()

    user_only = updated_data["type"] not in ("starting_materials", "equipment")

    item = flask_mongo.db.items.find_one(
        {"item_id": item_id, **get_default_permissions(user_only=user_only)}
    )

    # Bit of a hack for now: starting materials and equipment should be editable by anyone,
    # so we adjust the query above to be more permissive when the user is requesting such an item
    # but before returning we need to check that the actual item did indeed have that type
    if not item or not user_only and item["type"] not in ("starting_materials", "equipment"):
        return (
            jsonify(
                status="error",
                message=f"Unable to find item with appropriate permissions and {item_id=}.",
            ),
            400,
        )

    if updated_data.get("collections", []):
        try:
            updated_data["collections"] = _check_collections(updated_data)
        except ValueError as exc:
            return (
                dict(
                    status="error",
                    message=f"Cannot update {item_id!r} with missing collections {updated_data['collections']!r}: {exc}",
                    item_id=item_id,
                ),
                401,
            )

    item_type = item["type"]
    item.update(updated_data)

    try:
        item = ITEM_MODELS[item_type](**item).dict()
    except ValidationError as exc:
        return (
            jsonify(
                status="error",
                message=f"Unable to update item {item_id=} ({item_type=}) with new data {updated_data}",
                output=str(exc),
            ),
            400,
        )

    # remove collections and creators and any other reference fields
    item.pop("collections")
    item.pop("creators")

    result = flask_mongo.db.items.update_one(
        {"item_id": item_id},
        {"$set": item},
    )

    if result.matched_count != 1:
        return (
            jsonify(
                status="error",
                message=f"{item_id} item update failed. no subdocument matched",
                output=result.raw_result,
            ),
            400,
        )

    return jsonify(status="success", last_modified=updated_data["last_modified"]), 200

search_users()

Perform free text search on users and return the top results. GET parameters: query: String with the search terms. nresults: Maximum number of (default 100)

Returns:

Type Description

response list of dictionaries containing the matching items in order of descending match score.

Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/search-users/", methods=["GET"])
def search_users():
    """Perform free text search on users and return the top results.
    GET parameters:
        query: String with the search terms.
        nresults: Maximum number of  (default 100)

    Returns:
        response list of dictionaries containing the matching items in order of
        descending match score.
    """

    query = request.args.get("query", type=str)
    nresults = request.args.get("nresults", default=100, type=int)
    types = request.args.get("types", default=None)

    match_obj = {"$text": {"$search": query}}
    if types is not None:
        match_obj["type"] = {"$in": types}

    cursor = flask_mongo.db.users.aggregate(
        [
            {"$match": match_obj},
            {"$sort": {"score": {"$meta": "textScore"}}},
            {"$limit": nresults},
            {
                "$project": {
                    "_id": 1,
                    "identities": 1,
                    "display_name": 1,
                    "contact_email": 1,
                }
            },
        ]
    )
    return jsonify(
        {"status": "success", "users": list(json.loads(Person(**d).json()) for d in cursor)}
    ), 200

remotes

REMOTES

list_remote_directories()

Returns the most recent directory structures from the server.

If the cache is missing or is older than some configured time, then it will be reconstructed.

Source code in pydatalab/routes/v0_1/remotes.py
@REMOTES.route("/list-remote-directories", methods=["GET"])
@REMOTES.route("/remotes", methods=["GET"])
def list_remote_directories():
    """Returns the most recent directory structures from the server.

    If the cache is missing or is older than some configured time,
    then it will be reconstructed.

    """
    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (
            jsonify(
                {
                    "status": "error",
                    "title": "Not Authorized",
                    "detail": "Listing remote directories requires authentication.",
                }
            ),
            401,
        )

    try:
        invalidate_cache = _check_invalidate_cache(request.args)
    except RuntimeError as e:
        return (
            jsonify(
                {
                    "status": "error",
                    "title": "Invalid Argument",
                    "detail": str(e),
                }
            ),
            400,
        )

    all_directory_structures = get_directory_structures(
        CONFIG.REMOTE_FILESYSTEMS, invalidate_cache=invalidate_cache
    )

    response = {}
    response["meta"] = {}
    response["meta"]["remotes"] = [json.loads(d.json()) for d in CONFIG.REMOTE_FILESYSTEMS]
    if all_directory_structures:
        oldest_update = min(d["last_updated"] for d in all_directory_structures)
        response["meta"]["oldest_cache_update"] = oldest_update.isoformat()
        response["data"] = all_directory_structures
    return jsonify(response), 200

get_remote_directory(remote_id: str)

Returns the directory structure from the server for the given configured remote name.

Source code in pydatalab/routes/v0_1/remotes.py
@REMOTES.route("/remotes/<path:remote_id>", methods=["GET"])
def get_remote_directory(remote_id: str):
    """Returns the directory structure from the server for the
    given configured remote name.

    """
    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (
            jsonify(
                {
                    "status": "error",
                    "title": "Not Authorized",
                    "detail": "Listing remote directories requires authentication.",
                }
            ),
            401,
        )

    try:
        invalidate_cache = _check_invalidate_cache(request.args)
    except RuntimeError as e:
        return (
            jsonify(
                {
                    "status": "error",
                    "title": "Invalid Argument",
                    "detail": str(e),
                }
            ),
            400,
        )

    for d in CONFIG.REMOTE_FILESYSTEMS:
        if remote_id == d.name:
            remote_obj = d
            break
    else:
        return (
            jsonify(
                {
                    "status": "error",
                    "title": "Not Found",
                    "detail": f"No remote found with name {remote_id!r}",
                }
            ),
            404,
        )

    directory_structure = get_directory_structure(remote_obj, invalidate_cache=invalidate_cache)

    response: Dict[str, Any] = {}
    response["meta"] = {}
    response["meta"]["remote"] = json.loads(d.json())
    response["data"] = directory_structure

    return jsonify(response), 200

users

USERS

save_user(user_id)

Source code in pydatalab/routes/v0_1/users.py
@USERS.route("/users/<user_id>", methods=["PATCH"])
def save_user(user_id):
    request_json = request.get_json()

    display_name: str | None = None
    contact_email: str | None = None
    account_status: str | None = None

    if request_json is not None:
        display_name = request_json.get("display_name", False)
        contact_email = request_json.get("contact_email", False)
        account_status = request_json.get("account_status", None)

    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (jsonify({"status": "error", "message": "No user authenticated."}), 401)

    if not CONFIG.TESTING and current_user.id != user_id and current_user.role != "admin":
        return (
            jsonify({"status": "error", "message": "User not allowed to edit this profile."}),
            403,
        )

    update = {}

    try:
        if display_name:
            update["display_name"] = DisplayName(display_name)

        if contact_email or contact_email in (None, ""):
            if contact_email in ("", None):
                update["contact_email"] = None
            else:
                update["contact_email"] = EmailStr(contact_email)

        if account_status:
            update["account_status"] = account_status

    except ValueError as e:
        return jsonify(
            {"status": "error", "message": f"Invalid display name or email was passed: {str(e)}"}
        ), 400

    if not update:
        return jsonify({"status": "success", "message": "No update was performed."}), 200

    update_result = flask_mongo.db.users.update_one({"_id": ObjectId(user_id)}, {"$set": update})

    if update_result.matched_count != 1:
        return (jsonify({"status": "error", "message": "Unable to update user."}), 400)

    if update_result.modified_count != 1:
        return (
            jsonify(
                {
                    "status": "success",
                    "message": "No update was performed",
                }
            ),
            200,
        )

    return (jsonify({"status": "success"}), 200)