Source code for indra_network_search.rest_api

"""
The IndraNetworkSearch REST API
"""
import logging
from datetime import date
from os import environ
from typing import List, Optional

from fastapi import FastAPI, Query as RestQuery, BackgroundTasks
from pydantic import ValidationError

from depmap_analysis.util.io_functions import file_opener
from indra.databases import get_identifiers_url
from indra_network_search.data_models.rest_models import Health, ServerStatus
from indra_network_search.rest_util import (
    load_indra_graph,
    check_existence_and_date_s3,
    dump_result_json_to_s3,
    dump_query_json_to_s3,
)
from indra_network_search.data_models import (
    Results,
    NetworkSearchQuery,
    SubgraphRestQuery,
    SubgraphResults,
    Node,
    MultiInteractorsRestQuery,
    MultiInteractorsResults,
)
from indra_network_search.autocomplete import NodesTrie, Prefixes
from indra_network_search.search_api import IndraNetworkSearchAPI
from depmap_analysis.network_functions.net_functions import bio_ontology


app = FastAPI()

logger = logging.getLogger(__name__)

DEBUG = environ.get("API_DEBUG") == "1"
USE_CACHE = environ.get("USE_CACHE") == "1"
HEALTH = Health(status="booting")
STATUS = ServerStatus(status="booting", graph_date="2021-08-09")
network_search_api: IndraNetworkSearchAPI
nsid_trie: NodesTrie
nodes_trie: NodesTrie


[docs]@app.get("/xrefs", response_model=List[List[str]]) def get_xrefs(ns: str, id: str) -> List[List[str]]: """Get all cross-refs given a namespace and ID Parameters ---------- ns : The namespace of the entity to find cross-refs for id : The identifier of the entity to find cross-regs for Returns ------- : A list of tuples containing namespace, identifier, lookup url to identifiers.org """ # Todo: offload util features and capabilities, such as this one, to a new # UtilApi class xrefs = bio_ontology.get_mappings(ns=ns, id=id) xrefs_w_lookup = [[n, i, get_identifiers_url(n, i)] for n, i in xrefs] return xrefs_w_lookup
[docs]@app.get("/node-name-in-graph", response_model=Optional[Node]) def node_name_in_graph( node_name: str = RestQuery(..., min_length=1, alias="node-name") ) -> Optional[Node]: """Check if node by provided name (case sensitive) exists in graph Parameters ---------- node_name : The name of the node to check Returns ------- : When a match is found, the full information of the node is returned """ node = network_search_api.get_node(node_name) if node: return node
[docs]@app.get("/node-id-in-graph", response_model=Optional[Node]) def node_id_in_graph( db_name: str = RestQuery(..., min_length=2, alias="db-name"), db_id: str = RestQuery(..., min_length=1, alias="db-id"), ) -> Optional[Node]: """Check if a node by provided db name and db id exists Parameters ---------- db_name : The database name, e.g. hgnc, chebi or up db_id : The identifier for the entity in the given database, e.g. 11018 Returns ------- : When a match is found, the full information of the node is returned """ node = network_search_api.get_node_by_ns_id(db_ns=db_name, db_id=db_id) if node: return node
[docs]@app.get("/autocomplete", response_model=Prefixes) def get_prefix_autocomplete( prefix: str = RestQuery(..., min_length=1), max_res: int = RestQuery(100, alias="max-results"), ) -> Prefixes: """Get the case-insensitive node names with (ns, id) starting in prefix Parameters ---------- prefix : The prefix of a node name to search for. Note: for prefixes of 1 and 2 characters, only exact matches are returned. For 3+ characters, prefix matching is done. If the prefix contains ':', an namespace:id search is done. max_res : The top ranked (by node degree) results will be returned, cut off at this many results. Returns ------- : A list of tuples of (node name, namespace, identifier) """ # Catch very short entity names if 1 <= len(prefix) <= 2 and ":" not in prefix: logger.info("Got short node name lookup") # Loop all combinations of upper and lowercase if len(prefix) == 1: nodes = [] upper_match = network_search_api.get_node(prefix.upper()) lower_match = network_search_api.get_node(prefix.lower()) if upper_match: nodes.append( [upper_match.name, upper_match.namespace, upper_match.identifier] ) if lower_match: nodes.append( [lower_match.name, lower_match.namespace, lower_match.identifier] ) else: nodes = [] n1 = prefix.upper() n2 = prefix[0].lower() + prefix.upper()[1] n3 = prefix[0].upper() + prefix.lower()[1] n4 = prefix.lower() for p in [n1, n2, n3, n4]: m = network_search_api.get_node(p) if m: nodes.append([m.name, m.namespace, m.identifier]) # Look up ns:id searches elif ":" in prefix: logger.info("Got ns:id prefix check") nodes = nsid_trie.case_items(prefix=prefix, top_n=max_res) else: logger.info("Got name prefix check") nodes = nodes_trie.case_items(prefix=prefix, top_n=max_res) logger.info(f"Prefix query resolved with {len(nodes)} suggestions") return nodes
[docs]@app.get("/health", response_model=Health) async def health(): """Returns health status Returns ------- Health """ logger.info("Got health check") return HEALTH
[docs]@app.get("/status", response_model=ServerStatus) async def server_status(): """Returns the status of the server and some info about the loaded graphs Returns ------- : """ logger.info("Got status check") return STATUS
[docs]@app.post("/query", response_model=Results) def query(search_query: NetworkSearchQuery, background_tasks: BackgroundTasks): """Interface with IndraNetworkSearchAPI.handle_query Parameters ---------- search_query : NetworkSearchQuery Query to the NetworkSearchQuery Returns ------- Results """ query_hash = search_query.get_hash() logger.info(f"Got NetworkSearchQuery #{query_hash}: {search_query.dict()}") # Check if results are on S3 keys_dict = check_existence_and_date_s3(query_hash=query_hash) if keys_dict.get("result_json_key"): logger.info("Found results cached on S3") results_json = file_opener(keys_dict["result_json_key"]) try: results = Results(**results_json) except ValidationError as verr: logger.error(verr) logger.info("Result could not be validated, re-running search") results = network_search_api.handle_query(rest_query=search_query) logger.info("Uploading results to S3") background_tasks.add_task( dump_result_json_to_s3, query_hash, results.dict() ) background_tasks.add_task( dump_query_json_to_s3, query_hash, search_query.dict() ) else: logger.info("Performing new search") results = network_search_api.handle_query(rest_query=search_query) logger.info("Uploading results to S3") background_tasks.add_task(dump_result_json_to_s3, query_hash, results.dict()) background_tasks.add_task( dump_query_json_to_s3, query_hash, search_query.dict() ) return results
@app.post("/multi_interactors", response_model=MultiInteractorsResults) def multi_interactors(search_query: MultiInteractorsRestQuery): logger.info(f"Got multi interactors query with {len(search_query.nodes)} nodes") results = network_search_api.handle_multi_interactors_query( multi_interactors_rest_query=search_query ) logger.info("Multi interactors query resolved") return results
[docs]@app.post("/subgraph", response_model=SubgraphResults) def sub_graph(search_query: SubgraphRestQuery): """Interface with IndraNetworkSearchAPI.handle_subgraph_query Parameters ---------- search_query: SubgraphRestQuery Query to for IndraNetworkSearchAPI.handle_subgraph_query Returns ------- SubgraphResults """ logger.info(f"Got subgraph query with {len(search_query.nodes)} nodes") subgraph_results = network_search_api.handle_subgraph_query( subgraph_rest_query=search_query ) logger.info("Subgraph query resolved") return subgraph_results
@app.on_event("startup") async def startup_event(): global network_search_api, nsid_trie, nodes_trie # Todo: figure out how to do all the loading async so the server is # available to respond to health checks while it's loading # See: # - https://fastapi.tiangolo.com/advanced/events/#startup-event # - https://www.starlette.io/events/ if DEBUG: from indra_network_search.tests.util import ( _setup_graph, _setup_signed_node_graph, ) dir_graph = _setup_graph() sign_node_graph = _setup_signed_node_graph(False) else: dir_graph, _, _, sign_node_graph = load_indra_graph( unsigned_graph=True, unsigned_multi_graph=False, sign_node_graph=True, sign_edge_graph=False, use_cache=USE_CACHE, ) bio_ontology.initialize() # Get a Trie for autocomplete logger.info("Loading Trie structure with unsigned graph nodes") nodes_trie = NodesTrie.from_node_names(graph=dir_graph) nsid_trie = NodesTrie.from_node_ns_id(graph=dir_graph) # Set numbers for server status STATUS.unsigned_nodes = len(dir_graph.nodes) STATUS.unsigned_edges = len(dir_graph.edges) STATUS.signed_nodes = len(sign_node_graph.nodes) STATUS.signed_edges = len(sign_node_graph.edges) dt = dir_graph.graph.get("date") STATUS.graph_date = date.fromisoformat(dt) if dt else None # Setup search API logger.info("Setting up IndraNetworkSearchAPI with signed and unsigned " "graphs") network_search_api = IndraNetworkSearchAPI( unsigned_graph=dir_graph, signed_node_graph=sign_node_graph ) logger.info("Service is available") STATUS.status = "available" HEALTH.status = "available"