REST API¶
pydatalab.routes.v0_1
special
¶
BLUEPRINTS: tuple
¶
admin
¶
ADMIN
¶
_()
¶
Source code in pydatalab/routes/v0_1/admin.py
@ADMIN.before_request
@admin_only
def _():
    """Empty hook registered via `ADMIN.before_request` and wrapped in `admin_only`.

    The body does nothing itself; any access control comes from the
    `admin_only` decorator (presumably restricting the blueprint to admin
    users -- confirm against its definition).
    """
get_users()
¶
Source code in pydatalab/routes/v0_1/admin.py
@ADMIN.route("/users")
def get_users():
    """List the users matched by the default permissions filter, with a `role` field.

    Each user document is joined against the `roles` collection on `_id`;
    users without a roles entry are reported with the role `"user"`.

    Returns:
        A JSON response with `status` and `data` (the list of user documents).
    """
    permissions_stage = {"$match": get_default_permissions(user_only=True)}

    # Join on the shared `_id`; the result is an array stored under `role`.
    join_roles_stage = {
        "$lookup": {
            "from": "roles",
            "localField": "_id",
            "foreignField": "_id",
            "as": "role",
        }
    }

    # Collapse the joined array into a single value, defaulting to "user".
    has_no_role = {"$eq": [{"$size": "$role"}, 0]}
    flatten_role_stage = {
        "$addFields": {
            "role": {
                "$cond": {
                    "if": has_no_role,
                    "then": "user",
                    "else": {"$arrayElemAt": ["$role.role", 0]},
                }
            }
        }
    }

    cursor = flask_mongo.db.users.aggregate(
        [permissions_stage, join_roles_stage, flatten_role_stage]
    )
    return jsonify({"status": "success", "data": list(cursor)})
save_role(user_id)
¶
Source code in pydatalab/routes/v0_1/admin.py
@ADMIN.route("/roles/<user_id>", methods=["PATCH"])
def save_role(user_id):
    """Create or update the `roles` entry for the given user.

    The JSON request body is used directly as the role document: merged with
    the user's `_id` when no entry exists yet, or applied with `$set` when
    one does.

    Parameters:
        user_id: The database ID of the user, as a string accepted by `ObjectId`.

    Returns:
        A `(response, status)` tuple: 401 if nobody is authenticated, 403 if the
        caller is not an admin, 404 if the user does not exist, 400 if no role
        was provided or the update matched nothing, 201 when a new role entry
        was created, and 200 otherwise.
    """
    # Bind unconditionally: `get_json()` may return None, and the later
    # checks must see that as "no role provided" rather than hit a NameError.
    user_role = request.get_json()

    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (jsonify({"status": "error", "message": "No user authenticated."}), 401)

    if not CONFIG.TESTING and current_user.role != "admin":
        return (
            jsonify({"status": "error", "message": "User not allowed to edit this profile."}),
            403,
        )

    user_oid = ObjectId(user_id)

    existing_user = flask_mongo.db.users.find_one({"_id": user_oid})
    if not existing_user:
        return (jsonify({"status": "error", "message": "User not found."}), 404)

    existing_role = flask_mongo.db.roles.find_one({"_id": user_oid})

    if not existing_role:
        if not user_role:
            return (jsonify({"status": "error", "message": "Role not provided for new user."}), 400)

        new_user_role = {"_id": user_oid, **user_role}
        flask_mongo.db.roles.insert_one(new_user_role)
        return (jsonify({"status": "success", "message": "New user's role created."}), 201)

    if user_role is None:
        # No JSON body at all: there is nothing to `$set`.
        return (jsonify({"status": "error", "message": "Role not provided."}), 400)

    update_result = flask_mongo.db.roles.update_one({"_id": user_oid}, {"$set": user_role})

    if update_result.matched_count != 1:
        return (jsonify({"status": "error", "message": "Unable to update user."}), 400)

    if update_result.modified_count != 1:
        return (
            jsonify(
                {
                    "status": "success",
                    "message": "No update was performed",
                }
            ),
            200,
        )

    return (jsonify({"status": "success"}), 200)
auth
¶
This module implements functionality for authenticating users via OAuth2 providers and JWT, and associating these identities with their local accounts.
AUTH
¶
EMAIL_BLUEPRINT
¶
KEY_LENGTH: int
¶
LINK_EXPIRATION: timedelta
¶
OAUTH: Dict[pydatalab.models.people.IdentityType, flask.blueprints.Blueprint]
¶
A dictionary of Flask blueprints corresponding to the supported OAuth providers.
OAUTH_PROXIES: Dict[pydatalab.models.people.IdentityType, werkzeug.local.LocalProxy]
¶
A dictionary of proxy objects (c.f. Flask context locals) corresponding to the supported OAuth2 providers, and can be used to make further authenticated requests out to the providers.
wrapped_login_user(*args, **kwargs)
¶
Source code in pydatalab/routes/v0_1/auth.py
@logged_route
def wrapped_login_user(*args, **kwargs):
    """Call `login_user` with the given arguments, under the `logged_route` decorator.

    The return value of `login_user` is discarded.
    """
    login_user(*args, **kwargs)
    return None
find_user_with_identity(identifier: str, identity_type: Union[str, pydatalab.models.people.IdentityType], verify: bool = False) -> Optional[pydatalab.models.people.Person]
¶
Look up the given identity in the users database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
identifier |
str |
The identifier of the identity to look up. |
required |
identity_type |
Union[str, pydatalab.models.people.IdentityType] |
The type of the identity to look up. |
required |
verify |
bool |
Whether to mark the identity as verified if it is found. |
False |
Source code in pydatalab/routes/v0_1/auth.py
@logged_route
def find_user_with_identity(
    identifier: str,
    identity_type: Union[str, IdentityType],
    verify: bool = False,
) -> Optional[Person]:
    """Look up the given identity in the users database.

    Parameters:
        identifier: The identifier of the identity to look up.
        identity_type: The type of the identity to look up.
        verify: Whether to mark the identity as verified if it is found.

    Returns:
        The matching `Person`, or `None` if no user holds this identity.

    Raises:
        RuntimeError: If the matched user does not hold exactly one identity
            with this identifier and type.
    """
    # `$elemMatch` requires a *single* identity to carry both the identifier
    # and the type. Two independent dotted-path conditions would also match a
    # user whose identifier and type sit on different identities.
    user = flask_mongo.db.users.find_one(
        {
            "identities": {
                "$elemMatch": {
                    "identifier": identifier,
                    "identity_type": identity_type,
                }
            }
        },
    )
    if not user:
        return None

    person = Person(**user)

    identity_indices: list[int] = [
        ind
        for ind, identity in enumerate(person.identities)
        if identity.identity_type == identity_type and identity.identifier == identifier
    ]
    if len(identity_indices) != 1:
        raise RuntimeError(
            "Unexpected error: multiple or no identities matched the OAuth token."
        )

    identity_index = identity_indices[0]

    if verify and not person.identities[identity_index].verified:
        # Only the database entry is updated; the returned `person` still
        # carries the flag as it was read.
        flask_mongo.db.users.update_one(
            {"_id": person.immutable_id},
            {"$set": {f"identities.{identity_index}.verified": True}},
        )

    return person
find_create_or_modify_user(identifier: str, identity_type: Union[str, pydatalab.models.people.IdentityType], identity_name: str, display_name: Optional[str] = None, verified: bool = False, create_account: bool | pydatalab.models.people.AccountStatus = False) -> None
¶
Search for a user account with the given identifier and identity type, creating or connecting one if it does not exist.
1. Find any user with the given identity, if found, return it.
2. If no user exists, check if there is currently a user logged in:
- If so, attach the identity to the current user.
- If not, create an entry in the user database with this identity.
3. Log in as the user for this session.
Source code in pydatalab/routes/v0_1/auth.py
def find_create_or_modify_user(
    identifier: str,
    identity_type: Union[str, IdentityType],
    identity_name: str,
    display_name: Optional[str] = None,
    verified: bool = False,
    create_account: bool | AccountStatus = False,
) -> None:
    """Search for a user account with the given identifier and identity type, creating
    or connecting one if it does not exist.

        1. Find any user with the given identity, if found, return it.
        2. If no user exists, check if there is currently a user logged in:
            - If so, attach the identity to the current user.
            - If not, create an entry in the user database with this identity.
        3. Log in as the user for this session.

    Parameters:
        identifier: The identifier of the identity (e.g., an email address or provider ID).
        identity_type: The type of the identity.
        identity_name: The name stored on a newly created identity.
        display_name: An optional display name stored on a newly created identity.
        verified: Whether a newly created identity is marked as verified.
        create_account: If falsy, no new account may be created. If `True`, a new
            account is created as `AccountStatus.UNVERIFIED`; if an `AccountStatus`,
            that status is used for the new account.

    Raises:
        UserRegistrationForbidden: If no user is found or logged in and
            `create_account` is falsy.
        RuntimeError: If attaching the identity or inserting the new user fails.
    """

    @logged_route
    def attach_identity_to_user(
        user_id: str,
        identity: Identity,
        use_display_name: bool = False,
        use_contact_email: bool = False,
    ) -> None:
        """Associates an OAuth ID with a user entry in the database.

        This function is currently brittle and would need to be updated
        if the corresponding `Person` schema changes due to the hard-coded
        field names.

        Parameters:
            user_id: The database ID of the user as a string.
            identity: The identity to associate.
            use_display_name: Whether to set the user's top-level display name with a
                display name provided by this identity.
            use_contact_email: Whether to set the user's top-level contact email with
                an email address provided by this identity.

        Raises:
            RuntimeError: If the update was unsuccessful.
        """
        update: dict[str, dict] = {"$push": {"identities": identity.dict()}}

        # Accumulate into one `$set` document so that setting the contact
        # email does not discard a display name set just before it.
        if use_display_name and identity and identity.display_name:
            update.setdefault("$set", {})["display_name"] = identity.display_name

        if use_contact_email and identity.identity_type is IdentityType.EMAIL and identity.verified:
            update.setdefault("$set", {})["contact_email"] = identity.identifier

        result = flask_mongo.db.users.update_one(
            {"_id": ObjectId(user_id)},
            update,
        )

        if result.matched_count != 1:
            raise RuntimeError(
                f"Attempted to modify user {user_id} but performed {result.matched_count} updates. Results:\n{result.raw_result}"
            )

    user = find_user_with_identity(identifier, identity_type, verify=True)

    # If no user was found in the database with the OAuth ID, make or modify one:
    if not user:
        identity = Identity(
            identifier=identifier,
            identity_type=identity_type,
            name=identity_name,
            display_name=display_name,
            verified=verified,
        )

        # If there is currently a user logged in who has gone through OAuth with a new identity,
        # then update the user database with the identity
        if current_user.is_authenticated:
            attach_identity_to_user(
                current_user.id,
                identity,
                use_display_name=current_user.display_name is None,
                use_contact_email=current_user.contact_email is None,
            )
            current_user.refresh()
            user = current_user.person

        # If there is no current authenticated user, make one with the current OAuth identity
        else:
            if not create_account:
                raise UserRegistrationForbidden

            if isinstance(create_account, bool):
                account_status = AccountStatus.UNVERIFIED
            else:
                account_status = create_account

            user = Person.new_user_from_identity(
                identity, use_display_name=True, account_status=account_status
            )
            LOGGER.debug("Inserting new user model %s into database", user)
            insert_pydantic_model_fork_safe(user, "users")
            user_model = get_by_id(str(user.immutable_id))
            # Check the value read back from the database: `user` itself was
            # just constructed and can never be None at this point.
            if user_model is None:
                raise RuntimeError("Failed to insert user into database")
            wrapped_login_user(user_model)

    # Log the user into the session with this identity
    if user is not None:
        wrapped_login_user(get_by_id(str(user.immutable_id)))
generate_and_share_magic_link()
¶
Generates a JWT-based magic link with which a user can log in, stores it in the database and sends it to the verified email address.
Source code in pydatalab/routes/v0_1/auth.py
@EMAIL_BLUEPRINT.route("/magic-link", methods=["POST"])
def generate_and_share_magic_link():
    """Generates a JWT-based magic link with which a user can log in, stores it
    in the database and sends it to the verified email address.

    The JSON request body must contain `email` and `referrer`. The link is
    `<referrer>?token=<jwt>`, where the JWT carries the email address and an
    expiry of `LINK_EXPIRATION` from now.

    Returns:
        A `(response, status)` tuple: 400 for a missing/invalid email, a missing
        referrer or a failure to send the email; 403 if the address belongs to no
        existing user and is rejected by `_check_email_domain`; 200 once the
        email has been sent.
    """
    request_json = request.get_json()
    if not isinstance(request_json, dict):
        # A missing or non-object body is treated like an empty one, so the
        # caller gets the "No email provided." response instead of a crash.
        request_json = {}

    email = request_json.get("email")
    referrer = request_json.get("referrer")

    if not email:
        return jsonify({"status": "error", "detail": "No email provided."}), 400

    # The dot is escaped so that the domain part really has to contain one.
    if not isinstance(email, str) or not re.match(r"^\S+@\S+\.\S+$", email):
        return jsonify({"status": "error", "detail": "Invalid email provided."}), 400

    if not referrer:
        LOGGER.warning("No referrer provided for magic link request: %s", request_json)
        return (
            jsonify(
                {
                    "status": "error",
                    "detail": "Referrer address not provided, please contact the datalab administrator.",
                }
            ),
            400,
        )

    # See if the user already exists and adjust the email if so. This is done
    # before a token is issued, so that rejected requests leave no stored token.
    user = find_user_with_identity(email, IdentityType.EMAIL, verify=False)

    if not user:
        allowed = _check_email_domain(email, CONFIG.EMAIL_DOMAIN_ALLOW_LIST)
        if not allowed:
            LOGGER.info("Did not allow %s to register an account", email)
            return (
                jsonify(
                    {
                        "status": "error",
                        "detail": f"Email address {email} is not allowed to register an account. Please contact the administrator if you believe this is an error.",
                    }
                ),
                403,
            )

    # Generate a JWT for the user with a short expiration; the session itself
    # should persist
    # The key `exp` is a standard part of JWT; pyjwt treats this as an expiration time
    # and will correctly encode the datetime
    token = jwt.encode(
        {"exp": datetime.datetime.now(datetime.timezone.utc) + LINK_EXPIRATION, "email": email},
        CONFIG.SECRET_KEY,
        algorithm="HS256",
    )

    flask_mongo.db.magic_links.insert_one(
        {"jwt": token},
    )

    link = f"{referrer}?token={token}"
    instance_url = referrer.replace("https://", "")

    if user is not None:
        subject = "Datalab Sign-in Magic Link"
        body = f"Click the link below to sign-in to the datalab instance at {instance_url}:\n\n{link}\n\nThis link is single-use and will expire in 1 hour."
    else:
        subject = "Datalab Registration Magic Link"
        body = f"Click the link below to register for the datalab instance at {instance_url}:\n\n{link}\n\nThis link is single-use and will expire in 1 hour."

    try:
        send_mail(email, subject, body)
    except Exception as exc:
        LOGGER.warning("Failed to send email to %s: %s", email, exc)
        return jsonify({"status": "error", "detail": "Email not sent successfully."}), 400

    return jsonify({"status": "success", "detail": "Email sent successfully."}), 200
email_logged_in()
¶
Endpoint for handling magic link authentication.
- Checks the passed token for a valid JWT in the `magic_links` collection.
- If found, checks if the user with the decoded email exists in the user collection.
- If not found, make the user account and verify their email address,
- Authenticate the user for this session.
Source code in pydatalab/routes/v0_1/auth.py
@EMAIL_BLUEPRINT.route("/email")
def email_logged_in():
    """Endpoint for handling magic link authentication.

    - Looks up the `token` query parameter in the `magic_links` collection.
    - Decodes it as a JWT and rejects it if its expiry has passed.
    - Re-checks the decoded email address against the configured allow list.
    - Finds, connects or creates the user account for that email address
      (with the email identity marked as verified) and logs the user in.
    - Redirects to `CONFIG.APP_URL` if set, otherwise to the request's referer.

    Raises:
        ValueError: If the token is missing, unknown or expired.
        RuntimeError: If the decoded token carries no email address.
        UserRegistrationForbidden: If the email address is not allowed.
    """
    token = request.args.get("token")
    if not token:
        raise ValueError("Token not provided")

    stored_link = flask_mongo.db.magic_links.find_one({"jwt": token})
    if not stored_link:
        raise ValueError("Token not found, please request a new one.")

    data = jwt.decode(
        token,
        CONFIG.SECRET_KEY,
        algorithms=["HS256"],
    )

    expires_at = datetime.datetime.fromtimestamp(data["exp"], tz=datetime.timezone.utc)
    now = datetime.datetime.now(tz=datetime.timezone.utc)
    if expires_at < now:
        raise ValueError("Token expired, please request a new one.")

    email = data["email"]
    if not email:
        raise RuntimeError("No email found; please request a new token.")

    # If the email domain list is explicitly configured to None, this allows any
    # email address to make an active account, otherwise the email domain must match
    # the list of allowed domains and the admin must verify the user
    if not _check_email_domain(email, CONFIG.EMAIL_DOMAIN_ALLOW_LIST):
        # If this point is reached, the token is valid but the server settings have
        # changed since the link was generated, so best to fail safe
        raise UserRegistrationForbidden

    auto_activate = (
        CONFIG.EMAIL_DOMAIN_ALLOW_LIST is None
        or CONFIG.EMAIL_AUTO_ACTIVATE_ACCOUNTS
        or CONFIG.AUTO_ACTIVATE_ACCOUNTS
    )
    account_status = AccountStatus.ACTIVE if auto_activate else AccountStatus.UNVERIFIED

    find_create_or_modify_user(
        email,
        IdentityType.EMAIL,
        email,
        display_name=email,
        verified=True,
        create_account=account_status,
    )

    destination = CONFIG.APP_URL
    if not destination:
        destination = request.headers.get("Referer", "/")
    return redirect(destination, 307)
github_logged_in(blueprint, token)
¶
This Flask signal hooks into any attempt to use the GitHub blueprint, and will make a user account with this identity if not already present in the database.
Makes one authorized request to the GitHub API to get the user's GitHub ID, username and display name, without storing the OAuth token.
Source code in pydatalab/routes/v0_1/auth.py
@oauth_authorized.connect_via(OAUTH[IdentityType.GITHUB])
def github_logged_in(blueprint, token):
    """This Flask signal hooks into any attempt to use the GitHub blueprint, and will
    make a user account with this identity if not already present in the database.

    Makes one authorized request to the GitHub API to get the user's GitHub ID,
    username and display name, without storing the OAuth token.

    Parameters:
        blueprint: The blueprint whose `session` is used to query the GitHub API.
        token: The OAuth token; if falsy, nothing is done.

    Returns:
        Always `False`.
    """
    if not token:
        return False

    resp = blueprint.session.get("/user")
    if not resp.ok:
        return False

    github_info = resp.json()
    github_user_id = str(github_info["id"])
    username = str(github_info["login"])
    # Fall back to the login when no display name is set (missing or null).
    github_name = github_info.get("name")
    name = str(github_name if github_name is not None else github_info["login"])

    create_account: bool | AccountStatus = False

    # Use the read:org scope to check if the user is a member of at least one of the allowed orgs
    if CONFIG.GITHUB_ORG_ALLOW_LIST:
        for org in CONFIG.GITHUB_ORG_ALLOW_LIST:
            # The entry is interpolated as-is: converting a numeric string to
            # `int` first produces the same URL, and `int()` would raise a
            # ValueError for any non-numeric entry.
            if blueprint.session.get(f"/orgs/{org}/members/{username}").ok:
                # If this person has a GH account on the org allow list, activate their account
                create_account = AccountStatus.ACTIVE
                break

    elif CONFIG.GITHUB_ORG_ALLOW_LIST is None:
        create_account = True

    if CONFIG.GITHUB_AUTO_ACTIVATE_ACCOUNTS or CONFIG.AUTO_ACTIVATE_ACCOUNTS:
        create_account = AccountStatus.ACTIVE

    find_create_or_modify_user(
        github_user_id,
        IdentityType.GITHUB,
        username,
        display_name=name,
        verified=True,
        create_account=create_account,
    )

    # Return false to prevent Flask-dance from trying to store the token elsewhere
    return False
orcid_logged_in(_, token)
¶
This signal hooks into any attempt to use the ORCID blueprint, and will associate a user account with this identity if not already present in the database.
The OAuth token is not stored alongside the user.
Source code in pydatalab/routes/v0_1/auth.py
@oauth_authorized.connect_via(OAUTH[IdentityType.ORCID])
def orcid_logged_in(_, token):
    """Signal handler for the ORCID blueprint: associates a user account with the
    ORCID identity in `token`, creating one if not already present in the database.

    The OAuth token is not stored alongside the user.

    Parameters:
        _: The blueprint (unused).
        token: The OAuth token; its `orcid` entry is used as both identifier and
            identity name, and its optional `name` entry as the display name.

    Returns:
        Always `False`.
    """
    if not token:
        return False

    orcid = token["orcid"]

    # New ORCID accounts must be activated by an admin unless configured otherwise
    auto_activate = CONFIG.ORCID_AUTO_ACTIVATE_ACCOUNTS or CONFIG.AUTO_ACTIVATE_ACCOUNTS
    account_status = AccountStatus.ACTIVE if auto_activate else AccountStatus.UNVERIFIED

    find_create_or_modify_user(
        orcid,
        IdentityType.ORCID,
        orcid,
        display_name=token.get("name", orcid),
        verified=True,
        create_account=account_status,
    )

    # Return false to prevent Flask-dance from trying to store the token elsewhere
    return False
redirect_to_ui(blueprint, token)
¶
Intercepts the default Flask-Dance and redirects to the referring page.
Source code in pydatalab/routes/v0_1/auth.py
@oauth_authorized.connect
def redirect_to_ui(blueprint, token):  # pylint: disable=unused-argument
    """Redirect (HTTP 307) after OAuth authorization.

    The target is `CONFIG.APP_URL` when it is set, otherwise the request's
    `Referer` header, falling back to `/`.
    """
    destination = CONFIG.APP_URL
    if not destination:
        destination = request.headers.get("Referer", "/")
    return redirect(destination, 307)
get_authenticated_user_info()
¶
Returns metadata associated with the currently authenticated user.
Source code in pydatalab/routes/v0_1/auth.py
@AUTH.route("/get-current-user/", methods=["GET"])
def get_authenticated_user_info():
    """Return the current user's person record plus their role, or 401 if not logged in."""
    if not current_user.is_authenticated:
        return jsonify({"status": "failure", "message": "User must be authenticated."}), 401

    # Round-trip through the model's own JSON serialisation to get plain types.
    payload = json.loads(current_user.person.json())
    payload["role"] = current_user.role.value
    return jsonify(payload), 200
generate_user_api_key()
¶
Generates a new API key for the currently authenticated user, stores its SHA-512 hash in the database and returns the key.
Source code in pydatalab/routes/v0_1/auth.py
@AUTH.route("/get-api-key/", methods=["GET"])
def generate_user_api_key():
    """Generate a new API key for the currently authenticated user.

    The key consists of `KEY_LENGTH` ASCII letters. Only its SHA-512 hex digest
    is stored (upserted into `api_keys` under the user's ID, replacing any
    previous hash); the plain key is returned once in the response.

    Returns:
        A `(response, status)` tuple: the new key with 200, or an error with 401
        if no user is authenticated.
    """
    # Imported locally so that the block does not depend on a module-level import.
    import secrets

    if current_user.is_authenticated:
        # API keys are credentials: draw them from the OS CSPRNG via `secrets`
        # rather than from the predictable `random` generator.
        new_key = "".join(secrets.choice(ascii_letters) for _ in range(KEY_LENGTH))
        flask_mongo.db.api_keys.update_one(
            {"_id": ObjectId(current_user.id)},
            {"$set": {"hash": sha512(new_key.encode("utf-8")).hexdigest()}},
            upsert=True,
        )
        return jsonify({"key": new_key}), 200
    else:
        return (
            jsonify(
                {
                    "status": "failure",
                    "message": "User must be an authenticated admin to request an API key.",
                }
            ),
            401,
        )
blocks
¶
BLOCKS
¶
_()
¶
Source code in pydatalab/routes/v0_1/blocks.py
@BLOCKS.before_request
@active_users_or_get_only
def _():
    """Empty hook registered via `BLOCKS.before_request` and wrapped in
    `active_users_or_get_only`.

    The body does nothing itself; any access control comes from the decorator
    (presumably limiting non-GET requests to active users -- confirm against
    its definition).
    """
add_data_block()
¶
Call with AJAX to add a block to the sample
Source code in pydatalab/routes/v0_1/blocks.py
@BLOCKS.route("/add-data-block/", methods=["POST"])
def add_data_block():
    """Call with AJAX to add a block to the sample.

    The JSON request body must contain `block_type` and `item_id`, and may
    contain `index`: the position in the item's `display_order` at which to
    insert the new block. When `index` is missing or null, the block is
    appended.

    Returns:
        On success, JSON with the new block, the index it was inserted at and
        the item's new display order. A 400 response is returned for an unknown
        block type or if no item was modified.
    """
    request_json = request.get_json()

    # pull out required arguments from json
    block_type = request_json["block_type"]
    item_id = request_json["item_id"]
    insert_index = request_json.get("index")

    if block_type not in BLOCK_TYPES:
        return jsonify(status="error", message="Invalid block type"), 400

    block = BLOCK_TYPES[block_type](item_id=item_id)

    data = block.to_db()

    # currently, adding to both blocks and blocks_obj to mantain compatibility with
    # the old site. The new site only uses blocks_obj
    # Compare against None explicitly: 0 is a valid position (insert first)
    # and must not be mistaken for "no index given".
    if insert_index is not None:
        display_order_update = {
            "$each": [block.block_id],
            "$position": insert_index,
        }
    else:
        display_order_update = block.block_id

    result = flask_mongo.db.items.update_one(
        {"item_id": item_id, **get_default_permissions(user_only=True)},
        {
            "$push": {"blocks": data, "display_order": display_order_update},
            "$set": {f"blocks_obj.{block.block_id}": data},
        },
    )

    if result.modified_count < 1:
        return (
            jsonify(
                status="error",
                message=f"Update failed. {item_id=} is probably incorrect.",
            ),
            400,
        )

    # get the new display_order:
    display_order_result = flask_mongo.db.items.find_one(
        {"item_id": item_id, **get_default_permissions(user_only=True)}, {"display_order": 1}
    )
    new_display_order = display_order_result["display_order"]

    return jsonify(
        status="success",
        new_block_obj=block.to_web(),
        # Report the requested position, or the last slot when appended.
        new_block_insert_index=insert_index
        if insert_index is not None
        else len(new_display_order) - 1,
        new_display_order=new_display_order,
    )
add_collection_data_block()
¶
Call with AJAX to add a block to the collection.
Source code in pydatalab/routes/v0_1/blocks.py
@BLOCKS.route("/add-collection-data-block/", methods=["POST"])
def add_collection_data_block():
    """Call with AJAX to add a block to the collection.

    The JSON request body must contain `block_type` and `collection_id`, and
    may contain `index`: the position in the collection's `display_order` at
    which to insert the new block. When `index` is missing or null, the block
    is appended.

    Returns:
        On success, JSON with the new block, the index it was inserted at and
        the collection's new display order. A 400 response is returned for an
        unknown block type or if no collection was modified.
    """
    request_json = request.get_json()

    # pull out required arguments from json
    block_type = request_json["block_type"]
    collection_id = request_json["collection_id"]
    insert_index = request_json.get("index")

    if block_type not in BLOCK_TYPES:
        return jsonify(status="error", message="Invalid block type"), 400

    block = BLOCK_TYPES[block_type](collection_id=collection_id)

    data = block.to_db()

    # currently, adding to both blocks and blocks_obj to mantain compatibility with
    # the old site. The new site only uses blocks_obj
    # Compare against None explicitly: 0 is a valid position (insert first)
    # and must not be mistaken for "no index given".
    if insert_index is not None:
        display_order_update = {
            "$each": [block.block_id],
            "$position": insert_index,
        }
    else:
        display_order_update = block.block_id

    result = flask_mongo.db.collections.update_one(
        {"collection_id": collection_id, **get_default_permissions(user_only=True)},
        {
            "$push": {"blocks": data, "display_order": display_order_update},
            "$set": {f"blocks_obj.{block.block_id}": data},
        },
    )

    if result.modified_count < 1:
        return (
            jsonify(
                status="error",
                message=f"Update failed. {collection_id=} is probably incorrect.",
            ),
            400,
        )

    # get the new display_order:
    display_order_result = flask_mongo.db.collections.find_one(
        {"collection_id": collection_id, **get_default_permissions(user_only=True)},
        {"display_order": 1},
    )
    new_display_order = display_order_result["display_order"]

    return jsonify(
        status="success",
        new_block_obj=block.to_web(),
        # Report the requested position, or the last slot when appended.
        new_block_insert_index=insert_index
        if insert_index is not None
        else len(new_display_order) - 1,
        new_display_order=new_display_order,
    )
update_block()
¶
Take in json block data from site, process, and spit out updated data. May be used, for example, when the user changes plot parameters and the server needs to generate a new plot.
Source code in pydatalab/routes/v0_1/blocks.py
@BLOCKS.route("/update-block/", methods=["POST"])
def update_block():
    """Take in json block data from site, process, and spit
    out updated data. May be used, for example, when the user
    changes plot parameters and the server needs to generate a new
    plot.

    The JSON request body must contain `block_data` (including its
    `blocktype`) and may contain `save_to_db`; when that is truthy the block
    is also saved via `_save_block_to_db`.

    Returns:
        JSON with the processed block data and whether it was saved, or a 400
        response if the block type is unknown.
    """
    request_json = request.get_json()
    block_data = request_json["block_data"]
    blocktype = block_data["blocktype"]
    save_to_db = request_json.get("save_to_db", False)

    # Reject unknown types with the same 400 response the add-block endpoints
    # use, instead of letting the lookup below raise a KeyError.
    if blocktype not in BLOCK_TYPES:
        return jsonify(status="error", message="Invalid block type"), 400

    block = BLOCK_TYPES[blocktype].from_web(block_data)

    saved_successfully = False
    if save_to_db:
        saved_successfully = _save_block_to_db(block)

    return (
        jsonify(
            status="success", saved_successfully=saved_successfully, new_block_data=block.to_web()
        ),
        200,
    )
delete_block()
¶
Completely delete a data block from the database. In the future, we may consider preserving data by moving it to a different array, or simply making it invisible.
Source code in pydatalab/routes/v0_1/blocks.py
@BLOCKS.route("/delete-block/", methods=["POST"])
def delete_block():
    """Remove a data block from an item entirely.

    The JSON request body must contain `item_id` and `block_id`. The block is
    pulled from the item's `blocks` and `display_order` arrays and unset from
    `blocks_obj`.

    Returns:
        A `(response, status)` tuple: 200 on success, 400 if no item was modified.
    """
    payload = request.get_json()
    item_id = payload["item_id"]
    block_id = payload["block_id"]

    item_filter = {"item_id": item_id, **get_default_permissions(user_only=True)}
    removal = {
        "$pull": {
            "blocks": {"block_id": block_id},
            "display_order": block_id,
        },
        "$unset": {f"blocks_obj.{block_id}": ""},
    }
    outcome = flask_mongo.db.items.update_one(item_filter, removal)

    if outcome.modified_count >= 1:
        # could try to switch to http 204 is "No Content" success with no json
        return jsonify({"status": "success"}), 200

    error_body = {
        "status": "error",
        "message": f"Update failed. The item_id probably incorrect: {item_id}",
    }
    return jsonify(error_body), 400
delete_collection_block()
¶
Completely delete a data block from the database that is currently attached to a collection.
In the future, we may consider preserving data by moving it to a different array, or simply making it invisible
Source code in pydatalab/routes/v0_1/blocks.py
@BLOCKS.route("/delete-collection-block/", methods=["POST"])
def delete_collection_block():
    """Remove a data block from a collection entirely.

    The JSON request body must contain `collection_id` and `block_id`. The
    block is pulled from the collection's `blocks` and `display_order` arrays
    and unset from `blocks_obj`.

    Returns:
        A `(response, status)` tuple: 200 on success, 400 if no collection was
        modified.
    """
    payload = request.get_json()
    collection_id = payload["collection_id"]
    block_id = payload["block_id"]

    collection_filter = {
        "collection_id": collection_id,
        **get_default_permissions(user_only=True),
    }
    removal = {
        "$pull": {
            "blocks": {"block_id": block_id},
            "display_order": block_id,
        },
        "$unset": {f"blocks_obj.{block_id}": ""},
    }
    outcome = flask_mongo.db.collections.update_one(collection_filter, removal)

    if outcome.modified_count >= 1:
        return jsonify({"status": "success"}), 200

    error_body = {
        "status": "error",
        "message": f"Update failed. The collection_id probably incorrect: {collection_id}",
    }
    return jsonify(error_body), 400
collections
¶
COLLECTIONS
¶
_()
¶
Source code in pydatalab/routes/v0_1/collections.py
@COLLECTIONS.before_request
@active_users_or_get_only
def _():
    """Empty hook registered via `COLLECTIONS.before_request` and wrapped in
    `active_users_or_get_only`.

    The body does nothing itself; any access control comes from the decorator
    (presumably limiting non-GET requests to active users -- confirm against
    its definition).
    """
get_collections()
¶
Source code in pydatalab/routes/v0_1/collections.py
@COLLECTIONS.route("/collections")
def get_collections():
    """List the collections matched by the default permissions filter.

    Results are sorted by `_id` in descending order and returned without
    their `_id` field, after the `creators_lookup()` join.

    Returns:
        A JSON response with `status` and `data` (the list of collections).
    """
    collections = flask_mongo.db.collections.aggregate(
        [
            {"$match": get_default_permissions(user_only=True)},
            {"$lookup": creators_lookup()},
            # Sort before projecting: once `_id` has been removed there is
            # nothing left for the sort stage to order by.
            {"$sort": {"_id": -1}},
            {"$project": {"_id": 0}},
        ]
    )

    return jsonify({"status": "success", "data": list(collections)})
get_collection(collection_id)
¶
Source code in pydatalab/routes/v0_1/collections.py
@COLLECTIONS.route("/collections/<collection_id>", methods=["GET"])
def get_collection(collection_id):
    """Return a single collection together with summaries of its member items.

    Parameters:
        collection_id: The ID of the collection to fetch.

    Returns:
        JSON with the collection data and its `child_items`, or a 404 response
        if nothing matches or (outside of testing) no user is authenticated.
    """
    pipeline = [
        {
            "$match": {
                "collection_id": collection_id,
                **get_default_permissions(user_only=True),
            }
        },
        {"$lookup": creators_lookup()},
        {"$sort": {"_id": -1}},
    ]
    matches = list(flask_mongo.db.collections.aggregate(pipeline))
    doc = matches[0] if matches else None

    anonymous = not current_user.is_authenticated and not CONFIG.TESTING
    if not doc or anonymous:
        not_found = {
            "status": "error",
            "message": f"No matching collection {collection_id=} with current authorization.",
        }
        return jsonify(not_found), 404

    collection = Collection(**doc)

    # Items that hold a "collections" relationship pointing at this collection.
    members_filter = {
        "relationships.type": "collections",
        "relationships.immutable_id": collection.immutable_id,
    }
    samples = list(get_samples_summary(match=members_filter, project={"collections": 0}))

    collection.num_items = len(samples)

    body = {
        "status": "success",
        "collection_id": collection_id,
        "data": json.loads(collection.json(exclude_unset=True)),
        "child_items": list(samples),
    }
    return jsonify(body)
create_collection()
¶
Source code in pydatalab/routes/v0_1/collections.py
@COLLECTIONS.route("/collections", methods=["PUT"])
def create_collection():
    """Create a new collection, optionally registering starting members.

    The JSON request body carries the collection under `data`; it must
    include `collection_id` and may include `starting_members`, a list of
    objects with an `item_id` each.

    Returns:
        A `(response, status)` tuple: 401 without authentication (outside of
        testing), 400 for a missing ID, a validation failure or an
        unacknowledged insert, 409 if the ID already exists, and 201 with the
        new collection on success. If some starting members could not be
        registered, the success response carries a `warnings` entry.

    Raises:
        NotImplementedError: If `copy_from_collection_id` is given.
    """
    request_json = request.get_json()  # noqa: F821 pylint: disable=undefined-variable
    data = request_json.get("data", {})
    copy_from_id = request_json.get("copy_from_collection_id", None)
    starting_members = data.get("starting_members", [])

    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (
            dict(
                status="error",
                message="Unable to create new collection without user authentication.",
                collection_id=data.get("collection_id"),
            ),
            401,
        )

    if copy_from_id:
        raise NotImplementedError("Copying collections is not yet implemented.")

    # Everything below indexes `data["collection_id"]`; fail with a client
    # error here rather than with a KeyError further down.
    if "collection_id" not in data:
        return (
            dict(
                status="error",
                message="Unable to create new collection without a collection_id.",
                collection_id=None,
            ),
            400,
        )

    if CONFIG.TESTING:
        data["creator_ids"] = [24 * "0"]
        data["creators"] = [{"display_name": "Public testing user"}]
    else:
        data["creator_ids"] = [current_user.person.immutable_id]
        data["creators"] = [
            {
                "display_name": current_user.person.display_name,
                "contact_email": current_user.person.contact_email,
            }
        ]

    # check to make sure that item_id isn't taken already
    if flask_mongo.db.collections.find_one({"collection_id": data["collection_id"]}):
        return (
            dict(
                status="error",
                message=f"collection_id_validation_error: {data['collection_id']!r} already exists in database.",
                collection_id=data["collection_id"],
            ),
            409,  # 409: Conflict
        )

    data["last_modified"] = data.get(
        "last_modified", datetime.datetime.now(datetime.timezone.utc).isoformat()
    )

    try:
        data_model = Collection(**data)

    except ValidationError as error:
        return (
            dict(
                status="error",
                message=f"Unable to create new collection with ID {data['collection_id']}.",
                item_id=data["collection_id"],
                output=str(error),
            ),
            400,
        )

    result: InsertOneResult = flask_mongo.db.collections.insert_one(
        data_model.dict(exclude={"creators"})
    )
    if not result.acknowledged:
        return (
            dict(
                status="error",
                message=f"Failed to add new collection {data['collection_id']!r} to database.",
                collection_id=data["collection_id"],
                output=result.raw_result,
            ),
            400,
        )

    data_model.immutable_id = result.inserted_id

    errors = []
    if starting_members:
        # Unique item IDs in request order; members without an ID are ignored.
        item_ids = list(
            dict.fromkeys(
                d.get("item_id") for d in starting_members if d.get("item_id") is not None
            )
        )
        membership = {
            "type": "collections",
            "immutable_id": data_model.immutable_id,
        }

        results: UpdateResult = flask_mongo.db.items.update_many(
            {
                "item_id": {"$in": item_ids},
                **get_default_permissions(user_only=True),
            },
            {"$push": {"relationships": membership}},
        )

        data_model.num_items = results.modified_count

        if results.modified_count < len(item_ids):
            # An update without `upsert` reports no per-document outcome, so
            # read back which items now carry the relationship and report
            # only the ones that do not.
            registered = {
                doc["item_id"]
                for doc in flask_mongo.db.items.find(
                    {
                        "item_id": {"$in": item_ids},
                        "relationships": {"$elemMatch": membership},
                    },
                    {"item_id": 1},
                )
            }
            errors = [item_id for item_id in item_ids if item_id not in registered]

    else:
        data_model.num_items = 0

    response = {
        "status": "success",
        "data": json.loads(data_model.json()),
    }

    if errors:
        response["warnings"] = [
            f"Unable to register {errors} to new collection {data_model.collection_id}"
        ]

    return (
        jsonify(response),
        201,  # 201: Created
    )
save_collection(collection_id)
¶
Source code in pydatalab/routes/v0_1/collections.py
@COLLECTIONS.route("/collections/<collection_id>", methods=["PATCH"])
@logged_route
def save_collection(collection_id):
    """Update an existing collection with the `data` object of the JSON request body.

    Protected keys are stripped from the update, `last_modified` is set to the
    current UTC time, and the merged document is validated through the
    `Collection` model before being written back.

    Parameters:
        collection_id: The ID of the collection to update.

    Returns:
        A `(response, status)` tuple: 204 when the request carries no data, 400
        when the collection cannot be found, fails validation or is not
        modified, and 200 on success.
    """
    payload = request.get_json()  # noqa: F821 pylint: disable=undefined-variable
    changes = payload.get("data")

    if not changes:
        return (
            jsonify(
                status="error",
                message=f"Unable to find any data in request to update {collection_id=} with.",
            ),
            204,  # 204: No content
        )

    # These keys should not be updated here and cannot be modified by the user through this endpoint
    protected_keys = ("_id", "file_ObjectIds", "creators", "creator_ids", "collection_id")
    for key in protected_keys:
        changes.pop(key, None)

    changes["last_modified"] = datetime.datetime.now(datetime.timezone.utc).isoformat()

    stored = flask_mongo.db.collections.find_one(
        {"collection_id": collection_id, **get_default_permissions(user_only=True)}
    )
    if not stored:
        return (
            jsonify(
                status="error",
                message=f"Unable to find item with appropriate permissions and {collection_id=}.",
            ),
            400,
        )

    # Overlay the changes on the stored document and validate the result as a whole.
    stored.update(changes)
    try:
        validated = Collection(**stored).dict()
    except ValidationError as exc:
        return (
            jsonify(
                status="error",
                message=f"Unable to update item {collection_id=} with new data {changes}",
                output=str(exc),
            ),
            400,
        )

    result: UpdateResult = flask_mongo.db.collections.update_one(
        {"collection_id": collection_id},
        {"$set": validated},
    )

    if result.modified_count == 1:
        return jsonify(status="success"), 200

    return (
        jsonify(
            status="error",
            message=f"Unable to update item {collection_id=} with new data {changes}",
            output=result.raw_result,
        ),
        400,
    )
delete_collection(collection_id: str)
¶
Source code in pydatalab/routes/v0_1/collections.py
@COLLECTIONS.route("/collections/<collection_id>", methods=["DELETE"])
def delete_collection(collection_id: str):
    """Delete a collection and remove it from the relationships of all items.

    Parameters:
        collection_id: The ID of the collection to delete.

    Returns:
        A `(response, status)` tuple: 200 on success, or 401 if the collection
        cannot be found or deleted with the current permissions.
    """

    def _not_authorized():
        """Build the error response shared by the lookup and the deletion."""
        return (
            jsonify(
                {
                    "status": "error",
                    "message": f"Authorization required to attempt to delete collection with {collection_id=} from the database.",
                }
            ),
            401,
        )

    # NOTE(review): the session is not passed to the operations below, so they
    # presumably run outside of this transaction -- confirm whether
    # `session=session` is intended before changing it.
    with flask_mongo.cx.start_session() as session:
        with session.start_transaction():
            existing = flask_mongo.db.collections.find_one(
                {"collection_id": collection_id, **get_default_permissions(user_only=True)},
                projection={"_id": 1},
            )
            # The lookup is permission-filtered, so "missing" and "not
            # permitted" look the same here; answer as a failed deletion does
            # rather than failing on `None["_id"]`.
            if existing is None:
                return _not_authorized()
            collection_immutable_id = existing["_id"]

            result = flask_mongo.db.collections.delete_one(
                {
                    "collection_id": collection_id,
                    **get_default_permissions(user_only=True, deleting=True),
                }
            )
            if result.deleted_count != 1:
                return _not_authorized()

            # If successful, remove collection from all matching items relationships
            membership = {
                "immutable_id": collection_immutable_id,
                "type": "collections",
            }
            flask_mongo.db.items.update_many(
                {"relationships": {"$elemMatch": membership}},
                {"$pull": {"relationships": membership}},
            )

    return (
        jsonify(
            {
                "status": "success",
            }
        ),
        200,
    )
search_collections()
¶
Source code in pydatalab/routes/v0_1/collections.py
@COLLECTIONS.route("/search-collections", methods=["GET"])
def search_collections():
    """Run a free-text search over the collections visible to the user.

    GET parameters:
        query: The text to search for.
        nresults: Maximum number of results (default 100).

    Returns:
        A `(response, 200)` tuple whose `data` lists the `collection_id` and
        `title` of each match, sorted by text score.

    """
    search_text = request.args.get("query", type=str)
    max_results = request.args.get("nresults", default=100, type=int)

    pipeline = [
        {
            "$match": {
                "$text": {"$search": search_text},
                **get_default_permissions(user_only=True),
            }
        },
        {"$sort": {"score": {"$meta": "textScore"}}},
        {"$limit": max_results},
        {
            "$project": {
                "collection_id": 1,
                "title": 1,
            }
        },
    ]

    matches = []
    for doc in flask_mongo.db.collections.aggregate(pipeline):
        # Serialise through the model, keeping only the fields that were set.
        matches.append(json.loads(Collection(**doc).json(exclude_unset=True)))

    return jsonify({"status": "success", "data": matches}), 200
add_items_to_collection(collection_id)
¶
Source code in pydatalab/routes/v0_1/collections.py
@COLLECTIONS.route("/collections/<collection_id>", methods=["POST"])
def add_items_to_collection(collection_id):
    """Register the items named in the request as members of a collection.

    The request body is read as `{"data": {"refcodes": [...]}}`.

    Parameters:
        collection_id: The `collection_id` taken from the URL.

    Returns:
        A `(response, status)` tuple: 200 on success (also when nothing had
        to be modified), 404 when the collection or the items are not found,
        and 400 when no refcodes are given or no item matched the update.

    """
    payload = request.get_json()
    refcodes = payload.get("data", {}).get("refcodes", [])

    collection = flask_mongo.db.collections.find_one(
        {"collection_id": collection_id, **get_default_permissions()}
    )

    if not collection:
        return jsonify({"error": "Collection not found"}), 404

    if not refcodes:
        return jsonify({"error": "No item provided"}), 400

    def _permitted_items():
        # Built afresh for each query so the permissions are evaluated per call.
        return {"refcode": {"$in": refcodes}, **get_default_permissions()}

    if flask_mongo.db.items.count_documents(_permitted_items()) == 0:
        return jsonify({"error": "No matching items found"}), 404

    membership = {
        "description": "Is a member of",
        "relation": None,
        "type": "collections",
        "immutable_id": ObjectId(collection["_id"]),
        "item_id": None,
        "refcode": None,
    }
    update_result = flask_mongo.db.items.update_many(
        _permitted_items(),
        {"$addToSet": {"relationships": membership}},
    )

    if update_result.matched_count == 0:
        return (jsonify({"status": "error", "message": "Unable to add to collection."}), 400)

    outcome = {"status": "success"}
    if update_result.modified_count == 0:
        outcome["message"] = "No update was performed"

    return (jsonify(outcome), 200)
files
¶
FILES
¶
_()
¶
Source code in pydatalab/routes/v0_1/files.py
@FILES.before_request
@active_users_or_get_only
def _():
    """Empty hook registered via `FILES.before_request`.

    The body does nothing; only the `active_users_or_get_only` decorator
    wrapped around it can act on a request.
    """
get_file(file_id: str, filename: str)
¶
Source code in pydatalab/routes/v0_1/files.py
@FILES.route("/files/<string:file_id>/<string:filename>", methods=["GET"])
def get_file(file_id: str, filename: str):
    """Serve a stored file, provided the current permissions allow access.

    Parameters:
        file_id: The database ID of the file, which also names its directory.
        filename: The name of the file inside that directory.

    Returns:
        The result of `send_from_directory`, or a `(response, 401)` tuple if
        no matching file entry is found.

    """
    try:
        lookup_id = ObjectId(file_id)
    except InvalidId:
        # If the ID is invalid, then there will be no results in the database anyway,
        # so just 401
        lookup_id = file_id

    file_query = {"_id": lookup_id, **get_default_permissions(user_only=False)}
    if pydatalab.mongo.flask_mongo.db.files.find_one(file_query):
        directory = os.path.join(CONFIG.FILE_DIRECTORY, secure_filename(file_id))
        return send_from_directory(directory, filename)

    denied = {
        "status": "error",
        "title": "Not Authorized",
        "detail": "Authorization required to access file",
    }
    return (jsonify(denied), 401)
upload()
¶
Upload a file to the server and save it to the database.
The file is received via a multipart/form-data
request. Each
file is a binary octet stream. The file is saved to the server
in the configured file directory, under a subdirectory named with
the created database ID for the file.
Additional POST parameters are required:
- item_id: the ID of the item to which the file is attached
- replace_file: the database ID of the file to replace, if any
Source code in pydatalab/routes/v0_1/files.py
@FILES.route("/upload-file/", methods=["POST"])
def upload():
    """Upload a file to the server and save it to the database.

    The file is received via a `multipart/form-data` request. Each
    file is a binary octet stream. The file is saved to the server
    in the configured file directory, under a subdirectory named with
    the created database ID for the file.

    POST parameters:

    - `item_id`: the ID of the item to which the file is attached (required)
    - `replace_file`: the database ID of the file to replace, if any; a
      missing, empty or `"null"` value means a new file is created

    Returns:
        A `(response, status)` tuple: 201 with the file information on
        success, 401 when no user is logged in, and 400 when the request
        holds no file or no `item_id`.

    """
    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (
            jsonify(
                {
                    "status": "error",
                    "title": "Not Authorized",
                    "detail": "File upload requires login.",
                }
            ),
            401,
        )

    if len(request.files) == 0:
        return jsonify(error="No file in request"), 400

    if "item_id" not in request.form:
        return jsonify(error="No item id provided in form"), 400

    item_id = request.form["item_id"]
    # `replace_file` is optional ("if any"), so a request without it must not be rejected.
    replace_file_id = request.form.get("replace_file")

    if not CONFIG.TESTING:
        creator_id = current_user.person.immutable_id
    else:
        creator_id = PUBLIC_USER_ID

    # Force a real boolean so that the JSON response never carries "" or null here.
    is_update = bool(replace_file_id) and replace_file_id != "null"

    # Only the first file of the request is handled.
    file = request.files[next(iter(request.files))]
    if is_update:
        file_information = file_utils.update_uploaded_file(file, ObjectId(replace_file_id))
    else:
        file_information = file_utils.save_uploaded_file(
            file, item_ids=[item_id], creator_ids=[creator_id]
        )

    return (
        jsonify(
            {
                "status": "success",
                "file_id": str(file_information["_id"]),
                "file_information": file_information,
                "is_update": is_update,  # true if update, false if new file
            }
        ),
        201,
    )
add_remote_file_to_sample()
¶
Source code in pydatalab/routes/v0_1/files.py
@FILES.route("/add-remote-file-to-sample/", methods=["POST"])
def add_remote_file_to_sample():
    """Attach a file from a remote directory to an item.

    The JSON request body must provide `item_id` and `file_entry`.

    Returns:
        A `(response, status)` tuple: 201 with the updated file entry on
        success, or 401 when no user is logged in.

    """
    testing = CONFIG.TESTING
    if not (current_user.is_authenticated or testing):
        not_authorized = {
            "status": "error",
            "title": "Not Authorized",
            "detail": "Adding a file to a sample requires login.",
        }
        return (jsonify(not_authorized), 401)

    body = request.get_json()
    item_id = body["item_id"]
    file_entry = body["file_entry"]

    # In testing mode an all-zero ObjectId stands in for the creator.
    creator_id = ObjectId(24 * "0") if CONFIG.TESTING else current_user.person.immutable_id

    updated_file_entry = file_utils.add_file_from_remote_directory(
        file_entry, item_id, creator_ids=[creator_id]
    )

    created = {
        "status": "success",
        "file_id": str(updated_file_entry["_id"]),
        "file_information": updated_file_entry,
    }
    return (jsonify(created), 201)
delete_file_from_sample()
¶
Remove a file from a sample, but don't delete the actual file (for now)
Source code in pydatalab/routes/v0_1/files.py
@FILES.route("/delete-file-from-sample/", methods=["POST"])
def delete_file_from_sample():
    """Remove a file from a sample, but don't delete the actual file (for now).

    The JSON request body must provide `item_id` and `file_id`. The file ID
    is pulled from the item's `file_ObjectIds` and the item ID is pulled
    from the file's `item_ids`.

    Returns:
        A `(response, status)` tuple: 200 with the updated file entry on
        success, 401 when no user is logged in or the item was not
        modified, and 400 when the file entry could not be updated.

    """
    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (
            jsonify(
                {
                    "status": "error",
                    "title": "Not Authorized",
                    # Was a copy of the "adding a file" message from the upload endpoints.
                    "detail": "Removing a file from a sample requires login.",
                }
            ),
            401,
        )

    request_json = request.get_json()

    item_id = request_json["item_id"]
    file_id = ObjectId(request_json["file_id"])

    result = pydatalab.mongo.flask_mongo.db.items.update_one(
        {"item_id": item_id, **get_default_permissions(user_only=True)},
        {"$pull": {"file_ObjectIds": file_id}},
    )
    if result.modified_count != 1:
        return (
            jsonify(
                status="error",
                message=f"Not authorized to perform file removal from sample {item_id=}",
                output=result.raw_result,
            ),
            401,
        )

    updated_file_entry = pydatalab.mongo.flask_mongo.db.files.find_one_and_update(
        {"_id": file_id},
        {"$pull": {"item_ids": item_id}},
        return_document=ReturnDocument.AFTER,
    )
    if not updated_file_entry:
        return (
            jsonify(
                status="error",
                message=f"{item_id} {file_id} delete failed. Something went wrong with the db call to remove sample from file",
            ),
            400,
        )

    return (
        jsonify(
            {
                "status": "success",
                "new_file_obj": {request_json["file_id"]: updated_file_entry},
            }
        ),
        200,
    )
delete_file()
¶
Delete a data file from the uploads/item_id folder.
Source code in pydatalab/routes/v0_1/files.py
@FILES.route("/delete-file/", methods=["POST"])
def delete_file():
    """Delete a data file from the uploads/item_id folder.

    The JSON request body must provide `item_id` and `filename`. The
    filename is pulled from the item's `files` list and the file is then
    removed from disk.

    Returns:
        A `(response, status)` tuple: 200 on success, 401 when no user is
        logged in, and 400 when the file does not exist on disk or no item
        matched the database update.

    """
    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (
            jsonify(
                {
                    "status": "error",
                    "title": "Not Authorized",
                    # Was a copy of the "adding a file" message from the upload endpoints.
                    "detail": "Deleting a file requires login.",
                }
            ),
            401,
        )

    request_json = request.get_json()

    item_id = request_json["item_id"]
    filename = request_json["filename"]

    # Sanitise both path components before touching the file system.
    secure_item_id = secure_filename(item_id)
    secure_fname = secure_filename(filename)

    path = os.path.join(CONFIG.FILE_DIRECTORY, secure_item_id, secure_fname)

    if not os.path.isfile(path):
        return (
            jsonify(
                status="error",
                message=f"Delete failed. file not found: {path}",
            ),
            400,
        )

    # `update_one` has no `return_document` parameter (that belongs to
    # `find_one_and_update`); passing it raised a TypeError on every call.
    result = pydatalab.mongo.flask_mongo.db.items.update_one(
        {"item_id": item_id, **get_default_permissions(user_only=True)},
        {"$pull": {"files": filename}},
    )
    if result.matched_count != 1:
        return (
            jsonify(
                status="error",
                message=f"{item_id} {filename} delete failed. Something went wrong with the db call. File not deleted.",
                output=result.raw_result,
            ),
            400,
        )

    # Only remove the file from disk once the database update has matched an item.
    os.remove(path)

    return jsonify({"status": "success"}), 200
graphs
¶
GRAPHS
¶
get_graph_cy_format(item_id: Optional[str] = None, collection_id: Optional[str] = None)
¶
Source code in pydatalab/routes/v0_1/graphs.py
@GRAPHS.route("/item-graph", methods=["GET"])
@GRAPHS.route("/item-graph/<item_id>", methods=["GET"])
def get_graph_cy_format(item_id: Optional[str] = None, collection_id: Optional[str] = None):
    """Build the nodes and edges of the item relationship graph.

    Without an `item_id`, all items visible to the user are considered,
    optionally restricted to one collection. With an `item_id`, the graph
    holds that item, the items related to it, and one further shell of
    items around those.

    Parameters:
        item_id: The item to centre the graph on, if any.
        collection_id: Overwritten by the `collection_id` query parameter
            of the request.

    Returns:
        A `(response, 200)` tuple with `nodes` and `edges` lists.

    Raises:
        RuntimeError: If `collection_id` is given but no such collection exists.

    """
    collection_id = request.args.get("collection_id", type=str)

    if item_id is None:
        if collection_id is not None:
            collection_immutable_id = flask_mongo.db.collections.find_one(
                {"collection_id": collection_id}, projection={"_id": 1}
            )
            if not collection_immutable_id:
                # The `f` prefix was missing, so the ID never appeared in the message.
                raise RuntimeError(f"No collection {collection_id=} found.")
            collection_immutable_id = collection_immutable_id["_id"]
            query = {
                "$and": [
                    {"relationships.immutable_id": collection_immutable_id},
                    {"relationships.type": "collections"},
                ]
            }
        else:
            query = {}
        all_documents = flask_mongo.db.items.find(
            {**query, **get_default_permissions(user_only=False)},
            projection={"item_id": 1, "name": 1, "type": 1, "relationships": 1},
        )
        node_ids: Set[str] = {document["item_id"] for document in all_documents}
        # The cursor was consumed above; rewind it so it can be iterated again below.
        all_documents.rewind()

    else:
        all_documents = list(
            flask_mongo.db.items.find(
                {
                    "$or": [{"item_id": item_id}, {"relationships.item_id": item_id}],
                    **get_default_permissions(user_only=False),
                },
                projection={"item_id": 1, "name": 1, "type": 1, "relationships": 1},
            )
        )

        # `relationships` can be stored as None (see the loop further down), which
        # the `.get(..., [])` default does not cover -- hence the `or []`.
        node_ids = {document["item_id"] for document in all_documents} | {
            relationship.get("item_id")
            for document in all_documents
            for relationship in document.get("relationships") or []
        }
        if len(node_ids) > 1:
            or_query = [{"item_id": node_id} for node_id in node_ids if node_id != item_id]
            next_shell = flask_mongo.db.items.find(
                {
                    "$or": or_query,
                    **get_default_permissions(user_only=False),
                },
                projection={"item_id": 1, "name": 1, "type": 1, "relationships": 1},
            )

            all_documents.extend(next_shell)
            node_ids = node_ids | {document["item_id"] for document in all_documents}

    nodes = []
    edges = []

    # Collect the elements that have already been added to the graph, to avoid duplication
    drawn_elements = set()
    node_collections = set()
    for document in all_documents:
        # for some reason, document["relationships"] is sometimes equal to None, so we
        # need this `or` statement.
        for relationship in document.get("relationships") or []:
            # Collection memberships are only drawn when not filtering by a collection
            if relationship.get("type") == "collections" and not collection_id:
                collection_data = flask_mongo.db.collections.find_one(
                    {
                        "_id": relationship["immutable_id"],
                        **get_default_permissions(user_only=False),
                    },
                    projection={"collection_id": 1, "title": 1, "type": 1},
                )
                if collection_data:
                    if relationship["immutable_id"] not in node_collections:
                        _id = f'Collection: {collection_data["collection_id"]}'
                        if _id not in drawn_elements:
                            nodes.append(
                                {
                                    "data": {
                                        "id": _id,
                                        "name": collection_data["title"],
                                        "type": collection_data["type"],
                                        "shape": "triangle",
                                    }
                                }
                            )
                            node_collections.add(relationship["immutable_id"])
                            drawn_elements.add(_id)

                    source = f'Collection: {collection_data["collection_id"]}'
                    target = document.get("item_id")
                    edges.append(
                        {
                            "data": {
                                "id": f"{source}->{target}",
                                "source": source,
                                "target": target,
                                "value": 1,
                            }
                        }
                    )
                continue

        for relationship in document.get("relationships") or []:
            # only considering child-parent relationships:
            if relationship.get("relation") not in ("parent", "is_part_of"):
                continue

            target = document["item_id"]
            source = relationship["item_id"]
            if source not in node_ids:
                continue
            edge_id = f"{source}->{target}"
            if edge_id not in drawn_elements:
                drawn_elements.add(edge_id)
                edges.append(
                    {
                        "data": {
                            "id": edge_id,
                            "source": source,
                            "target": target,
                            "value": 1,
                        }
                    }
                )

        if document["item_id"] not in drawn_elements:
            drawn_elements.add(document["item_id"])
            nodes.append(
                {
                    "data": {
                        "id": document["item_id"],
                        "name": document["name"],
                        "type": document["type"],
                        "special": document["item_id"] == item_id,
                    }
                }
            )

    # We want to filter out all the starting materials that don't have relationships since there are so many of them:
    whitelist = {edge["data"]["source"] for edge in edges}

    nodes = [
        node
        for node in nodes
        if node["data"]["type"] in ("samples", "cells") or node["data"]["id"] in whitelist
    ]

    return (jsonify(status="success", nodes=nodes, edges=edges), 200)
healthcheck
¶
HEALTHCHECK
¶
is_ready()
¶
Source code in pydatalab/routes/v0_1/healthcheck.py
@HEALTHCHECK.route("/healthcheck/is_ready", methods=["GET"])
def is_ready():
    """Report whether the database connection check succeeds.

    Returns:
        A `(response, status)` tuple: 200 when `check_mongo_connection`
        succeeds, 500 when it raises a `RuntimeError`.

    """
    from pydatalab.mongo import check_mongo_connection

    try:
        check_mongo_connection()
    except RuntimeError:
        body = jsonify(status="error", message="Unable to connect to MongoDB at specified URI.")
        status = 500
    else:
        body = jsonify(status="success", message="Server and database are ready")
        status = 200

    return (body, status)
is_alive()
¶
Source code in pydatalab/routes/v0_1/healthcheck.py
@HEALTHCHECK.route("/healthcheck/is_alive", methods=["GET"])
def is_alive():
    """Return a fixed success response with status 200, without any checks."""
    alive = jsonify(status="success", message="Server is alive")
    return (alive, 200)
info
¶
This submodule defines introspective info endpoints of the API.
INFO
¶
SCHEMAS
¶
Attributes (BaseModel)
pydantic-model
¶
Source code in pydatalab/routes/v0_1/info.py
# Model without declared fields of its own; used as a base class of `Info`.
class Attributes(BaseModel):
    class Config:
        # Fields that are not declared on the model are allowed.
        extra = "allow"
Meta (BaseModel)
pydantic-model
¶
Source code in pydatalab/routes/v0_1/info.py
# The `meta` member of a `JSONAPIResponse`; every field has a default.
class Meta(BaseModel):
    # Filled with `datetime.now()` when the model is instantiated.
    timestamp: datetime = Field(default_factory=datetime.now)
    # The endpoints in this module pass `request.query_string` here.
    query: str = ""
    api_version: str = __api_version__
    # Only the current API version is listed by default.
    available_api_versions: List[str] = [__api_version__]
    # Both version fields default to the same package `__version__`.
    server_version: str = __version__
    datamodel_version: str = __version__
Links (BaseModel)
pydantic-model
¶
Source code in pydatalab/routes/v0_1/info.py
# The `links` member of a `JSONAPIResponse`.
class Links(BaseModel):
    # Required URL; `get_info` fills it with `request.url`.
    self: AnyUrl

    class Config:
        # Fields other than `self` are allowed.
        extra = "allow"
self: AnyUrl
pydantic-field
required
¶
Data (BaseModel)
pydantic-model
¶
JSONAPIResponse (BaseModel)
pydantic-model
¶
Source code in pydatalab/routes/v0_1/info.py
# Response envelope serialised by the `/info` endpoints of this module.
class JSONAPIResponse(BaseModel):
    # Either a single `Data` entry or a list of them.
    data: Union[Data, List[Data]]
    meta: Meta
    # Optional; only `get_info` passes it among the endpoints shown here.
    links: Optional[Links]
MetaPerson (BaseModel)
pydantic-model
¶
Info (Attributes, Meta)
pydantic-model
¶
Source code in pydatalab/routes/v0_1/info.py
# Attributes of the `/info` response; combines the open-ended `Attributes`
# model with the fields of `Meta`.
class Info(Attributes, Meta):
    maintainer: Optional[MetaPerson]
    issue_tracker: Optional[AnyUrl]
    homepage: Optional[AnyUrl]
    source_repository: Optional[AnyUrl]
    # The only field here without a default or `Optional` type.
    identifier_prefix: str
    features: FeatureFlags = FEATURE_FLAGS

    @validator("maintainer")
    def strip_maintainer_fields(cls, v):
        """Reduce a `Person` maintainer to a `MetaPerson` that holds only
        the contact email and display name; other values pass through.
        """
        if isinstance(v, Person):
            return MetaPerson(contact_email=v.contact_email, display_name=v.display_name)
        return v
maintainer: MetaPerson
pydantic-field
¶
issue_tracker: AnyUrl
pydantic-field
¶
homepage: AnyUrl
pydantic-field
¶
source_repository: AnyUrl
pydantic-field
¶
identifier_prefix: str
pydantic-field
required
¶
features: FeatureFlags
pydantic-field
¶
strip_maintainer_fields(v)
classmethod
¶
Source code in pydatalab/routes/v0_1/info.py
@validator("maintainer")
def strip_maintainer_fields(cls, v):
    """Reduce a `Person` maintainer to a `MetaPerson` that holds only the
    contact email and display name; any other value is returned unchanged.
    """
    if not isinstance(v, Person):
        return v
    return MetaPerson(contact_email=v.contact_email, display_name=v.display_name)
get_info()
¶
Returns the runtime metadata for the deployment, e.g., versions, features and so on.
Source code in pydatalab/routes/v0_1/info.py
@INFO.route("/info", methods=["GET"])
def get_info():
    """Returns the runtime metadata for the deployment, e.g.,
    versions, features and so on.

    Returns:
        A `(response, 200)` tuple holding the serialised `JSONAPIResponse`.

    """
    metadata = _get_deployment_metadata_once()

    envelope = JSONAPIResponse(
        data=Data(id="/", type="info", attributes=Info(**metadata)),
        meta=Meta(query=request.query_string),
        links=Links(self=request.url),
    )
    # Serialise via the model's own JSON encoder, then hand plain data to Flask.
    serialised = json.loads(envelope.json())

    return (jsonify(serialised), 200)
get_stats()
¶
Returns a dictionary of counts of each entry type in the deployment
Source code in pydatalab/routes/v0_1/info.py
@INFO.route("/info/stats", methods=["GET"])
def get_stats():
    """Returns a dictionary of counts of each entry type in the deployment.

    Returns:
        A `(response, 200)` tuple whose `counts` holds the number of users,
        and of items of type `samples` and `cells`.

    """
    items = flask_mongo.db.items
    counts = {
        "users": flask_mongo.db.users.count_documents({}),
        "samples": items.count_documents({"type": "samples"}),
        "cells": items.count_documents({"type": "cells"}),
    }
    return (jsonify({"counts": counts}), 200)
list_block_types()
¶
Returns a list of all blocks implemented in this server.
Source code in pydatalab/routes/v0_1/info.py
@INFO.route("/info/blocks", methods=["GET"])
def list_block_types():
    """Returns a list of all blocks implemented in this server.

    Each entry of `BLOCK_TYPES` becomes one `Data` object; attributes a
    block does not define fall back to an empty value, or to the package
    version for `version`.
    """
    entries = []
    for block_type, block in BLOCK_TYPES.items():
        attributes = {
            "name": getattr(block, "name", ""),
            "description": getattr(block, "description", ""),
            "version": getattr(block, "version", __version__),
            "accepted_file_extensions": getattr(block, "accepted_file_extensions", []),
        }
        entries.append(Data(id=block_type, type="block_type", attributes=attributes))

    envelope = JSONAPIResponse(data=entries, meta=Meta(query=request.query_string))
    return jsonify(json.loads(envelope.json()))
get_all_items_models()
¶
Source code in pydatalab/routes/v0_1/info.py
def get_all_items_models():
    """Return the direct subclasses of `Item`."""
    item_models = Item.__subclasses__()
    return item_models
generate_schemas()
¶
Source code in pydatalab/routes/v0_1/info.py
def generate_schemas():
    """Return the schema of every item model and of `Collection`, keyed by
    the default value of each model's `type` property.
    """
    models = [*get_all_items_models(), Collection]
    schemas: dict[str, dict] = {
        model.schema()["properties"]["type"]["default"]: model.schema(by_alias=False)
        for model in models
    }
    return schemas
list_supported_types()
¶
Returns a list of supported schemas.
Source code in pydatalab/routes/v0_1/info.py
@INFO.route("/info/types", methods=["GET"])
def list_supported_types():
    """Returns a list of supported schemas.

    Every entry of `SCHEMAS` becomes one `Data` object carrying the schema
    together with the package and API versions.
    """
    entries = []
    for item_type, schema in SCHEMAS.items():
        attributes = {
            "version": __version__,
            "api_version": __api_version__,
            "schema": schema,
        }
        entries.append(Data(id=item_type, type="item_type", attributes=attributes))

    envelope = JSONAPIResponse(data=entries, meta=Meta(query=request.query_string))
    return jsonify(json.loads(envelope.json()))
get_schema_type(item_type)
¶
Returns the schema of the given type.
Source code in pydatalab/routes/v0_1/info.py
@INFO.route("/info/types/<string:item_type>", methods=["GET"])
def get_schema_type(item_type):
    """Returns the schema of the given type.

    Parameters:
        item_type: A key of `SCHEMAS`, taken from the URL.

    Returns:
        The serialised `JSONAPIResponse`, or a `(response, 404)` tuple if
        the type is unknown.

    """
    try:
        schema = SCHEMAS[item_type]
    except KeyError:
        unknown = {
            "status": "error",
            "detail": f"Item type {item_type} not found for this deployment",
        }
        return jsonify(unknown), 404

    entry = Data(
        id=item_type,
        type="item_type",
        attributes={
            "version": __version__,
            "api_version": __api_version__,
            "schema": schema,
        },
    )
    envelope = JSONAPIResponse(data=entry, meta=Meta(query=request.query_string))
    return jsonify(json.loads(envelope.json()))
items
¶
ITEMS
¶
_()
¶
Source code in pydatalab/routes/v0_1/items.py
@ITEMS.before_request
@active_users_or_get_only
def _():
    """Empty hook registered via `ITEMS.before_request`.

    The body does nothing; only the `active_users_or_get_only` decorator
    wrapped around it can act on a request.
    """
reserialize_blocks(display_order: List[str], blocks_obj: Dict[str, Dict]) -> Dict[str, Dict]
¶
Create the corresponding Python objects from JSON block data, then serialize it again as JSON to populate any missing properties.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
blocks_obj |
Dict[str, Dict] |
A dictionary containing the JSON block data, keyed by block ID. |
required |
Returns:
Type | Description |
---|---|
Dict[str, Dict] |
A dictionary with the re-serialized block data. |
Source code in pydatalab/routes/v0_1/items.py
def reserialize_blocks(display_order: List[str], blocks_obj: Dict[str, Dict]) -> Dict[str, Dict]:
    """Rebuild each displayed block from its JSON data and serialize it
    again, so that any missing properties are populated.

    Parameters:
        display_order: The IDs of the blocks to process; IDs that are not
            present in `blocks_obj` are logged and skipped.
        blocks_obj: A dictionary containing the JSON block data, keyed by block ID.

    Returns:
        The same `blocks_obj` dictionary, updated in place.

    """
    for block_id in display_order:
        if block_id not in blocks_obj:
            LOGGER.warning(f"block_id {block_id} found in display order but not in blocks_obj")
            continue

        block_data = blocks_obj[block_id]
        blocktype = block_data["blocktype"]
        # Unknown block types fall back to the "notsupported" block.
        block_class = BLOCK_TYPES.get(blocktype, BLOCK_TYPES["notsupported"])
        blocks_obj[block_id] = block_class.from_db(block_data).to_web()

    return blocks_obj
dereference_files(file_ids: List[Union[str, bson.objectid.ObjectId]]) -> Dict[str, Dict]
¶
For a list of Object IDs (as strings or otherwise), query the files collection and return a dictionary of the data stored under each ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_ids |
List[Union[str, bson.objectid.ObjectId]] |
The list of IDs of files to return; |
required |
Returns:
Type | Description |
---|---|
Dict[str, Dict] |
The dereferenced data as a dictionary with (string) ID keys. |
Source code in pydatalab/routes/v0_1/items.py
def dereference_files(file_ids: List[Union[str, ObjectId]]) -> Dict[str, Dict]:
    """For a list of Object IDs (as strings or otherwise), query the files collection
    and return a dictionary of the data stored under each ID.

    Parameters:
        file_ids: The list of IDs of files to return;

    Returns:
        The dereferenced data as a dictionary with (string) ID keys.

    Raises:
        RuntimeError: If any requested ID has no entry in the files collection.

    """
    object_ids = [ObjectId(_id) for _id in file_ids]
    results = {
        str(f["_id"]): f
        for f in flask_mongo.db.files.find(
            {
                "_id": {"$in": object_ids},
            }
        )
    }
    # Compare against the number of *distinct* IDs: the results are keyed by ID,
    # so a repeated ID in the request used to be reported as a missing entry.
    if len(results) != len(set(object_ids)):
        raise RuntimeError(
            "Some file IDs did not have corresponding database entries.\n"
            f"Returned: {list(results.keys())}\n"
            f"Requested: {file_ids}\n"
        )

    return results
get_equipment_summary()
¶
Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/equipment/", methods=["GET"])
def get_equipment_summary():
    """Return a summary of every item of type `equipment`.

    Returns:
        A JSON response whose `items` holds the projected fields of each entry.

    """
    pipeline = [
        {
            "$match": {
                "type": "equipment",
            }
        },
        {
            "$project": {
                "_id": 0,
                "item_id": 1,
                "name": 1,
                "type": 1,
                "date": 1,
                "refcode": 1,
                "location": 1,
            }
        },
    ]
    equipment = list(flask_mongo.db.items.aggregate(pipeline))
    return jsonify({"status": "success", "items": equipment})
get_starting_materials()
¶
Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/starting-materials/", methods=["GET"])
def get_starting_materials():
    """Return a summary of the `starting_materials` items that the default
    permissions allow.

    Returns:
        A JSON response whose `items` holds the projected fields of each
        entry, with `nblocks` giving the size of its `display_order`.

    """
    selection = {
        "type": "starting_materials",
        **get_default_permissions(user_only=False),
    }
    projection = {
        "_id": 0,
        "item_id": 1,
        "nblocks": {"$size": "$display_order"},
        "date": 1,
        "chemform": 1,
        "name": 1,
        "type": 1,
        "chemical_purity": 1,
        "supplier": 1,
        "location": 1,
    }
    materials = list(
        flask_mongo.db.items.aggregate([{"$match": selection}, {"$project": projection}])
    )
    return jsonify({"status": "success", "items": materials})
get_samples_summary(match: Optional[Dict] = None, project: Optional[Dict] = None) -> CommandCursor
¶
Return a summary of item entries that match some criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
match |
Optional[Dict] |
A MongoDB aggregation match query to filter the results. |
None |
project |
Optional[Dict] |
A MongoDB aggregation project query to filter the results, relative to the default included below. |
None |
Source code in pydatalab/routes/v0_1/items.py
def get_samples_summary(
    match: Optional[Dict] = None, project: Optional[Dict] = None
) -> CommandCursor:
    """Return a summary of item entries that match some criteria.

    The default permissions are always merged into the match query, and
    the `type` is always restricted to `samples` and `cells`.

    Parameters:
        match: A MongoDB aggregation match query to filter the results.
            The dictionary passed in is not modified.
        project: A MongoDB aggregation project query to filter the results, relative
            to the default included below. Keys mapped to 0 are dropped from
            the default projection; all other keys are included.

    Returns:
        The cursor of the aggregation, sorted by descending `date`.

    """
    # Work on a copy: updating the caller's dictionary in place would leak the
    # permission and type filters back into the caller's query.
    match = dict(match) if match else {}
    match.update(get_default_permissions(user_only=False))
    match["type"] = {"$in": ["samples", "cells"]}

    _project = {
        "_id": 0,
        "blocks": {"blocktype": 1, "title": 1},
        "creators": {
            "display_name": 1,
            "contact_email": 1,
        },
        "collections": {
            "collection_id": 1,
            "title": 1,
        },
        "item_id": 1,
        "name": 1,
        "chemform": 1,
        "nblocks": {"$size": "$display_order"},
        "characteristic_chemical_formula": 1,
        "type": 1,
        "date": 1,
        "refcode": 1,
    }

    # Cannot mix 0 and 1 keys in MongoDB project so must loop and check
    if project:
        for key, include in project.items():
            if include == 0:
                _project.pop(key, None)
            else:
                _project[key] = 1

    return flask_mongo.db.items.aggregate(
        [
            {"$match": match},
            {"$lookup": creators_lookup()},
            {"$lookup": collections_lookup()},
            {"$project": _project},
            {"$sort": {"date": -1}},
        ]
    )
creators_lookup() -> Dict
¶
Source code in pydatalab/routes/v0_1/items.py
def creators_lookup() -> Dict:
    """Return the `$lookup` specification that joins the `users` whose `_id`
    is listed in an item's `creator_ids`, stored under `creators` and
    sorted by their position in `creator_ids`.
    """
    # A missing `creator_ids` is treated as an empty list.
    is_creator = {"$expr": {"$in": ["$_id", {"$ifNull": ["$$creator_ids", []]}]}}
    position = {"__order": {"$indexOfArray": ["$$creator_ids", "$_id"]}}

    lookup: Dict = {}
    lookup["from"] = "users"
    lookup["let"] = {"creator_ids": "$creator_ids"}
    lookup["pipeline"] = [
        {"$match": is_creator},
        {"$addFields": position},
        {"$sort": {"__order": 1}},
        {"$project": {"_id": 1, "display_name": 1, "contact_email": 1}},
    ]
    lookup["as"] = "creators"
    return lookup
files_lookup() -> Dict
¶
Source code in pydatalab/routes/v0_1/items.py
def files_lookup() -> Dict:
    """Return the `$lookup` specification that joins the `files` entries
    whose `_id` is listed in an item's `file_ObjectIds`, stored under `files`.
    """
    lookup: Dict = {}
    lookup["from"] = "files"
    lookup["localField"] = "file_ObjectIds"
    lookup["foreignField"] = "_id"
    lookup["as"] = "files"
    return lookup
collections_lookup() -> Dict
¶
Looks inside the relationships of the item, searches for IDs in the collections table and then projects only the collection ID and name for the response.
Source code in pydatalab/routes/v0_1/items.py
def collections_lookup() -> Dict:
    """Return the `$lookup` specification that matches the immutable IDs in
    an item's relationships against the `collections` table, stored under
    `collections`; only `_id` and `collection_id` are projected.
    """
    is_member_collection = {
        # A missing list of relationship IDs is treated as empty.
        "$expr": {
            "$in": ["$_id", {"$ifNull": ["$$collection_ids", []]}],
        },
        "type": "collections",
    }

    lookup: Dict = {}
    lookup["from"] = "collections"
    lookup["let"] = {"collection_ids": "$relationships.immutable_id"}
    lookup["pipeline"] = [
        {"$match": is_member_collection},
        {"$project": {"_id": 1, "collection_id": 1}},
    ]
    lookup["as"] = "collections"
    return lookup
get_samples()
¶
Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/samples/", methods=["GET"])
def get_samples():
    """Return the default `get_samples_summary()` as a JSON response, with
    the entries listed under `samples`.
    """
    summaries = list(get_samples_summary())
    return jsonify({"status": "success", "samples": summaries})
search_items()
¶
Perform free text search on items and return the top results. GET parameters: query: String with the search terms. nresults: Maximum number of results (default 100). types: If None, search all types of items. Otherwise, a list of strings giving the types to consider (e.g. ["samples","starting_materials"]).
Returns:
Type | Description |
---|---|
response list of dictionaries containing the matching items in order of descending match score. |
Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/search-items/", methods=["GET"])
def search_items():
    """Perform free text search on items and return the top results.

    GET parameters:
        query: String with the search terms.
        nresults: Maximum number of results (default 100).
        types: If absent, search all types of items. Otherwise, a
            comma-separated string of the types to consider
            (e.g. "samples,starting_materials").

    Returns:
        A `(response, 200)` tuple whose `items` lists the matching items
        sorted by text score.

    """
    search_text = request.args.get("query", type=str)
    max_results = request.args.get("nresults", default=100, type=int)
    types = request.args.get("types", default=None)

    # should figure out how to parse as list automatically
    type_filter = types.split(",") if isinstance(types, str) else types

    match_obj = {
        "$text": {"$search": search_text},
        **get_default_permissions(user_only=False),
    }
    if type_filter is not None:
        match_obj["type"] = {"$in": type_filter}

    projection = {
        "_id": 0,
        "type": 1,
        "item_id": 1,
        "name": 1,
        "chemform": 1,
        "refcode": 1,
    }
    pipeline = [
        {"$match": match_obj},
        {"$sort": {"score": {"$meta": "textScore"}}},
        {"$limit": max_results},
        {"$project": projection},
    ]
    found = list(flask_mongo.db.items.aggregate(pipeline))

    return jsonify({"status": "success", "items": found}), 200
create_sample()
¶
Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/new-sample/", methods=["POST"])
def create_sample():
    """Create a single sample from the JSON request body.

    The body either wraps the sample under `new_sample_data` (optionally
    with `copy_from_item_id` and `generate_id_automatically`), or is itself
    the sample data.

    Returns:
        The `(response, status)` pair produced by `_create_sample`, with
        the response converted to JSON.

    """
    request_json = request.get_json()  # noqa: F821 pylint: disable=undefined-variable

    if "new_sample_data" not in request_json:
        # Bare form: the whole body is the sample.
        response, http_code = _create_sample(request_json)
        return jsonify(response), http_code

    response, http_code = _create_sample(
        sample_dict=request_json["new_sample_data"],
        copy_from_item_id=request_json.get("copy_from_item_id"),
        generate_id_automatically=request_json.get("generate_id_automatically", False),
    )
    return jsonify(response), http_code
create_samples()
¶
Attempt to create multiple samples at once. Because each may result in success or failure, 207 is returned along with a JSON field containing all the individual http_codes.
Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/new-samples/", methods=["POST"])
def create_samples():
    """Attempt to create multiple samples at once.

    Because each may result in success or failure, 207 is returned along with a
    json field containing all the individual http_codes.

    The JSON request body must provide `new_sample_datas`, and may provide
    `copy_from_item_ids` (paired with the samples by position) and
    `generate_ids_automatically` (applied to every sample).

    Returns:
        A `(response, 207)` tuple with the counts of successes and errors,
        and the individual responses and HTTP codes.

    """
    request_json = request.get_json()  # noqa: F821 pylint: disable=undefined-variable

    sample_jsons = request_json["new_sample_datas"]
    copy_from_item_ids = request_json.get("copy_from_item_ids")
    generate_ids_automatically = request_json.get("generate_ids_automatically")

    if copy_from_item_ids is None:
        copy_from_item_ids = [None] * len(sample_jsons)

    outputs = [
        _create_sample(
            sample_dict=sample_json,
            copy_from_item_id=copy_from_item_id,
            generate_id_automatically=generate_ids_automatically,
        )
        for sample_json, copy_from_item_id in zip(sample_jsons, copy_from_item_ids)
    ]

    if outputs:
        responses, http_codes = zip(*outputs)
    else:
        # With nothing to create, `zip(*outputs)` is empty and cannot be unpacked
        # into two names; report an empty multi-status result instead of failing.
        responses, http_codes = (), ()

    statuses = [response["status"] for response in responses]
    nsuccess = statuses.count("success")
    nerror = statuses.count("error")

    return (
        jsonify(
            nsuccess=nsuccess,
            nerror=nerror,
            responses=responses,
            http_codes=http_codes,
        ),
        207,
    )  # 207: multi-status
update_item_permissions(refcode: str)
¶
Update the permissions of an item with the given refcode.
Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/items/<refcode>/permissions", methods=["PATCH"])
def update_item_permissions(refcode: str):
    """Update the permissions of an item with the given refcode.

    The new list of creators is read from the `creators` entry of the JSON
    request body; only entries with an `immutable_id` are used. The current
    user and the first existing creator are always kept in the list.

    Parameters:
        refcode: The refcode of the item, with or without its prefix.

    Returns:
        A `(response, status)` tuple: 200 on success (also when nothing
        changes), 401 when no item is found with the current permissions,
        and 400 when the creator IDs are missing or unknown, or the update
        does not modify the item.

    """

    request_json = request.get_json()
    creator_ids: list[ObjectId] = []

    # A refcode without a "prefix:" part gets the configured prefix prepended.
    if not len(refcode.split(":")) == 2:
        refcode = f"{CONFIG.IDENTIFIER_PREFIX}:{refcode}"

    current_item = flask_mongo.db.items.find_one(
        {"refcode": refcode, **get_default_permissions(user_only=True)},
        {"_id": 1, "creator_ids": 1},
    )  # type: ignore

    if not current_item:
        return (
            jsonify(
                {
                    "status": "error",
                    "message": f"No valid item found with the given {refcode=}.",
                }
            ),
            401,
        )

    current_creator_ids = current_item["creator_ids"]

    if "creators" in request_json:
        creator_ids = [
            ObjectId(creator.get("immutable_id", None))
            for creator in request_json["creators"]
            if creator.get("immutable_id", None) is not None
        ]

    # Also reached when the request has no `creators` entry at all.
    if not creator_ids:
        return (
            jsonify(
                {
                    "status": "error",
                    "message": "No valid creator IDs found in the request.",
                }
            ),
            400,
        )

    # Validate all creator IDs are present in the database
    found_ids = [d for d in flask_mongo.db.users.find({"_id": {"$in": creator_ids}}, {"_id": 1})]  # type: ignore
    if not len(found_ids) == len(creator_ids):
        return (
            jsonify(
                {
                    "status": "error",
                    "message": "One or more creator IDs not found in the database.",
                }
            ),
            400,
        )

    # Make sure a user cannot remove their own access to an item
    current_user_id = current_user.person.immutable_id
    try:
        creator_ids.remove(current_user_id)
    except ValueError:
        pass
    creator_ids.insert(0, current_user_id)

    # The first ID in the creator list takes precedence; always make sure this is included to avoid orphaned items
    # (this runs after the block above, so the base owner ends up ahead of the current user)
    if current_creator_ids:
        base_owner = current_creator_ids[0]
        try:
            creator_ids.remove(base_owner)
        except ValueError:
            pass
        creator_ids.insert(0, base_owner)

    # The comparison ignores order, so a pure reordering is not written back.
    if set(creator_ids) == set(current_creator_ids):
        # Short circuit if the creator IDs are the same
        return jsonify({"status": "success"}), 200

    LOGGER.warning("Setting permissions for item %s to %s", refcode, creator_ids)
    result = flask_mongo.db.items.update_one(
        {"refcode": refcode, **get_default_permissions(user_only=True)},
        {"$set": {"creator_ids": creator_ids}},
    )

    if not result.modified_count == 1:
        return jsonify(
            {
                "status": "error",
                "message": "Failed to update permissions: you cannot remove yourself or the base owner as a creator.",
            }
        ), 400

    return jsonify({"status": "success"}), 200
delete_sample()
¶
Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/delete-sample/", methods=["POST"])
def delete_sample():
    """Delete the item whose `item_id` is given in the JSON request body.

    The deletion filter combines the requested `item_id` with the result of
    `get_default_permissions(user_only=True, deleting=True)`.

    Returns:
        A `(response, status)` tuple: a success payload with 200 when exactly
        one document was deleted, otherwise an error payload with 401.
    """
    payload = request.get_json()  # noqa: F821 pylint: disable=undefined-variable
    item_id = payload["item_id"]

    deletion_filter = {"item_id": item_id}
    deletion_filter.update(get_default_permissions(user_only=True, deleting=True))
    outcome = flask_mongo.db.items.delete_one(deletion_filter)

    if outcome.deleted_count == 1:
        return jsonify({"status": "success"}), 200

    # Nothing was deleted: either no such item, or the permissions filter excluded it.
    failure = {
        "status": "error",
        "message": f"Authorization required to attempt to delete sample with {item_id=} from the database.",
    }
    return jsonify(failure), 401
get_item_data(item_id: str | None = None, refcode: str | None = None, load_blocks: bool = False)
¶
Generates a JSON response for the item with the given item_id, or refcode, additionally resolving relationships to files and other items.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
load_blocks |
bool |
Whether to regenerate any data blocks associated with this sample (i.e., create the Python object corresponding to the block and call its render function). |
False |
Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/items/<refcode>", methods=["GET"])
@ITEMS.route("/get-item-data/<item_id>", methods=["GET"])
def get_item_data(
    item_id: str | None = None, refcode: str | None = None, load_blocks: bool = False
):
    """Generates a JSON response for the item with the given `item_id`,
    or `refcode` additionally resolving relationships to files and other items.

    Parameters:
        item_id: The item ID to look up; takes precedence over `refcode`.
        refcode: The refcode to look up. If it does not consist of exactly two
            ":"-separated parts, it is prefixed with `CONFIG.IDENTIFIER_PREFIX`.
        load_blocks: Whether to regenerate any data blocks associated with this
            sample (i.e., create the Python object corresponding to the block and
            call its render function).

    Returns:
        A 307 redirect to the UI when a refcode is given together with a truthy
        `redirect-to-ui` query parameter and `CONFIG.APP_URL` is set; a 400 error
        when neither identifier is given; a 404 error when no item matches with
        the current authorization; otherwise a JSON payload with the item data,
        its files and the IDs of its parent and child items.

    Raises:
        KeyError: If the stored document has a missing or unrecognised `type`.

    """
    redirect_to_ui = bool(request.args.get("redirect-to-ui", default=False, type=json.loads))
    if refcode and redirect_to_ui and CONFIG.APP_URL:
        return redirect(f"{CONFIG.APP_URL}/items/{refcode}", code=307)

    if item_id:
        match = {"item_id": item_id}
    elif refcode:
        if not len(refcode.split(":")) == 2:
            refcode = f"{CONFIG.IDENTIFIER_PREFIX}:{refcode}"
        match = {"refcode": refcode}
    else:
        return (
            jsonify(
                {
                    "status": "error",
                    "message": "No item_id or refcode provided.",
                }
            ),
            400,
        )

    # retrieve the entry from the database:
    cursor = flask_mongo.db.items.aggregate(
        [
            {
                "$match": {
                    **match,
                    **get_default_permissions(user_only=False),
                }
            },
            {"$lookup": creators_lookup()},
            {"$lookup": collections_lookup()},
            {"$lookup": files_lookup()},
        ],
    )

    try:
        doc = list(cursor)[0]
    except IndexError:
        doc = None

    # Unauthenticated users may only see starting materials. `.get` is used so that a
    # document without a `type` is handled here (or by the check below) rather than
    # raising a bare KeyError from this condition.
    if not doc or (
        not current_user.is_authenticated
        and not CONFIG.TESTING
        and not doc.get("type") == "starting_materials"
    ):
        return (
            jsonify(
                {
                    "status": "error",
                    "message": f"No matching items for {match=} with current authorization.",
                }
            ),
            404,
        )

    # determine the item type and validate according to the appropriate schema
    try:
        ItemModel = ITEM_MODELS[doc["type"]]
    except KeyError:
        # Report the query actually used: `item_id` is None for refcode lookups.
        if "type" in doc:
            raise KeyError(f"Item matching {match=} has invalid type: {doc['type']}")
        else:
            raise KeyError(f"Item matching {match=} has no type field in document.")

    doc = ItemModel(**doc)
    if load_blocks:
        doc.blocks_obj = reserialize_blocks(doc.display_order, doc.blocks_obj)

    # The identifiers by which another item's relationships may refer to this document.
    # The same set is used for the filter and the projection, so that every document
    # matched by the filter also has its matching relationship projected.
    self_identifiers = {
        "item_id": doc.item_id,
        "refcode": doc.refcode,
        "immutable_id": doc.immutable_id,
    }

    # find any documents with relationships that mention this document
    relationships_query_results = flask_mongo.db.items.find(
        filter={
            "$or": [
                {f"relationships.{field}": value} for field, value in self_identifiers.items()
            ]
        },
        projection={
            "item_id": 1,
            "refcode": 1,
            "relationships": {
                "$elemMatch": {
                    "$or": [{field: value} for field, value in self_identifiers.items()],
                },
            },
        },
    )

    # loop over and collect all 'outer' relationships presented by other items
    incoming_relationships: Dict[RelationshipType, Set[str]] = {}
    for related in relationships_query_results:
        # Only `item_id` and `refcode` are projected, so use `.get` throughout and skip
        # documents that cannot be identified instead of raising a KeyError.
        related_id = (
            related.get("item_id") or related.get("refcode") or related.get("immutable_id")
        )
        if related_id is None:
            continue
        for relationship in related.get("relationships") or []:
            incoming_relationships.setdefault(relationship["relation"], set()).add(related_id)

    # loop over and aggregate all 'inner' relationships presented by this item
    inlined_relationships: Dict[RelationshipType, Set[str]] = {}
    if doc.relationships is not None:
        inlined_relationships = {
            relation: {
                d.item_id or d.refcode or d.immutable_id
                for d in doc.relationships
                if d.relation == relation
            }
            for relation in RelationshipType
        }

    # reunite parents and children from both directions of the relationships field
    parents = incoming_relationships.get(RelationshipType.CHILD, set()).union(
        inlined_relationships.get(RelationshipType.PARENT, set())
    )
    children = incoming_relationships.get(RelationshipType.PARENT, set()).union(
        inlined_relationships.get(RelationshipType.CHILD, set())
    )

    # Must be exported to JSON first to apply the custom pydantic JSON encoders
    return_dict = json.loads(doc.json(exclude_unset=True))

    if item_id is None:
        item_id = return_dict["item_id"]

    # create the files_data dictionary keyed by each file's `immutable_id`
    # (taken from the JSON-decoded item data)
    files_data: Dict[str, Dict] = {
        f["immutable_id"]: f for f in return_dict.get("files") or []
    }

    return jsonify(
        {
            "status": "success",
            "item_id": item_id,
            "item_data": return_dict,
            "files_data": files_data,
            "child_items": sorted(children),
            "parent_items": sorted(parents),
        }
    )
save_item()
¶
Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/save-item/", methods=["POST"])
def save_item():
    """Update a stored item with the data given in the JSON request body.

    The request body is read for `item_id` and `data`. Protected keys are
    stripped from `data`, `last_modified` is set to the current UTC time, any
    blocks are converted to their database form, and the merged item is
    validated against the model for the stored item's type before being written.

    Returns:
        A `(response, status)` tuple: 200 with the new `last_modified` value on
        success; 400 if the item cannot be found with appropriate permissions,
        fails validation, or the database update matches no document; 401 if the
        collections check raises a `ValueError`.

    """
    request_json = request.get_json()  # noqa: F821 pylint: disable=undefined-variable

    item_id = request_json["item_id"]
    updated_data = request_json["data"]

    # These keys should not be updated here and cannot be modified by the user through this endpoint
    for k in (
        "_id",
        "file_ObjectIds",
        "creators",
        "creator_ids",
        "item_id",
        "relationships",
    ):
        updated_data.pop(k, None)

    updated_data["last_modified"] = datetime.datetime.now(tz=datetime.timezone.utc).isoformat()

    # `or {}` also covers an explicit `"blocks_obj": null` in the request.
    for block_id, block_data in (updated_data.get("blocks_obj") or {}).items():
        blocktype = block_data["blocktype"]
        block = BLOCK_TYPES.get(blocktype, BLOCK_TYPES["notsupported"]).from_web(block_data)
        updated_data["blocks_obj"][block_id] = block.to_db()

    openly_editable_types = ("starting_materials", "equipment")

    # A request that omits `type` falls back to the restrictive, user-only
    # permission check instead of failing with a KeyError.
    user_only = updated_data.get("type") not in openly_editable_types

    item = flask_mongo.db.items.find_one(
        {"item_id": item_id, **get_default_permissions(user_only=user_only)}
    )

    # Bit of a hack for now: starting materials and equipment should be editable by anyone,
    # so we adjust the query above to be more permissive when the user is requesting such an item
    # but before returning we need to check that the actual item did indeed have that type
    if not item or (not user_only and item["type"] not in openly_editable_types):
        return (
            jsonify(
                status="error",
                message=f"Unable to find item with appropriate permissions and {item_id=}.",
            ),
            400,
        )

    if updated_data.get("collections", []):
        try:
            updated_data["collections"] = _check_collections(updated_data)
        except ValueError as exc:
            return (
                jsonify(
                    status="error",
                    message=f"Cannot update {item_id!r} with missing collections {updated_data['collections']!r}: {exc}",
                    item_id=item_id,
                ),
                401,
            )

    # Validate against the model of the *stored* type, captured before merging.
    item_type = item["type"]
    item.update(updated_data)

    try:
        item = ITEM_MODELS[item_type](**item).dict()
    except ValidationError as exc:
        return (
            jsonify(
                status="error",
                message=f"Unable to update item {item_id=} ({item_type=}) with new data {updated_data}",
                output=str(exc),
            ),
            400,
        )

    # remove collections and creators and any other reference fields
    item.pop("collections", None)
    item.pop("creators", None)

    result = flask_mongo.db.items.update_one(
        {"item_id": item_id},
        {"$set": item},
    )

    if result.matched_count != 1:
        return (
            jsonify(
                status="error",
                message=f"{item_id} item update failed. no subdocument matched",
                output=result.raw_result,
            ),
            400,
        )

    return jsonify(status="success", last_modified=updated_data["last_modified"]), 200
search_users()
¶
Perform free text search on users and return the top results. GET parameters: query: String with the search terms. nresults: Maximum number of results to return (default 100).
Returns:
Type | Description |
---|---|
response list of dictionaries containing the matching items in order of descending match score. |
Source code in pydatalab/routes/v0_1/items.py
@ITEMS.route("/search-users/", methods=["GET"])
def search_users():
    """Perform free text search on users and return the top results.

    GET parameters:
        query: String with the search terms.
        nresults: Maximum number of results to return (default 100).
        types: Optional comma-separated list of values; when given, only
            documents whose `type` is one of them are matched.

    Returns:
        response list of dictionaries containing the matching items in order of
        descending match score, or a 400 error if no `query` was provided.

    """
    query = request.args.get("query", type=str)
    nresults = request.args.get("nresults", default=100, type=int)
    types = request.args.get("types", default=None)

    # `$text.$search` requires a string; reject the request rather than passing None on.
    if query is None:
        return jsonify({"status": "error", "message": "No search query provided."}), 400

    match_obj = {"$text": {"$search": query}}
    if types is not None:
        # The query parameter arrives as a single string, but `$in` requires an array.
        match_obj["type"] = {"$in": types.split(",")}

    cursor = flask_mongo.db.users.aggregate(
        [
            {"$match": match_obj},
            {"$sort": {"score": {"$meta": "textScore"}}},
            {"$limit": nresults},
            {
                "$project": {
                    "_id": 1,
                    "identities": 1,
                    "display_name": 1,
                    "contact_email": 1,
                }
            },
        ]
    )

    # Round-trip each user through the `Person` model's JSON export before returning it.
    return jsonify(
        {"status": "success", "users": [json.loads(Person(**d).json()) for d in cursor]}
    ), 200
remotes
¶
REMOTES
¶
list_remote_directories()
¶
Returns the most recent directory structures from the server.
If the cache is missing or is older than some configured time, then it will be reconstructed.
Source code in pydatalab/routes/v0_1/remotes.py
@REMOTES.route("/list-remote-directories", methods=["GET"])
@REMOTES.route("/remotes", methods=["GET"])
def list_remote_directories():
    """Return the directory structures of all configured remote filesystems.

    The `invalidate_cache` value derived from the request arguments is passed
    through to `get_directory_structures`.

    Returns:
        A `(response, status)` tuple: 401 if the user is not authenticated
        (outside of testing), 400 if the cache argument is rejected, otherwise
        200 with the structures under `data` and details of the remotes (plus
        the oldest cache update time, when any structures exist) under `meta`.
    """
    if not (current_user.is_authenticated or CONFIG.TESTING):
        unauthorized = {
            "status": "error",
            "title": "Not Authorized",
            "detail": "Listing remote directories requires authentication.",
        }
        return jsonify(unauthorized), 401

    try:
        invalidate_cache = _check_invalidate_cache(request.args)
    except RuntimeError as e:
        bad_argument = {
            "status": "error",
            "title": "Invalid Argument",
            "detail": str(e),
        }
        return jsonify(bad_argument), 400

    structures = get_directory_structures(
        CONFIG.REMOTE_FILESYSTEMS, invalidate_cache=invalidate_cache
    )

    meta = {"remotes": [json.loads(remote.json()) for remote in CONFIG.REMOTE_FILESYSTEMS]}
    if structures:
        # Report how stale the least recently refreshed structure is.
        oldest_update = min(structure["last_updated"] for structure in structures)
        meta["oldest_cache_update"] = oldest_update.isoformat()

    return jsonify({"meta": meta, "data": structures}), 200
get_remote_directory(remote_id: str)
¶
Returns the directory structure from the server for the given configured remote name.
Source code in pydatalab/routes/v0_1/remotes.py
@REMOTES.route("/remotes/<path:remote_id>", methods=["GET"])
def get_remote_directory(remote_id: str):
    """Return the directory structure of the configured remote named `remote_id`.

    Returns:
        A `(response, status)` tuple: 401 if the user is not authenticated
        (outside of testing), 400 if the cache argument is rejected, 404 if no
        configured remote has the given name, otherwise 200 with the structure
        under `data` and the remote's details under `meta`.
    """
    if not (current_user.is_authenticated or CONFIG.TESTING):
        unauthorized = {
            "status": "error",
            "title": "Not Authorized",
            "detail": "Listing remote directories requires authentication.",
        }
        return jsonify(unauthorized), 401

    try:
        invalidate_cache = _check_invalidate_cache(request.args)
    except RuntimeError as e:
        bad_argument = {
            "status": "error",
            "title": "Invalid Argument",
            "detail": str(e),
        }
        return jsonify(bad_argument), 400

    # First configured remote whose name matches, if any.
    remote_obj = next(
        (remote for remote in CONFIG.REMOTE_FILESYSTEMS if remote_id == remote.name),
        None,
    )
    if remote_obj is None:
        not_found = {
            "status": "error",
            "title": "Not Found",
            "detail": f"No remote found with name {remote_id!r}",
        }
        return jsonify(not_found), 404

    directory_structure = get_directory_structure(remote_obj, invalidate_cache=invalidate_cache)

    payload = {
        "meta": {"remote": json.loads(remote_obj.json())},
        "data": directory_structure,
    }
    return jsonify(payload), 200
users
¶
USERS
¶
save_user(user_id)
¶
Source code in pydatalab/routes/v0_1/users.py
@USERS.route("/users/<user_id>", methods=["PATCH"])
def save_user(user_id):
    """Update the display name, contact email and/or account status of a user.

    The JSON request body may contain `display_name`, `contact_email` and
    `account_status`. A `contact_email` of `null` or `""` clears the stored
    email; fields that are absent from the body are left untouched.

    Parameters:
        user_id: The ID of the user to update, converted to an `ObjectId` for
            the database query.

    Returns:
        A `(response, status)` tuple: 401 if no user is authenticated, 403 if
        the current user is neither the target user nor an admin, 400 if a value
        fails validation or no user matches, otherwise 200.

    """
    request_json = request.get_json()

    # `False` marks a field as "not provided". This must also be the initial value:
    # starting from `None` would make a request without a JSON body look like an
    # explicit request to clear the contact email.
    display_name: str | bool | None = False
    contact_email: str | bool | None = False
    account_status: str | None = None

    if isinstance(request_json, dict):
        display_name = request_json.get("display_name", False)
        contact_email = request_json.get("contact_email", False)
        account_status = request_json.get("account_status", None)

    if not current_user.is_authenticated and not CONFIG.TESTING:
        return (jsonify({"status": "error", "message": "No user authenticated."}), 401)

    if not CONFIG.TESTING and current_user.id != user_id and current_user.role != "admin":
        return (
            jsonify({"status": "error", "message": "User not allowed to edit this profile."}),
            403,
        )

    update = {}

    try:
        if display_name:
            update["display_name"] = DisplayName(display_name)

        # Truthy values are validated as an email; an explicit `None` or `""` clears it.
        if contact_email or contact_email in (None, ""):
            if contact_email in ("", None):
                update["contact_email"] = None
            else:
                update["contact_email"] = EmailStr(contact_email)

        # NOTE(review): any user allowed to edit this profile (including the user
        # themselves) can set `account_status` here - confirm this is intended.
        if account_status:
            update["account_status"] = account_status

    except ValueError as e:
        return jsonify(
            {"status": "error", "message": f"Invalid display name or email was passed: {str(e)}"}
        ), 400

    if not update:
        return jsonify({"status": "success", "message": "No update was performed."}), 200

    update_result = flask_mongo.db.users.update_one({"_id": ObjectId(user_id)}, {"$set": update})

    if update_result.matched_count != 1:
        return (jsonify({"status": "error", "message": "Unable to update user."}), 400)

    # The user was matched but the stored values were already identical.
    if update_result.modified_count != 1:
        return (
            jsonify(
                {
                    "status": "success",
                    "message": "No update was performed",
                }
            ),
            200,
        )

    return (jsonify({"status": "success"}), 200)