Skip to content

Commit

Permalink
update app api with respect to marketplace capabilities
Browse files Browse the repository at this point in the history
  • Loading branch information
MBueschelberger committed Sep 6, 2023
1 parent 2ca2670 commit 5b0237e
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 140 deletions.
13 changes: 6 additions & 7 deletions osp/app/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,9 @@ def get_models() -> "Dict[str, Callable]":
}


def depends_modellist(
registry: "Dict[str, Callable]" = Depends(get_models),
) -> "List[str]":
def depends_modellist() -> "List[str]":
"""Get list of model names."""
registry = get_models()
return list(registry.keys())


Expand Down Expand Up @@ -121,16 +120,16 @@ def depends_upload(


def depends_download(
uuid: str = Query(..., title="Cache ID received after the upload."),
dataset_name: str = Query(..., title="Cache ID received after the upload."),
minio_client: Minio = Depends(depends_minio),
) -> HTTPResponse:
"""Return `HTTPResponse` from uuid through minio client via `Depends`."""
return _get_download(uuid, minio_client)
return _get_download(dataset_name, minio_client)


def depends_logs(
task_id: str = Query(..., title="task id of the submitted job"),
transformation_id: str = Query(..., title="task id of the submitted job"),
minio_client: Minio = Depends(depends_minio),
) -> HTTPResponse:
"""Return `HTTPResponse` from uuid through minio client via `Depends`."""
return _get_download(task_id, minio_client)
return _get_download(transformation_id, minio_client)
251 changes: 131 additions & 120 deletions osp/app/main.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
"""Main FastAPI-middleware for running remote celery tasks"""
import logging
import os
from typing import Any, Dict, List, Union
from datetime import datetime
from typing import Annotated, Any, Dict, List, Optional, Union
from uuid import UUID

import pkg_resources
import uvicorn
from celery import Celery
from fastapi import Body, Depends, FastAPI, HTTPException, Query, Response
from fastapi.openapi.utils import get_openapi
from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse
from fastapi.responses import JSONResponse, RedirectResponse

# from fastapi.responses import StreamingResponse
from fastapi_plugins import (
config_plugin,
get_config,
Expand All @@ -36,14 +37,17 @@
get_models,
)
from .models import (
InfoType,
RegisteredModels,
RegisteredTaskModel,
SubmissionBody,
TaskCreateModel,
TaskKillModel,
TaskResultModel,
TaskStatusModel,
TransformationStatus,
UploadDataResponse,
task_to_transformation_map,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -93,26 +97,31 @@ async def root():
return RedirectResponse(url="/docs")


@app.post("/cache/upload", operation_id="uploadDataCache")
@app.put("/data/cache", operation_id="createDataset")
async def upload_data(
object_key: UUID = Depends(depends_upload),
) -> UploadDataResponse:
"""Upload data from internal cache"""
return UploadDataResponse(cache_id=object_key)
return UploadDataResponse(id=object_key, last_modified=str(datetime.now()))


@app.get("/cache/download/{uuid}", operation_id="downloadDataCache")
@app.get("/data/cache/{dataset_name}", operation_id="getDataset")
async def download_data(
uuid: str = Query(..., title="Cache ID received after the upload."),
dataset_name: str,
response=Depends(depends_download),
) -> StreamingResponse:
) -> Response:
"""Download file via StreamingResponse"""
filename = uuid + response.headers.get("x-amz-meta-suffix")
return StreamingResponse(
iter(response.readlines()),
media_type="multipart/form-data",
headers={"Content-Disposition": f"attachment; filename={filename}"},
)
filename = str(dataset_name) + response.headers.get("x-amz-meta-suffix")
# return StreamingResponse(
# iter(response.readlines()),
# media_type="multipart/form-data",
# headers={"Content-Disposition": f"attachment; filename={filename}"},
# )
response = Response(response.data)
# Set the appropriate headers
response.headers["Content-Disposition"] = f'attachment; filename="{filename}"'
response.headers["Content-Type"] = "application/octet-stream"
return response


@app.get("/workers/registered")
Expand All @@ -128,7 +137,7 @@ async def get_workers_available(
)


@app.get("/models/registered")
@app.get("/models", operation_id="getModels")
async def get_model_available(
registry: List[str] = Depends(depends_modellist),
) -> RegisteredModels:
Expand All @@ -141,69 +150,106 @@ async def get_model_available(
)


@app.get("/models/get_schema/{modelname}")
async def get_schema(
modelname: str = Query(..., title="Name of the data model to be instanciated."),
models=Depends(get_models),
) -> Dict[Any, Any]:
"""Create a semantic model registered in the app."""
model = models.get(modelname)
return schema([model])


@app.get("/models/get_example/{modelname}")
async def get_example(
modelname: str = Query(
..., title="Name of the data model for which an example should be retrieved."
@app.get("/info", operation_id="getInfo")
async def get_info(
info_type: InfoType = Query(
..., title="Type of info to be retrieved. schema or example"
),
models=Depends(get_models),
) -> JSONResponse:
"""Retrieve an example for a model registered in the app."""
model = models.get(modelname)
return JSONResponse(model.Config.schema_extra["example"])


@app.post("/models/create/{modelname}")
model_name: Optional[str] = Query(
...,
enum=depends_modellist(),
title="Name of the data model for which info should be retrieved.",
),
) -> Union[Dict[Any, Any], Any]:
"""Get specific app info"""
if info_type == InfoType.SCHEMA:
# Retreive schema of a model registered in the app
models = get_models()
model = models.get(model_name)
response = schema([model])
elif info_type == InfoType.EXAMPLE:
# Retrieve an example for a model registered in the app
models = get_models()
model = models.get(model_name)
response = JSONResponse(model.Config.schema_extra["example"])
return response


@app.post("/transformations", operation_id="newTransformation")
async def create_model(
modelname: str = Query(..., title="Name of the data model to be instanciated."),
model_name: str = Query(
...,
enum=depends_modellist(),
title="Name of the data model to be instanciated.",
),
body=Body(..., media_type="application/json"),
models=Depends(get_models),
) -> TaskCreateModel:
"""Create a semantic model regisered in the app."""
model = models.get(modelname)
"""Initialize transformation with respect to a certain model."""
if "parameters" in body:
body = body["parameters"]
# Create a semantic model regisered in the app
model = models.get(model_name)
instance = model(**body)
return TaskCreateModel(cache_id=instance.uuid)
return TaskCreateModel(id=instance.uuid)


@app.post("/task/send")
async def send_task(
request: SubmissionBody,
@app.patch("/transformations/{transformation_id}", operation_id="updateTransformation")
async def update_task(
transformation_id: str,
body: Annotated[
SubmissionBody,
Body(),
],
celery_app: "Celery" = Depends(get_app),
settings: AppConfig = Depends(get_appconfig),
) -> TaskStatusModel:
"""Send a task to a remote celery worker"""
task = celery_app.send_task(
settings.worker_name,
kwargs={"cache_key": request.cache_id, "store_tarball": False},
queue=settings.worker_name,
)
return TaskStatusModel(
**{
"status": task.status,
"state": task.state,
"result": task.result,
"traceback": task.traceback,
"task_id": task.id,
"args": task.args,
"kwargs": task.kwargs,
"date_done": task.date_done,
}
)
"""Get status of the transformation."""
state = body.state
if state == TransformationStatus.RUNNING:
# Send a task to a remote celery worker
task = celery_app.send_task(
settings.worker_name,
kwargs={"cache_key": transformation_id, "store_tarball": False},
queue=settings.worker_name,
)
response = TaskStatusModel(
status=task.status,
state=task_to_transformation_map[task.state],
result=task.result,
traceback=task.traceback,
id=task.id,
args=task.args,
kwargs=task.kwargs,
date_done=task.date_done,
)
elif state == TransformationStatus.STOPPED:
# Kill a submitted task with certain id
task = celery_app.AsyncResult(transformation_id)
if not task.date_done:
task.revoke(terminate=True)
message = "Killing scheduled."
else:
message = "Task already terminated."
response = TaskKillModel(
message=message,
status=task.status,
state=task_to_transformation_map[task.state],
result=task.result,
traceback=task.traceback,
id=task.id,
args=task.args,
kwargs=task.kwargs,
date_done=task.date_done,
)
else:
raise HTTPException(status_code=400, detail=f"Unknown task status: {state}")
return response


@app.get("/task/log/{task_id}")
def get_task_logs(
task_id: str = Query( # pylint: disable=unused-argument
@app.get("/logs", operation_id="getLogs")
def get_logs(
transformation_id: str = Query( # pylint: disable=unused-argument
..., title="task id of the submitted job"
),
response: HTTPResponse = Depends(depends_logs),
Expand All @@ -212,69 +258,40 @@ def get_task_logs(
return Response(content=response.data, media_type="text/plain")


@app.get("/task/kill/{task_id}")
async def kill_task(
task_id: str = Query(..., title="task id of the submitted job"),
celery_app: "Celery" = Depends(get_app),
) -> TaskKillModel:
"""Kill a submitted task with certain id"""
task = celery_app.AsyncResult(task_id)
if not task.date_done:
task.revoke(terminate=True)
message = "Killing scheduled."
else:
message = "Task already terminated."
return TaskKillModel(
**{
"message": message,
"status": task.status,
"state": task.state,
"result": task.result,
"traceback": task.traceback,
"task_id": task.id,
"args": task.args,
"kwargs": task.kwargs,
"date_done": task.date_done,
}
)


@app.get("/task/status/{task_id}")
@app.get(
"/transformations/{transformation_id}/state", operation_id="getTransformationState"
)
async def get_status(
task_id: str = Query(..., title="task id of the job to be killed"),
transformation_id: str = Query(..., title="task id of the job to be killed"),
celery_app: "Celery" = Depends(get_app),
) -> TaskStatusModel:
"""Fetch the status of a submitted task with certain id"""
task = celery_app.AsyncResult(task_id)
task = celery_app.AsyncResult(transformation_id)
return TaskStatusModel(
**{
"status": task.status,
"state": task.state,
"traceback": task.traceback,
"task_id": task.id,
"args": task.args,
"kwargs": task.kwargs,
"date_done": task.date_done,
}
status=task.status,
state=task_to_transformation_map[task.state],
traceback=task.traceback,
id=task.id,
args=task.args,
kwargs=task.kwargs,
date_done=task.date_done,
)


@app.get("/task/result/{task_id}")
@app.get("/transformations/{transformation_id}", operation_id="getTransformation")
async def get_result(
task_id: str = Query(..., title="task id of the submitted job"),
transformation_id: str = Query(..., title="task id of the submitted job"),
celery_app: "Celery" = Depends(get_app),
) -> TaskResultModel:
"""Return the results for submitted task with a certain id"""
result = celery_app.AsyncResult(task_id)
result = celery_app.AsyncResult(transformation_id)
if not result.ready():
raise HTTPException(status_code=400, detail="Task is not ready yet.")
return TaskResultModel(
**{
"result": result.result,
"task_id": task_id,
"traceback": result.traceback,
"date_done": result.date_done,
}
parameters=result.result,
id=transformation_id,
traceback=result.traceback,
date_done=result.date_done,
)


Expand Down Expand Up @@ -327,9 +344,3 @@ async def on_shutdown() -> None:
"""Define functions for app during shutdown"""
await redis_plugin.terminate()
await config_plugin.terminate()


if __name__ == "__main__":
host = os.environ["REAXPRO_FASTAPI_HOST"]
port = int(os.environ["REAXPRO_FASTAPI_PORT"])
uvicorn.run(app, host=host, port=port, log_level="debug")

0 comments on commit 5b0237e

Please sign in to comment.