Updated: 2022-11-27
Description
I have come across many situations where I need async API calls: not just running multiple calls, but also kicking off long-running tasks whose status I only need to check periodically. I have been leveraging NATS to handle message queuing between systems (see: Python NATS Github Repo
and information on NATS Project).
Unfortunately, I've run into some issues with handling this kind of messaging through API calls. NATS works well for sending messages from server to server or between microservices, but recently I hit a problem where I need to check on a job that I do not want my microservice holding onto: the jobs can take anywhere from 5 to 45 minutes depending on the amount of work being done in the backend, and pushing that through a NATS queue has proven challenging. So I sought out a way of handling this as an async job and came across Celery, with monitoring via Flower.
I eventually came across this project hosted by TestDriven.io.
The project includes a lot of excess that was not required for my predicament, but I found it a really useful pattern: host a separate microservice with a Redis container, send long-running tasks to it, monitor them, and offload the heavy work from your main service.
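The core pattern is small: define the long-running work as a Celery task, point Celery at Redis as the broker and result backend, and let a worker pick it up while your API returns immediately. A minimal sketch (the names here are illustrative, not from the project):
from celery import Celery

# Minimal sketch of the pattern -- names are illustrative, not from the project.
app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",   # where tasks are queued
    backend="redis://localhost:6379/0",  # where task states/results are stored
)

@app.task
def long_backend_job(tsg_id: str) -> dict:
    # Stand-in for a 5-45 minute backend call; a Celery worker runs this
    # out-of-process, so the API that enqueued it returns immediately.
    return {"tsg_id": tsg_id, "status": "done"}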
Project Documents:
The Code
My version of the code leverages a long API call that I've been having trouble getting to work through a NATS queuing system that sends messages between my microservice and our orchestration system (long story there, but I wish we were leveraging more modern tools). With some modifications to the code it does what I need (honestly, I could have taken out a lot more since I'm not leveraging the DB, but I put this together quickly, in under an hour, to get my point across).
All the code can be found here: flask-celery-project-prisma-access
Running the Code
You'll need Docker running; within your environment, run the following (again, the DB isn't necessary and can be removed, it was just a part of the main project):
- FLASK_APP=app.py flask db init
- FLASK_APP=app.py flask db migrate -m "Initial migration."
- FLASK_APP=app.py flask db upgrade
- FLASK_APP=app.py flask shell (if you want to interact directly for shell testing; see the sketch below)
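For example, inside flask shell you can enqueue a task by hand and watch its state. A sketch, assuming the sample divide task from the upstream TestDriven.io project (adjust the import path to your own task module):
from project.users.tasks import divide  # upstream sample task; path is an assumption

result = divide.delay(1, 2)  # enqueue asynchronously; returns an AsyncResult
print(result.id)             # the task UUID you can poll later
print(result.state)          # PENDING -> STARTED -> SUCCESS / FAILURE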
Once the App is set up you can run:
- docker-compose build
- docker-compose up -d
Example:
(.env) ➜ flask-celery-project git:(main) docker-compose build
db uses an image, skipping
redis uses an image, skipping
Building web
[+] Building 1.1s (26/26) FINISHED
=> [internal] load build definition from Dockerfile 0.0s
=> => transferring dockerfile: 37B 0.0s
=> [internal] load .dockerignore 0.0s
=> => transferring context: 2B 0.0s
=> [internal] load metadata for docker.io/library/python:3.10-slim-buster 0.9s
=> [auth] library/python:pull token for registry-1.docker.io 0.0s
=> [internal] load build context 0.0s
=> => transferring context: 1.19kB 0.0s
=> [ 1/20] FROM docker.io/library/python:3.10-slim-buster@sha256:95f2dd6b380d0762cba090f5167259543dc156f08b8a85262926a2919604e2b5 0.0s
=> CACHED [ 2/20] RUN apt-get update && apt-get install -y build-essential && apt-get install -y libpq-dev && apt-get install -y telnet netcat && apt-get purge -y --auto-rem 0.0s
=> CACHED [ 3/20] COPY ./requirements.txt /requirements.txt 0.0s
=> CACHED [ 4/20] RUN pip install -r /requirements.txt 0.0s
=> CACHED [ 5/20] COPY ./compose/local/flask/entrypoint /entrypoint 0.0s
=> CACHED [ 6/20] RUN sed -i 's/\r$//g' /entrypoint 0.0s
=> CACHED [ 7/20] RUN chmod +x /entrypoint 0.0s
=> CACHED [ 8/20] COPY ./compose/local/flask/start /start 0.0s
=> CACHED [ 9/20] RUN sed -i 's/\r$//g' /start 0.0s
=> CACHED [10/20] RUN chmod +x /start 0.0s
=> CACHED [11/20] COPY ./compose/local/flask/celery/worker/start /start-celeryworker 0.0s
=> CACHED [12/20] RUN sed -i 's/\r$//g' /start-celeryworker 0.0s
=> CACHED [13/20] RUN chmod +x /start-celeryworker 0.0s
=> CACHED [14/20] COPY ./compose/local/flask/celery/beat/start /start-celerybeat 0.0s
=> CACHED [15/20] RUN sed -i 's/\r$//g' /start-celerybeat 0.0s
=> CACHED [16/20] RUN chmod +x /start-celerybeat 0.0s
=> CACHED [17/20] COPY ./compose/local/flask/celery/flower/start /start-flower 0.0s
=> CACHED [18/20] RUN sed -i 's/\r$//g' /start-flower 0.0s
=> CACHED [19/20] RUN chmod +x /start-flower 0.0s
=> CACHED [20/20] WORKDIR /app 0.0s
=> exporting to image 0.0s
=> => exporting layers 0.0s
=> => writing image sha256:83d96bf5c8e4231a657c0610e9a137a1b2a2621cbd7b59fdb1809441417350b6 0.0s
=> => naming to docker.io/library/flask_celery_example_web 0.0s
Use 'docker scan' to run Snyk tests against images to find vulnerabilities and learn how to fix them
Building celery_worker
[+] Building 0.3s (25/25) FINISHED
... (same cached layers as the web build above) ...
 => => naming to docker.io/library/flask_celery_example_celery_worker
Building celery_beat
[+] Building 0.5s (25/25) FINISHED
... (same cached layers as the web build above) ...
 => => naming to docker.io/library/flask_celery_example_celery_beat
Building flower
[+] Building 0.3s (25/25) FINISHED
... (same cached layers as the web build above) ...
 => => naming to docker.io/library/flask_celery_example_celery_flower
(.env) ➜ flask-celery-project git:(main) docker-compose up -d
Creating network "flask-celery-project_default" with the default driver
Creating flask-celery-project_db_1 ... done
Creating flask-celery-project_redis_1 ... done
Creating flask-celery-project_flower_1 ... done
Creating flask-celery-project_celery_worker_1 ... done
Creating flask-celery-project_celery_beat_1 ... done
Creating flask-celery-project_web_1 ... done
Sample Requests
With this running locally, you can connect to the Flower dashboard to monitor tasks (on the port mapped for the flower service in docker-compose.yml; Flower's own default is 5555).
The Flask app is running in debug mode under http://localhost:5010, which will give you a standard "Hello World!" output. The key, though, is to interact with the tool. If you read about my Prisma Access SASE project, it goes over the point of this exercise: commits take a long time, and Palo Alto's API creates a parent job and child jobs without the ability to monitor them directly, which is exactly what my code handles.
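On the server side, the /commit endpoint boils down to handing the work to a Celery task and returning the task ID immediately. A simplified sketch with approximate names (the redis hostname assumes the docker-compose service name; the real implementation is in the linked repo):
from celery import Celery
from flask import Flask, jsonify, request

flask_app = Flask(__name__)
celery_app = Celery("tasks", broker="redis://redis:6379/0", backend="redis://redis:6379/0")

@celery_app.task
def commit_task(tsg_id):
    # Placeholder for the long Prisma Access commit plus job polling
    ...

@flask_app.route("/commit", methods=["POST"])
def commit():
    task = commit_task.delay(request.args.get("tsg_id"))  # enqueue and return right away
    return jsonify({"task_id": task.id, "state": task.state}), 202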
Here is a sample Python code snippet (please use your own credentials; see the prismasase docs):
import requests

url = "http://localhost:5010/commit"
querystring = {"tsg_id": "123456789"}
headers = {"Authorization": "Basic cG9zdG1hbjpwYXNzd29yZA=="}  # swap in your own credentials

# Kick off the commit; the endpoint enqueues a Celery task and returns immediately
response = requests.post(url, headers=headers, params=querystring)
print(response.json())
Response:
{
"state": "PENDING",
"task_id": "ed6b17ea-80da-4b52-8c6f-20aa20e9c104"
}
Note that the state is sent back as "PENDING" or "FAILURE". You could also build out termination or some other, more advanced ways of handling it. You can monitor the task through the API by passing the task_id to the JobID endpoint created in the project.
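Under the hood, that endpoint is essentially a thin wrapper around Celery's AsyncResult. A simplified sketch (approximate names; the real route lives in the repo):
from celery.result import AsyncResult
from flask import Flask, jsonify

app = Flask(__name__)

@app.route("/task/<task_id>", methods=["GET"])
def task_status(task_id):
    res = AsyncResult(task_id)  # looks the task up in the configured result backend
    return jsonify({
        "task_id": task_id,
        "state": res.state,                              # PENDING / SUCCESS / FAILURE
        "results": res.result if res.ready() else None,  # payload once finished
    })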
The client-side Python code would look like:
import requests

# Poll the status endpoint with the task_id returned above
url = "http://localhost:5010/task/ed6b17ea-80da-4b52-8c6f-20aa20e9c104"
response = requests.get(url)
print(response.json())
Response:
{
"results": null,
"state": "PENDING",
"task_id": "ed6b17ea-80da-4b52-8c6f-20aa20e9c104"
}
You can now continue to query the job as it runs in the background; the task is handed to the Redis-backed worker until it completes, and you can use the Flower frontend to monitor the tasks visually.
Eventually the response comes back. Since it is wrapped in the Celery task response, the task itself shows SUCCESS, but looking at the results within the task response you can see the commit failed. This is an issue with the Prisma API and a bug in the way the Remote Network works, which is being worked on. Either way, this proves out the process: you can send your API tasks to a backend so they don't tie up your services, handle multiple jobs, and control them in an easy-to-deploy fashion.
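Rather than re-issuing the request by hand, a small client-side polling loop works well (a sketch; tune the interval to your job lengths):
import time
import requests

url = "http://localhost:5010/task/ed6b17ea-80da-4b52-8c6f-20aa20e9c104"

while True:
    body = requests.get(url).json()
    if body["state"] in ("SUCCESS", "FAILURE"):
        break
    time.sleep(30)  # jobs here run 5-45 minutes, so poll gently

print(body["state"])
print(body["results"])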
Final Response:
{
"results": {
"job_id": {
"314": {
"details": "{\"info\":[\"Configuration committed successfully\"],\"errors\":[],\"warnings\":[],\"description\":\"automation test with celery\"}",
"end_ts": "2022-11-27 16:47:27",
"id": "314",
"insert_ts": "2022-11-27 16:46:19",
"job_result": "2",
"job_status": "2",
"job_type": "53",
"last_update": "2022-11-27 16:47:28",
"opaque_int": "0",
"opaque_str": "",
"owner": "cfgserv",
"parent_id": "0",
"percent": "100",
"result_i": "2",
"result_str": "OK",
"session_id": "",
"start_ts": "2022-11-27 16:46:19",
"status_i": "2",
"status_str": "FIN",
"summary": "",
"total_time": "91",
"type_i": "53",
"type_str": "CommitAndPush",
"uname": "APIGateway@ProdInternal.com"
},
"315": {
"details": "{\"status\": \"FAILED\", \"info\": [], \"errors\": [\"Config Validation Failed, Errors : RN regions {'us-southwest'} are not valid values\"], \"warnings\": [], \"description\": \"Remote Networks configuration pushed to cloud\"}",
"end_ts": "2022-11-27 16:47:40",
"id": "315",
"insert_ts": "2022-11-27 16:47:27",
"job_result": "1",
"job_status": "2",
"job_type": "22",
"last_update": "2022-11-27 16:47:40",
"opaque_int": "",
"opaque_str": "",
"owner": "gpcs-ext",
"parent_id": "314",
"percent": "0",
"result_i": "1",
"result_str": "FAIL",
"session_id": "",
"start_ts": "2022-11-27 16:47:27",
"status_i": "2",
"status_str": "FIN",
"summary": "Config Validation Failed, Errors : RN regions {'us-southwest'} are not valid values",
"total_time": "0",
"type_i": "22",
"type_str": "CommitAll",
"uname": "APIGateway@ProdInternal.com"
},
"316": {
"details": "{\"status\": \"FAILED\", \"info\": [], \"errors\": [\"Config Validation Failed, Errors : RN regions {'us-southwest'} are not valid values\"], \"warnings\": [], \"description\": \"Service Connections configuration pushed to cloud\"}",
"end_ts": "2022-11-27 16:47:54",
"id": "316",
"insert_ts": "2022-11-27 16:47:40",
"job_result": "1",
"job_status": "2",
"job_type": "22",
"last_update": "2022-11-27 16:47:54",
"opaque_int": "",
"opaque_str": "",
"owner": "gpcs-ext",
"parent_id": "314",
"percent": "0",
"result_i": "1",
"result_str": "FAIL",
"session_id": "",
"start_ts": "2022-11-27 16:47:41",
"status_i": "2",
"status_str": "FIN",
"summary": "Config Validation Failed, Errors : RN regions {'us-southwest'} are not valid values",
"total_time": "0",
"type_i": "22",
"type_str": "CommitAll",
"uname": "APIGateway@ProdInternal.com"
}
},
"message": "CommitAndPush job enqueued with jobid 314, jobid 316: {\"status\": \"FAILED\", \"info\": [], \"errors\": [\"Config Validation Failed, Errors : RN regions {'us-southwest'} are not valid values\"], \"warnings\": [], \"description\": \"Service Connections configuration pushed to cloud\"}, jobid 315: {\"status\": \"FAILED\", \"info\": [], \"errors\": [\"Config Validation Failed, Errors : RN regions {'us-southwest'} are not valid values\"], \"warnings\": [], \"description\": \"Remote Networks configuration pushed to cloud\"}",
"parent_job": "314",
"status": "failure",
"version_info": [
{
"date": "2022-11-27T16:47:25.000Z",
"device": "Remote Networks",
"version": 105
},
{
"date": "2022-11-27T16:47:25.000Z",
"device": "Service Connections",
"version": 105
}
]
},
"state": "SUCCESS",
"task_id": "ed6b17ea-80da-4b52-8c6f-20aa20e9c104"
}