mirror of
https://github.com/beak-insights/felicity-lims.git
synced 2025-02-24 00:42:59 +08:00
466 lines
16 KiB
Python
466 lines
16 KiB
Python
import logging
|
|
from typing import List, Optional
|
|
|
|
import sqlalchemy as sa
|
|
import strawberry # noqa
|
|
|
|
from felicity.api.gql import PageInfo
|
|
from felicity.api.gql.analysis.types import analysis as a_types
|
|
from felicity.api.gql.analysis.types import results as r_types
|
|
from felicity.apps.analysis import conf as analysis_conf
|
|
from felicity.apps.analysis.models import analysis as a_models
|
|
from felicity.apps.analysis.models import qc as qc_models
|
|
from felicity.apps.analysis.models import results as r_models
|
|
from felicity.apps.analysis.utils import sample_search
|
|
from felicity.core.uid_gen import FelicityID
|
|
from felicity.utils import has_value_or_is_truthy
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@strawberry.type
|
|
class AnalysisQuery:
|
|
@strawberry.field
|
|
async def sample_type_all(self, info) -> List[a_types.SampleTypeTyp]:
|
|
return await a_models.SampleType.all()
|
|
|
|
@strawberry.field
|
|
async def sample_type_by_uid(self, info, uid: FelicityID) -> a_types.SampleTypeTyp:
|
|
return await a_models.SampleType.get(uid=uid)
|
|
|
|
@strawberry.field
|
|
async def sample_all(
|
|
self,
|
|
info,
|
|
page_size: Optional[int] = None,
|
|
after_cursor: Optional[str] = None,
|
|
before_cursor: Optional[str] = None,
|
|
text: Optional[str] = None,
|
|
status: Optional[str] = None,
|
|
client_uid: Optional[FelicityID] = None,
|
|
sort_by: Optional[List[str]] = None,
|
|
) -> r_types.SampleCursorPage:
|
|
filters = []
|
|
|
|
_or_text_ = {}
|
|
if has_value_or_is_truthy(text):
|
|
arg_list = [
|
|
"sample_id__ilike",
|
|
"analysis_request___patient___first_name__ilike",
|
|
"analysis_request___patient___last_name__ilike",
|
|
"analysis_request___patient___client_patient_id__ilike",
|
|
"analysis_request___client_request_id__ilike",
|
|
]
|
|
for _arg in arg_list:
|
|
_or_text_[_arg] = f"%{text}%"
|
|
|
|
text_filters = {sa.or_: _or_text_}
|
|
filters.append(text_filters)
|
|
|
|
if client_uid:
|
|
filters.append(
|
|
{"analysis_request___client_uid__exact": client_uid})
|
|
|
|
if status:
|
|
filters.append({"status__exact": status})
|
|
|
|
# Exclude QC Sample else front-end will throw ?????
|
|
# filters.append({'internal_use__ne': True})
|
|
filters.append({"analysis_request__ne": None})
|
|
|
|
page = await a_models.Sample.paginate_with_cursors(
|
|
page_size=page_size,
|
|
after_cursor=after_cursor,
|
|
before_cursor=before_cursor,
|
|
filters=filters,
|
|
sort_by=sort_by,
|
|
)
|
|
|
|
total_count: int = page.total_count
|
|
edges: List[r_types.SampleEdge[r_types.SamplesWithResults]] = page.edges
|
|
items: List[r_types.SamplesWithResults] = page.items
|
|
page_info: PageInfo = page.page_info
|
|
|
|
print([it.uid for it in items])
|
|
return r_types.SampleCursorPage(
|
|
total_count=total_count, edges=edges, items=items, page_info=page_info
|
|
)
|
|
|
|
# awaiting deprecation since sample_all can now achieve this
|
|
@strawberry.field
|
|
async def sample_search(
|
|
self, info, status: str, text: str, client_uid: FelicityID
|
|
) -> List[a_types.SampleType]:
|
|
return await sample_search(status, text, client_uid)
|
|
|
|
# awaiting deprecation since sample_all can now achieve this
|
|
@strawberry.field
|
|
async def sample_count(
|
|
self, info, status: str, text: str, client_uid: FelicityID
|
|
) -> int:
|
|
combined = await sample_search(status, text, client_uid)
|
|
return len(combined)
|
|
|
|
@strawberry.field
|
|
async def sample_by_uid(self, info, uid: FelicityID) -> a_types.SampleType:
|
|
return await a_models.Sample.get(uid=uid)
|
|
|
|
@strawberry.field
|
|
async def sample_by_parent_id(
|
|
self, info, parent_id: FelicityID, text: Optional[str] = None
|
|
) -> List[a_types.SampleType]:
|
|
"""Retrieve associated invalidated parent - children relationship by mptt parent_id"""
|
|
samples: list[a_models.Sample] = await a_models.Sample.get_all(
|
|
parent_id=parent_id
|
|
)
|
|
|
|
if text == "repeat": # created by invalidation hence they contain "_R0"
|
|
return list(filter(lambda x: "_R0" in x.sample_id, samples))
|
|
|
|
return samples
|
|
|
|
@strawberry.field
|
|
async def samples_by_uids(
|
|
self, info, sample_uids: List[str] = []
|
|
) -> List[r_types.SamplesWithResults]:
|
|
"""Samples for publishing/ report printing"""
|
|
return await a_models.Sample.get_all(uid__in=sample_uids)
|
|
|
|
@strawberry.field
|
|
async def samples_by_storage_container_uid(
|
|
self, info, uid: FelicityID
|
|
) -> List[a_types.SampleType]:
|
|
"""Retrieve stored samples for a given container uid"""
|
|
return await a_models.Sample.get_all(storage_container_uid=uid)
|
|
|
|
@strawberry.field
|
|
async def profile_all(self, info) -> List[a_types.ProfileType]:
|
|
return await a_models.Profile.all()
|
|
|
|
@strawberry.field
|
|
async def profile_by_uid(self, info, uid: FelicityID) -> a_types.ProfileType:
|
|
return await a_models.Profile.get(uid=uid)
|
|
|
|
@strawberry.field
|
|
async def analysis_category_all(self, info) -> List[a_types.AnalysisCategoryType]:
|
|
return await a_models.AnalysisCategory.all()
|
|
|
|
@strawberry.field
|
|
async def analysis_category_by_uid(
|
|
self, info, uid: FelicityID
|
|
) -> a_types.AnalysisCategoryType:
|
|
return await a_models.AnalysisCategory.get(uid=uid)
|
|
|
|
@strawberry.field
|
|
async def analysis_all(
|
|
self,
|
|
info,
|
|
page_size: Optional[int] = None,
|
|
after_cursor: Optional[str] = None,
|
|
before_cursor: Optional[str] = None,
|
|
text: Optional[str] = None,
|
|
sort_by: Optional[List[str]] = None,
|
|
qc_only: Optional[bool] = False,
|
|
) -> a_types.AnalysisCursorPage:
|
|
|
|
filters = []
|
|
_or_text_ = {}
|
|
if has_value_or_is_truthy(text):
|
|
arg_list = ["name__ilike", "description__ilike", "keyword__ilike"]
|
|
for _arg in arg_list:
|
|
_or_text_[_arg] = f"%{text}%"
|
|
|
|
text_filters = {sa.or_: _or_text_}
|
|
filters.append(text_filters)
|
|
|
|
if qc_only:
|
|
filters.append({"category___name__exact": "Quality Control"})
|
|
|
|
page = await a_models.Analysis.paginate_with_cursors(
|
|
page_size=page_size,
|
|
after_cursor=after_cursor,
|
|
before_cursor=before_cursor,
|
|
filters=filters,
|
|
sort_by=sort_by,
|
|
get_related="profiles",
|
|
)
|
|
|
|
total_count: int = page.total_count
|
|
edges: List[a_types.AnalysisEdge[a_types.AnalysisType]] = page.edges
|
|
items: List[a_types.AnalysisType] = page.items
|
|
page_info: PageInfo = page.page_info
|
|
|
|
return a_types.AnalysisCursorPage(
|
|
total_count=total_count, edges=edges, items=items, page_info=page_info
|
|
)
|
|
|
|
@strawberry.field
|
|
async def analysis_by_uid(self, info, uid: FelicityID) -> a_types.AnalysisType:
|
|
return await a_models.Analysis.get(uid=uid)
|
|
|
|
@strawberry.field
|
|
async def analysis_request_all(
|
|
self,
|
|
info,
|
|
page_size: Optional[int] = None,
|
|
after_cursor: Optional[str] = None,
|
|
before_cursor: Optional[str] = None,
|
|
text: Optional[str] = None,
|
|
sort_by: Optional[List[str]] = None,
|
|
) -> a_types.AnalysisRequestCursorPage:
|
|
filters = []
|
|
|
|
_or_text_ = {}
|
|
if has_value_or_is_truthy(text):
|
|
arg_list = [
|
|
"client_request_id__ilike",
|
|
"patient___first_name__ilike",
|
|
"patient___last_name__ilike",
|
|
"patient___client_patient_id__ilike",
|
|
]
|
|
for _arg in arg_list:
|
|
_or_text_[_arg] = f"%{text}%"
|
|
|
|
text_filters = {sa.or_: _or_text_}
|
|
filters.append(text_filters)
|
|
|
|
filters.append({"internal_use__ne": True})
|
|
|
|
page = await a_models.AnalysisRequest.paginate_with_cursors(
|
|
page_size=page_size,
|
|
after_cursor=after_cursor,
|
|
before_cursor=before_cursor,
|
|
filters=filters,
|
|
sort_by=sort_by,
|
|
)
|
|
|
|
total_count: int = page.total_count
|
|
edges: List[
|
|
a_types.AnalysisRequestEdge[a_types.AnalysisRequestWithSamples]
|
|
] = page.edges
|
|
items: List[a_types.AnalysisRequestType] = page.items
|
|
page_info: PageInfo = page.page_info
|
|
|
|
return a_types.AnalysisRequestCursorPage(
|
|
total_count=total_count, edges=edges, items=items, page_info=page_info
|
|
)
|
|
|
|
@strawberry.field
|
|
async def analysis_request_by_uid(
|
|
self, info, uid: FelicityID
|
|
) -> a_types.AnalysisRequestWithSamples:
|
|
return await a_models.AnalysisRequest.get(uid=uid)
|
|
|
|
@strawberry.field
|
|
async def analysis_requests_by_patient_uid(
|
|
self, info, uid: FelicityID
|
|
) -> List[a_types.AnalysisRequestWithSamples]:
|
|
return await a_models.AnalysisRequest.get_all(patient_uid__exact=uid)
|
|
|
|
@strawberry.field
|
|
async def analysis_requests_by_client_uid(
|
|
self, info, uid: FelicityID
|
|
) -> List[a_types.AnalysisRequestWithSamples]:
|
|
return await a_models.AnalysisRequest.get_all(client_uid__exact=uid)
|
|
|
|
@strawberry.field
|
|
async def analysis_result_by_uid(
|
|
self, info, uid: FelicityID
|
|
) -> r_types.AnalysisResultType:
|
|
return await r_models.AnalysisResult.get(uid=uid)
|
|
|
|
@strawberry.field
|
|
async def analysis_result_by_sample_uid(
|
|
self, info, uid: FelicityID
|
|
) -> List[r_types.AnalysisResultType]:
|
|
return await r_models.AnalysisResult.get_all(sample_uid__exact=uid)
|
|
|
|
@strawberry.field
|
|
async def analysis_results_for_ws_assign(
|
|
self,
|
|
info,
|
|
page_size: Optional[int] = None,
|
|
after_cursor: Optional[str] = None,
|
|
before_cursor: Optional[str] = None,
|
|
text: Optional[str] = None,
|
|
sort_by: Optional[List[str]] = None,
|
|
analysis_uid: Optional[FelicityID] = None,
|
|
sample_type_uid: Optional[FelicityID] = None,
|
|
) -> r_types.AnalysisResultCursorPage:
|
|
|
|
filters = [{"assigned": False}]
|
|
_or_text_ = {}
|
|
if has_value_or_is_truthy(text):
|
|
arg_list = ["sample___sample_id__ilike"]
|
|
for _arg in arg_list:
|
|
_or_text_[_arg] = f"%{text}%"
|
|
|
|
text_filters = {sa.or_: _or_text_}
|
|
filters.append(text_filters)
|
|
|
|
if analysis_uid:
|
|
filters.append({"analysis_uid": analysis_uid})
|
|
|
|
if sample_type_uid:
|
|
filters.append({"sample___sample_type_uid": sample_type_uid})
|
|
|
|
filters.append(
|
|
{"sample___status": analysis_conf.states.sample.RECEIVED})
|
|
filters.append({"status": analysis_conf.states.result.PENDING})
|
|
|
|
if not sort_by:
|
|
sort_by = ["-sample___priority", "sample___uid"]
|
|
|
|
page = await r_models.AnalysisResult.paginate_with_cursors(
|
|
page_size=page_size,
|
|
after_cursor=after_cursor,
|
|
before_cursor=before_cursor,
|
|
filters=filters,
|
|
sort_by=sort_by,
|
|
get_related="sample",
|
|
)
|
|
|
|
total_count: int = page.total_count
|
|
edges: List[r_types.AnalysisResultEdge[r_types.AnalysisType]] = page.edges
|
|
items: List[r_types.AnalysisResultType] = page.items
|
|
page_info: PageInfo = page.page_info
|
|
|
|
return r_types.AnalysisResultCursorPage(
|
|
total_count=total_count, edges=edges, items=items, page_info=page_info
|
|
)
|
|
|
|
@strawberry.field
|
|
async def analysis_interim_all(self, info) -> List[a_types.AnalysisInterimType]:
|
|
return await a_models.AnalysisInterim.all()
|
|
|
|
@strawberry.field
|
|
async def analysis_interim_by_uid(
|
|
self, info, uid: FelicityID
|
|
) -> a_types.AnalysisInterimType:
|
|
return await a_models.AnalysisInterim.get(uid=uid)
|
|
|
|
@strawberry.field
|
|
async def analysis_correction_factor_all(
|
|
self, info
|
|
) -> List[a_types.AnalysisCorrectionFactorType]:
|
|
return await a_models.AnalysisCorrectionFactor.all()
|
|
|
|
@strawberry.field
|
|
async def analysis_correction_factor_by_uid(
|
|
self, info, uid: FelicityID
|
|
) -> a_types.AnalysisCorrectionFactorType:
|
|
return await a_models.AnalysisCorrectionFactor.get(uid=uid)
|
|
|
|
@strawberry.field
|
|
async def analysis_uncertainty_all(
|
|
self, info
|
|
) -> List[a_types.AnalysisUncertaintyType]:
|
|
return await a_models.AnalysisUncertainty.all()
|
|
|
|
@strawberry.field
|
|
async def analysis_uncertainty_by_uid(
|
|
self, info, uid: FelicityID
|
|
) -> a_types.AnalysisUncertaintyType:
|
|
return await a_models.AnalysisUncertainty.get(uid=uid)
|
|
|
|
@strawberry.field
|
|
async def analysis_detection_limit_all(
|
|
self, info
|
|
) -> List[a_types.AnalysisDetectionLimitType]:
|
|
return await a_models.AnalysisDetectionLimit.all()
|
|
|
|
@strawberry.field
|
|
async def analysis_detection_limit_by_uid(
|
|
self, info, uid: FelicityID
|
|
) -> a_types.AnalysisDetectionLimitType:
|
|
return await a_models.AnalysisUncertainty.get(uid=uid)
|
|
|
|
@strawberry.field
|
|
async def analysis_specification_all(
|
|
self, info
|
|
) -> List[a_types.AnalysisSpecificationType]:
|
|
return await a_models.AnalysisSpecification.all()
|
|
|
|
@strawberry.field
|
|
async def analysis_specification_uid(
|
|
self, info, uid: FelicityID
|
|
) -> a_types.AnalysisSpecificationType:
|
|
return await a_models.AnalysisSpecification.get(uid=uid)
|
|
|
|
@strawberry.field
|
|
async def qc_set_all(
|
|
self,
|
|
info,
|
|
page_size: Optional[int] = None,
|
|
after_cursor: Optional[str] = None,
|
|
before_cursor: Optional[str] = None,
|
|
text: Optional[str] = None,
|
|
sort_by: Optional[List[str]] = None,
|
|
) -> r_types.QCSetCursorPage:
|
|
|
|
filters = []
|
|
|
|
_or_text_ = {}
|
|
if has_value_or_is_truthy(text):
|
|
arg_list = ["name__ilike", "description__ilike", "keyword__ilike"]
|
|
for _arg in arg_list:
|
|
_or_text_[_arg] = f"%{text}%"
|
|
|
|
text_filters = {sa.or_: _or_text_}
|
|
filters.append(text_filters)
|
|
|
|
# filters.append({'internal_use__ne': True})
|
|
|
|
page = await a_models.QCSet.paginate_with_cursors(
|
|
page_size=page_size,
|
|
after_cursor=after_cursor,
|
|
before_cursor=before_cursor,
|
|
filters=filters,
|
|
sort_by=sort_by,
|
|
)
|
|
|
|
total_count: int = page.total_count
|
|
edges: List[r_types.QCSetEdge[r_types.QCSetWithSamples]] = page.edges
|
|
items: List[r_types.QCSetWithSamples] = page.items
|
|
page_info: PageInfo = page.page_info
|
|
|
|
return r_types.QCSetCursorPage(
|
|
total_count=total_count, edges=edges, items=items, page_info=page_info
|
|
)
|
|
|
|
@strawberry.field
|
|
async def qc_set_by_uid(self, info, uid: FelicityID) -> r_types.QCSetWithSamples:
|
|
return await qc_models.QCSet.get(uid=uid)
|
|
|
|
@strawberry.field
|
|
async def qc_level_all(self, info) -> List[a_types.QCLevelType]:
|
|
return await qc_models.QCLevel.all()
|
|
|
|
@strawberry.field
|
|
async def qc_level_by_uid(self, info, uid: FelicityID) -> a_types.QCLevelType:
|
|
return await qc_models.QCLevel.get(uid=uid)
|
|
|
|
@strawberry.field
|
|
async def qc_template_all(self, info) -> List[a_types.QCTemplateType]:
|
|
return await qc_models.QCTemplate.all()
|
|
|
|
@strawberry.field
|
|
async def qc_template_by_uid(self, info, uid: FelicityID) -> a_types.QCTemplateType:
|
|
return await qc_models.QCTemplate.get(uid=uid)
|
|
|
|
@strawberry.field
|
|
async def result_options_by_analysis_uid(
|
|
self, info, uid: FelicityID
|
|
) -> a_types.ResultOptionType:
|
|
return await a_models.ResultOption.get_all(analysis_uid__exact=uid)
|
|
|
|
@strawberry.field
|
|
async def rejection_reasons_all(self, info) -> List[a_types.RejectionReasonType]:
|
|
return await a_models.RejectionReason.all()
|
|
|
|
@strawberry.field
|
|
async def rejection_reason_by_uid(
|
|
self, info, uid: FelicityID
|
|
) -> a_types.RejectionReasonType:
|
|
return await a_models.RejectionReason.get(uid=uid)
|