felicity-lims/felicity/database/mongo.py
Aurthur Musendame c09321a382 ruff format
2024-09-28 09:31:00 +02:00

108 lines
3.8 KiB
Python

import logging
from enum import StrEnum
from typing import Optional, Any
from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorClient
from felicity.core.config import settings
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
client: AsyncIOMotorClient = AsyncIOMotorClient(
f"mongodb://{settings.MONGODB_USER}:{settings.MONGODB_PASS}@{settings.MONGODB_SERVER}"
)
class MongoCollection(StrEnum):
DIAGNOSTIC_REPORT = "diagnostic-report"
INVOICE = "invoice"
AUDIT_LOG = "audit-log"
SHIPMENT = "shipment"
class MongoService:
def __init__(self) -> None:
self.db = client.felicity
async def create(
self, collection_name: MongoCollection, data: dict
) -> Optional[dict]:
logger.info(f"mongodb -- create:{collection_name} --")
collection = self.db.get_collection(collection_name)
created = await collection.insert_one(data)
return await collection.find_one({"_id": created.inserted_id})
async def upsert(
self, collection_name: MongoCollection, uid: str, data: dict
) -> Optional[dict]:
logger.info(f"mongodb -- upsert:{collection_name} --")
collection = self.db.get_collection(collection_name)
result = await collection.update_one(
{"_id": self.oid(uid)}, {"$set": data}, upsert=True
)
return await collection.find_one({"_id": result.upserted_id})
async def retrieve(self, collection_name: MongoCollection, uid: str) -> dict | None:
logger.info(f"mongodb -- retrieve:{collection_name} --")
collection = self.db.get_collection(collection_name)
item = await collection.find_one({"_id": self.oid(uid)})
if item:
item["_id"] = self.flake_id_from_hex(str(item["_id"]))
return item
async def search(
self,
collection_name: MongoCollection,
filters: dict[str, Any],
projection: dict[str, int] | None = None,
limit: int = 100,
) -> list[dict[str, Any]]:
"""
Search documents in MongoDB based on user-defined filters.
:param filters: A dictionary of filters to apply.
:param projection: A dictionary specifying fields to include or exclude (1 to include, 0 to exclude).
:param limit: Maximum number of documents to return.
:return: A list of documents matching the filters.
"""
logger.info(f"mongodb -- search:{collection_name} --")
collection = self.db.get_collection(collection_name)
cursor = collection.find(filters, projection).limit(limit)
results = []
async for document in cursor:
results.append(document)
return results
async def update(
self, collection_name: MongoCollection, uid: str, data: dict
) -> Optional[bool]:
logger.info(f"mongodb -- update:{collection_name} --")
collection = self.db.get_collection(collection_name)
if len(data) < 1:
return None
item = await collection.find_one({"_id": self.oid(uid)})
if item:
updated = await collection.update_one(
{"_id": self.oid(uid)}, {"$set": data}
)
return updated.matched_count > 0
return False
async def delete(self, collection_name: MongoCollection, uid: str) -> bool:
logger.info(f"mongodb -- delete:{collection_name} --")
collection = self.db.get_collection(collection_name)
item = await collection.find_one({"_id": self.oid(uid)})
if item:
await collection.delete_one({"_id": self.oid(uid)})
return True
return False
@staticmethod
def oid(flake_id: str) -> ObjectId:
return ObjectId(hex(int(flake_id))[2:].zfill(24))
@staticmethod
def flake_id_from_hex(hex_string: str) -> str:
return str(int(hex_string, 16))