User Owned Resources in FastAPI

Welcome to Part 10 of Up and Running with FastAPI. If you missed part 9, you can find it here.

This series is focused on building a full-stack application with the FastAPI framework. The app allows users to post requests to have their residence cleaned, and other users can select a cleaning project for a given hourly rate.

In the previous post we built out user profiles and ownership, and made sure our API returned user profiles when necessary. Now we'll be getting to the meat of this application's functionality. In this post, we'll make users own the cleaning resources they create and ensure that only the creators can manage their own cleaning jobs.

User-Owned Cleaning Jobs

Our database setup for the cleanings resource is rather naive. At the moment, we have no way of tracking who created a cleaning job. We're going to fix that. And in doing so, we're going to make it easy to create marketplace-style functionality.

Again, we'll begin with the migrations file.

Migrations

Just like before, let's start by rolling back our migrations.

docker ps
docker exec -it [CONTAINER_ID] bash
alembic downgrade base

Now go ahead and open up the file that looks like: db/migrations/versions/12345678654_create_main_tables.py.

"""create main tables
Revision ID: 12345678654
Revises:
Create Date: 2020-05-05 10:41:35.468471
"""
from typing import Tuple

from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic
revision = "12345678654"
down_revision = None
branch_labels = None
depends_on = None


def create_updated_at_trigger() -> None:
    op.execute(
        """
        CREATE OR REPLACE FUNCTION update_updated_at_column()
            RETURNS TRIGGER AS
        $$
        BEGIN
            NEW.updated_at = now();
            RETURN NEW;
        END;
        $$ language 'plpgsql';
        """
    )


def timestamps() -> Tuple[sa.Column, sa.Column]:
    return (
        sa.Column("created_at", sa.TIMESTAMP(timezone=True), server_default=sa.func.now(), nullable=False),
        sa.Column("updated_at", sa.TIMESTAMP(timezone=True), server_default=sa.func.now(), nullable=False),
    )


def create_users_table() -> None:
    op.create_table(
        "users",
        sa.Column("id", sa.Integer, primary_key=True),
        sa.Column("username", sa.Text, unique=True, nullable=False, index=True),
        sa.Column("email", sa.Text, unique=True, nullable=False, index=True),
        sa.Column("email_verified", sa.Boolean, nullable=False, server_default="False"),
        sa.Column("salt", sa.Text, nullable=False),
        sa.Column("password", sa.Text, nullable=False),
        sa.Column("is_active", sa.Boolean(), nullable=False),
        sa.Column("is_superuser", sa.Boolean(), nullable=False),
        *timestamps(),
    )
    op.execute(
        """
        CREATE TRIGGER update_user_modtime
            BEFORE UPDATE
            ON users
            FOR EACH ROW
        EXECUTE PROCEDURE update_updated_at_column();
        """
    )


def create_profiles_table() -> None:
    op.create_table(
        "profiles",
        sa.Column("id", sa.Integer, primary_key=True),
        sa.Column("full_name", sa.Text, nullable=True),
        sa.Column("phone_number", sa.Text, nullable=True),
        sa.Column("bio", sa.Text, nullable=True, server_default=""),
        sa.Column("image", sa.Text, nullable=True),
        sa.Column("user_id", sa.Integer, sa.ForeignKey("users.id", ondelete="CASCADE")),
        *timestamps(),
    )
    op.execute(
        """
        CREATE TRIGGER update_profiles_modtime
            BEFORE UPDATE
            ON profiles
            FOR EACH ROW
        EXECUTE PROCEDURE update_updated_at_column();
        """
    )


def create_cleanings_table() -> None:
    op.create_table(
        "cleanings",
        sa.Column("id", sa.Integer, primary_key=True),
        sa.Column("name", sa.Text, nullable=False, index=True),
        sa.Column("description", sa.Text, nullable=True),
        sa.Column("cleaning_type", sa.Text, nullable=False, server_default="spot_clean"),
        sa.Column("price", sa.Numeric(10, 5), nullable=False),
        # each cleaning points at the user that created it
        sa.Column("owner", sa.Integer, sa.ForeignKey("users.id", ondelete="CASCADE")),
        *timestamps(),
    )
    op.execute(
        """
        CREATE TRIGGER update_cleanings_modtime
            BEFORE UPDATE
            ON cleanings
            FOR EACH ROW
        EXECUTE PROCEDURE update_updated_at_column();
        """
    )


def upgrade() -> None:
    create_updated_at_trigger()
    create_users_table()
    create_profiles_table()
    create_cleanings_table()


def downgrade() -> None:
    # drop in reverse order so foreign keys never point at a missing table
    op.drop_table("cleanings")
    op.drop_table("profiles")
    op.drop_table("users")
    op.execute("DROP FUNCTION update_updated_at_column")

We've moved the creation of the cleanings table to after the users and profiles tables, because we want to reference the users table when we define an owner column on the cleanings table. Here we set the value of that column equal to the id of the user that created the cleaning. This has multiple repercussions, but we'll get to those in a minute. If we already had cleanings in the database when we made this update, we'd want to delete them first.
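To get a feel for what the owner foreign key with ondelete="CASCADE" does, here is a minimal sketch. It assumes Python's built-in sqlite3 module in place of PostgreSQL and Alembic, and the table layouts are trimmed down for illustration, so treat it as a demonstration of the idea rather than our actual schema.

```python
import sqlite3


def build_demo_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    # SQLite only enforces foreign keys when this pragma is switched on
    conn.execute("PRAGMA foreign_keys = ON")
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT NOT NULL)")
    # users must exist first, since cleanings.owner references users.id
    conn.execute(
        """
        CREATE TABLE cleanings (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            owner INTEGER REFERENCES users (id) ON DELETE CASCADE
        )
        """
    )
    return conn


def count_cleanings(conn: sqlite3.Connection) -> int:
    return conn.execute("SELECT COUNT(*) FROM cleanings").fetchone()[0]


conn = build_demo_db()
conn.execute("INSERT INTO users (id, username) VALUES (1, 'alice')")
conn.execute("INSERT INTO cleanings (name, owner) VALUES ('kitchen', 1)")
before = count_cleanings(conn)

# deleting the owner cascades to every cleaning they created
conn.execute("DELETE FROM users WHERE id = 1")
after = count_cleanings(conn)
```

Deleting the user removes their cleanings along with them, which is the behavior we asked for with ondelete="CASCADE" in the migration.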

First, enter the postgres container.

docker-compose exec db psql -h localhost -U postgres --dbname=postgres

Then, once we're in psql:

DELETE FROM cleanings;

With that out of the way, it's time to migrate the database by entering the container like before and running the upgrade head command.

docker ps
docker exec -it [CONTAINER_ID] bash
alembic upgrade head

Now that we've given users ownership of a cleaning resource in our database, we'll want to refactor our code a bit. Any authenticated user should be able to access a cleaning resource, but only the user that created it should be able to update or delete it. Users that want to list all cleanings should only receive the ones that they themselves have created.

Usually, we'd start with testing. But since we've modified the database, let's dip into the models/cleaning.py file first.

Modeling User Ownership

models/cleaning.py
from typing import Optional, Union
from enum import Enum

from app.models.core import IDModelMixin, DateTimeModelMixin, CoreModel
from app.models.user import UserPublic

# ...other code


class CleaningInDB(IDModelMixin, DateTimeModelMixin, CleaningBase):
    name: str
    price: float
    cleaning_type: CleaningType
    owner: Union[int, UserPublic]


class CleaningPublic(CleaningInDB):
    pass

We're adding the owner attribute to our CleaningInDB model that will be either an int id of the user, or the actual UserPublic model itself. On top of that, we're finally taking advantage of timestamps by utilizing DateTimeModelMixin and having the CleaningPublic model inherit everything from CleaningInDB for the time being.
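Because owner can hold either a bare id or a full user object, any code that reads it has to handle both shapes. The sketch below uses plain dataclasses as hypothetical stand-ins for our pydantic models (the owner_id helper is not part of the app, just an illustration of the idea).

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class UserPublic:
    # hypothetical, trimmed-down stand-in for the real UserPublic model
    id: int
    username: str


@dataclass
class Cleaning:
    name: str
    owner: Union[int, UserPublic]


def owner_id(cleaning: Cleaning) -> int:
    """Return the owner's id whether `owner` holds a bare id or a full user object."""
    if isinstance(cleaning.owner, UserPublic):
        return cleaning.owner.id
    return cleaning.owner


from_db = Cleaning(name="kitchen", owner=7)
populated = Cleaning(name="kitchen", owner=UserPublic(id=7, username="alice"))
same_owner = owner_id(from_db) == owner_id(populated)
```

Both records resolve to the same owner id, so callers don't need to care which form they were handed.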

If we were to run our tests at this point, most of them would error out. Feel free to try it out.

Let's go ahead and fix that. We're going to make quite a few updates, so don't be concerned when our tests start breaking.

Starting with our test_cleaning fixture in the conftest.py file, modify it with the following code:

tests/conftest.py
# ...other code

@pytest.fixture
async def test_cleaning(db: Database, test_user: UserInDB) -> CleaningInDB:
    cleaning_repo = CleaningsRepository(db)
    new_cleaning = CleaningCreate(
        name="fake cleaning name", description="fake cleaning description", price=9.99, cleaning_type="spot_clean"
    )
    return await cleaning_repo.create_cleaning(new_cleaning=new_cleaning, requesting_user=test_user)

# ...other code

We're taking in the test_user fixture and sending it to the CleaningsRepository whenever a new cleaning is created. We'll want to take that user and pass their id to the database as the owner attribute.

Let's do that now.

Create Cleanings

Make the following changes to the CleaningsRepository.

db/repositories/cleanings.py
from typing import List

from fastapi import HTTPException, status

from app.db.repositories.base import BaseRepository
from app.models.cleaning import CleaningCreate, CleaningUpdate, CleaningInDB
from app.models.user import UserInDB

CREATE_CLEANING_QUERY = """
    INSERT INTO cleanings (name, description, price, cleaning_type, owner)
    VALUES (:name, :description, :price, :cleaning_type, :owner)
    RETURNING id, name, description, price, cleaning_type, owner, created_at, updated_at;
"""

# ...other code


class CleaningsRepository(BaseRepository):
    """
    All database actions associated with the Cleaning resource
    """

    async def create_cleaning(self, *, new_cleaning: CleaningCreate, requesting_user: UserInDB) -> CleaningInDB:
        # the owner always comes from the authenticated user, never from the request body
        cleaning = await self.db.fetch_one(
            query=CREATE_CLEANING_QUERY, values={**new_cleaning.dict(), "owner": requesting_user.id}
        )
        return CleaningInDB(**cleaning)

    # ...other code

Though we've only updated the CREATE_CLEANING_QUERY so far, we're going to need to change most of the SQL. We've added in the owner attribute, and we're returning the timestamps as well. If we were to run our tests now, we'd still error out as we're not passing the currently logged in user to the CleaningsRepository. We'll get to that in a moment.

First, let's update our tests/test_cleanings.py file. We're going to basically start from scratch, keeping what we like and discarding what we don't need anymore.

tests/test_cleanings.py
from typing import List, Dict, Union, Optional

import pytest
from httpx import AsyncClient
from fastapi import FastAPI, status
from databases import Database

from app.db.repositories.cleanings import CleaningsRepository
from app.models.cleaning import CleaningCreate, CleaningInDB, CleaningPublic
from app.models.user import UserInDB

pytestmark = pytest.mark.asyncio


@pytest.fixture
def new_cleaning():
    return CleaningCreate(
        name="test cleaning",
        description="test description",
        price=10.00,
        cleaning_type="spot_clean",
    )


@pytest.fixture
async def test_cleanings_list(db: Database, test_user2: UserInDB) -> List[CleaningInDB]:
    cleaning_repo = CleaningsRepository(db)
    return [
        await cleaning_repo.create_cleaning(
            new_cleaning=CleaningCreate(
                name=f"test cleaning {i}", description="test description", price=20.00, cleaning_type="full_clean"
            ),
            requesting_user=test_user2,
        )
        for i in range(5)
    ]


class TestCleaningsRoutes:
    """
    Check each cleaning route to ensure none return 404s
    """

    async def test_routes_exist(self, app: FastAPI, client: AsyncClient) -> None:
        # call each route with the HTTP method it is registered under
        res = await client.post(app.url_path_for("cleanings:create-cleaning"), json={})
        assert res.status_code != status.HTTP_404_NOT_FOUND
        res = await client.get(app.url_path_for("cleanings:get-cleaning-by-id", cleaning_id=1))
        assert res.status_code != status.HTTP_404_NOT_FOUND
        res = await client.get(app.url_path_for("cleanings:get-all-cleanings"))
        assert res.status_code != status.HTTP_404_NOT_FOUND
        res = await client.put(app.url_path_for("cleanings:update-cleaning-by-id", cleaning_id=1))
        assert res.status_code != status.HTTP_404_NOT_FOUND
        res = await client.delete(app.url_path_for("cleanings:delete-cleaning-by-id", cleaning_id=0))
        assert res.status_code != status.HTTP_404_NOT_FOUND


class TestCreateCleaning:
    async def test_valid_input_creates_cleaning_belonging_to_user(
        self, app: FastAPI, authorized_client: AsyncClient, test_user: UserInDB, new_cleaning: CleaningCreate
    ) -> None:
        res = await authorized_client.post(
            app.url_path_for("cleanings:create-cleaning"), json={"new_cleaning": new_cleaning.dict()}
        )
        assert res.status_code == status.HTTP_201_CREATED
        created_cleaning = CleaningPublic(**res.json())
        assert created_cleaning.name == new_cleaning.name
        assert created_cleaning.price == new_cleaning.price
        assert created_cleaning.cleaning_type == new_cleaning.cleaning_type
        assert created_cleaning.owner == test_user.id

    async def test_unauthorized_user_unable_to_create_cleaning(
        self, app: FastAPI, client: AsyncClient, new_cleaning: CleaningCreate
    ) -> None:
        res = await client.post(
            app.url_path_for("cleanings:create-cleaning"), json={"new_cleaning": new_cleaning.dict()}
        )
        assert res.status_code == status.HTTP_401_UNAUTHORIZED

    @pytest.mark.parametrize(
        "invalid_payload, status_code",
        (
            (None, 422),
            ({}, 422),
            ({"name": "test"}, 422),
            ({"price": 10.00}, 422),
            ({"name": "test", "description": "test"}, 422),
        ),
    )
    async def test_invalid_input_raises_error(
        self,
        app: FastAPI,
        authorized_client: AsyncClient,
        invalid_payload: Dict[str, Union[str, float]],
        test_cleaning: CleaningInDB,
        status_code: int,
    ) -> None:
        res = await authorized_client.post(
            app.url_path_for("cleanings:create-cleaning"), json={"new_cleaning": invalid_payload}
        )
        assert res.status_code == status_code

We've added a new test to ensure that unauthenticated users can't create cleaning opportunities, and we're checking that any newly created cleanings have the currently logged in user as their owner.

Open up the api/routes/cleanings.py file and make that happen. We'll start from scratch here too, going route by route.

api/routes/cleanings.py
from typing import List

from fastapi import APIRouter, Body, Path, Depends, HTTPException, status

from app.models.user import UserCreate, UserUpdate, UserInDB, UserPublic
from app.models.cleaning import CleaningCreate, CleaningUpdate, CleaningPublic
from app.db.repositories.cleanings import CleaningsRepository
from app.api.dependencies.database import get_repository
from app.api.dependencies.auth import get_current_active_user

router = APIRouter()


@router.post("/", response_model=CleaningPublic, name="cleanings:create-cleaning", status_code=status.HTTP_201_CREATED)
async def create_new_cleaning(
    new_cleaning: CleaningCreate = Body(..., embed=True),
    current_user: UserInDB = Depends(get_current_active_user),
    cleanings_repo: CleaningsRepository = Depends(get_repository(CleaningsRepository)),
) -> CleaningPublic:
    created_cleaning = await cleanings_repo.create_cleaning(new_cleaning=new_cleaning, requesting_user=current_user)
    return created_cleaning

We're now using the same auth dependencies we defined in one of our previous posts, and we're passing the logged in user to the CleaningsRepository. This way any new cleaning will have the currently authenticated user attached as the owner of the resource.
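The important detail is that the owner is stamped on the server from the authenticated user, not read from the request body. Here's a small sketch of that merge using plain dicts (build_insert_values is a hypothetical helper for illustration; the repository does the equivalent inline with {**new_cleaning.dict(), "owner": requesting_user.id}).

```python
from typing import Any, Dict


def build_insert_values(payload: Dict[str, Any], requesting_user_id: int) -> Dict[str, Any]:
    # later keys win when unpacking, so the server-side owner
    # overrides any "owner" value a client tried to sneak in
    return {**payload, "owner": requesting_user_id}


values = build_insert_values(
    {"name": "test cleaning", "price": 10.0, "owner": 999},
    requesting_user_id=1,
)
```

Even if a client sends its own owner value, the one taken from the logged in user is what reaches the database.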

Run the tests again and see that we're a step closer. All the TestCreateCleaning tests are passing.

Get Cleanings

Now we're moving on to the GET requests.

Let's first add the next set of cleaning tests.

tests/test_cleanings.py
# ...other code

class TestGetCleaning:
    async def test_get_cleaning_by_id(
        self, app: FastAPI, authorized_client: AsyncClient, test_cleaning: CleaningInDB
    ) -> None:
        res = await authorized_client.get(app.url_path_for("cleanings:get-cleaning-by-id", cleaning_id=test_cleaning.id))
        assert res.status_code == status.HTTP_200_OK
        cleaning = CleaningInDB(**res.json())
        assert cleaning == test_cleaning

    async def test_unauthorized_users_cant_access_cleanings(
        self, app: FastAPI, client: AsyncClient, test_cleaning: CleaningInDB
    ) -> None:
        res = await client.get(app.url_path_for("cleanings:get-cleaning-by-id", cleaning_id=test_cleaning.id))
        assert res.status_code == status.HTTP_401_UNAUTHORIZED

    @pytest.mark.parametrize(
        "id, status_code", ((50000, 404), (-1, 422), (None, 422)),
    )
    async def test_wrong_id_returns_error(
        self, app: FastAPI, authorized_client: AsyncClient, id: int, status_code: int
    ) -> None:
        res = await authorized_client.get(app.url_path_for("cleanings:get-cleaning-by-id", cleaning_id=id))
        assert res.status_code == status_code

    async def test_get_all_cleanings_returns_only_user_owned_cleanings(
        self,
        app: FastAPI,
        authorized_client: AsyncClient,
        test_user: UserInDB,
        db: Database,
        test_cleaning: CleaningInDB,
        test_cleanings_list: List[CleaningInDB],
    ) -> None:
        res = await authorized_client.get(app.url_path_for("cleanings:get-all-cleanings"))
        assert res.status_code == status.HTTP_200_OK
        assert isinstance(res.json(), list)
        assert len(res.json()) > 0
        cleanings = [CleaningInDB(**l) for l in res.json()]
        # check that a cleaning created by our user is returned
        assert test_cleaning in cleanings
        # test that all cleanings returned are owned by this user
        for cleaning in cleanings:
            assert cleaning.owner == test_user.id
        # assert all cleanings created by another user not included (redundant, but fine)
        assert all(c not in cleanings for c in test_cleanings_list)

We're doing a lot of the same thing here with fetching cleaning jobs. Users should only be able to get a cleaning resource if they're authenticated, and when users ask to list all cleaning jobs, we only send back those that belong to them. Otherwise things are pretty much the same.

Let's get these passing.

Open up the CleaningsRepository and update it with the following:

db/repositories/cleanings.py
# ...other code

GET_CLEANING_BY_ID_QUERY = """
    SELECT id, name, description, price, cleaning_type, owner, created_at, updated_at
    FROM cleanings
    WHERE id = :id;
"""

GET_ALL_CLEANINGS_QUERY = """
    SELECT id, name, description, price, cleaning_type, owner, created_at, updated_at
    FROM cleanings
    WHERE owner = :owner;
"""


class CleaningsRepository(BaseRepository):
    """
    All database actions associated with the Cleaning resource
    """

    async def create_cleaning(self, *, new_cleaning: CleaningCreate, requesting_user: UserInDB) -> CleaningInDB:
        cleaning = await self.db.fetch_one(
            query=CREATE_CLEANING_QUERY, values={**new_cleaning.dict(), "owner": requesting_user.id}
        )
        return CleaningInDB(**cleaning)

    async def get_cleaning_by_id(self, *, id: int, requesting_user: UserInDB) -> CleaningInDB:
        cleaning = await self.db.fetch_one(query=GET_CLEANING_BY_ID_QUERY, values={"id": id})
        if not cleaning:
            return None
        return CleaningInDB(**cleaning)

    async def get_all_cleanings(self, requesting_user: UserInDB) -> List[CleaningInDB]:
        # only cleanings owned by the requesting user are returned
        cleaning_records = await self.db.fetch_all(query=GET_ALL_CLEANINGS_QUERY, values={"owner": requesting_user.id})
        return [CleaningInDB(**l) for l in cleaning_records]

We're now expecting the requesting_user in each of our methods. Even though the get_cleaning_by_id method doesn't use that parameter, it's there for consistency. Besides, we'll end up using it later anyway - so keep it there.
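To see the owner filter in isolation, here is a rough sketch that runs the same kind of WHERE owner = :owner query against an in-memory SQLite database. The use of sqlite3 instead of PostgreSQL, the trimmed table, and the list_cleanings_for helper are all assumptions made to keep the example self-contained.

```python
import sqlite3
from typing import Any, Dict, List

GET_ALL_CLEANINGS_QUERY = "SELECT id, name, owner FROM cleanings WHERE owner = :owner ORDER BY id"

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cleanings (id INTEGER PRIMARY KEY, name TEXT, owner INTEGER)")
conn.executemany(
    "INSERT INTO cleanings (name, owner) VALUES (:name, :owner)",
    [
        {"name": "kitchen", "owner": 1},
        {"name": "garage", "owner": 2},
        {"name": "attic", "owner": 1},
    ],
)


def list_cleanings_for(owner_id: int) -> List[Dict[str, Any]]:
    """Return only the cleanings whose owner column matches the given user id."""
    rows = conn.execute(GET_ALL_CLEANINGS_QUERY, {"owner": owner_id}).fetchall()
    return [{"id": row[0], "name": row[1], "owner": row[2]} for row in rows]


mine = list_cleanings_for(1)
nobody = list_cleanings_for(3)
```

User 1 gets back only the kitchen and attic jobs, and a user with no cleanings gets an empty list rather than an error.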

Let's modify our routes to support these changes as well.

api/routes/cleanings.py
# ...other code

@router.get("/{cleaning_id}/", response_model=CleaningPublic, name="cleanings:get-cleaning-by-id")
async def get_cleaning_by_id(
    cleaning_id: int = Path(..., ge=1),
    current_user: UserInDB = Depends(get_current_active_user),
    cleanings_repo: CleaningsRepository = Depends(get_repository(CleaningsRepository)),
) -> CleaningPublic:
    cleaning = await cleanings_repo.get_cleaning_by_id(id=cleaning_id, requesting_user=current_user)
    if not cleaning:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No cleaning found with that id.")
    return cleaning


@router.get("/", response_model=List[CleaningPublic], name="cleanings:get-all-cleanings")
async def get_all_cleanings(
    current_user: UserInDB = Depends(get_current_active_user),
    cleanings_repo: CleaningsRepository = Depends(get_repository(CleaningsRepository)),
) -> List[CleaningPublic]:
    return await cleanings_repo.get_all_cleanings(requesting_user=current_user)

In both of these routes, we're using the get_current_active_user dependency to protect the route. On top of that, we pass the user to our CleaningsRepository for all relevant database activity.

Only two more to go.

Update Cleanings

The tests for updating cleanings need only a few modifications.

tests/test_cleanings.py
# ...other code

class TestUpdateCleaning:
    @pytest.mark.parametrize(
        "attrs_to_change, values",
        (
            (["name"], ["new fake cleaning name"]),
            (["description"], ["new fake cleaning description"]),
            (["price"], [3.14]),
            (["cleaning_type"], ["full_clean"]),
            (["name", "description"], ["extra new fake cleaning name", "extra new fake cleaning description"]),
            (["price", "cleaning_type"], [42.00, "dust_up"]),
        ),
    )
    async def test_update_cleaning_with_valid_input(
        self,
        app: FastAPI,
        authorized_client: AsyncClient,
        test_cleaning: CleaningInDB,
        attrs_to_change: List[str],
        values: List[str],
    ) -> None:
        cleaning_update = {"cleaning_update": {attrs_to_change[i]: values[i] for i in range(len(attrs_to_change))}}
        res = await authorized_client.put(
            app.url_path_for("cleanings:update-cleaning-by-id", cleaning_id=test_cleaning.id), json=cleaning_update
        )
        assert res.status_code == status.HTTP_200_OK
        updated_cleaning = CleaningInDB(**res.json())
        assert updated_cleaning.id == test_cleaning.id  # make sure it's the same cleaning
        # make sure that any attribute we updated has changed to the correct value
        for i in range(len(attrs_to_change)):
            assert getattr(updated_cleaning, attrs_to_change[i]) != getattr(test_cleaning, attrs_to_change[i])
            assert getattr(updated_cleaning, attrs_to_change[i]) == values[i]
        # make sure that no other attributes' values have changed
        for attr, value in updated_cleaning.dict().items():
            if attr not in attrs_to_change and attr != "updated_at":
                assert getattr(test_cleaning, attr) == value

    async def test_user_receives_error_if_updating_other_users_cleaning(
        self, app: FastAPI, authorized_client: AsyncClient, test_cleanings_list: List[CleaningInDB],
    ) -> None:
        res = await authorized_client.put(
            app.url_path_for("cleanings:update-cleaning-by-id", cleaning_id=test_cleanings_list[0].id),
            json={"cleaning_update": {"price": 99.99}},
        )
        assert res.status_code == status.HTTP_403_FORBIDDEN

    async def test_user_cant_change_ownership_of_cleaning(
        self,
        app: FastAPI,
        authorized_client: AsyncClient,
        test_cleaning: CleaningInDB,
        test_user: UserInDB,
        test_user2: UserInDB,
    ) -> None:
        res = await authorized_client.put(
            app.url_path_for("cleanings:update-cleaning-by-id", cleaning_id=test_cleaning.id),
            json={"cleaning_update": {"owner": test_user2.id}},
        )
        assert res.status_code == status.HTTP_200_OK
        cleaning = CleaningPublic(**res.json())
        assert cleaning.owner == test_user.id

    @pytest.mark.parametrize(
        "id, payload, status_code",
        (
            (-1, {"name": "test"}, 422),
            (0, {"name": "test2"}, 422),
            (500, {"name": "test3"}, 404),
            (1, None, 422),
            (1, {"cleaning_type": "invalid cleaning type"}, 422),
            (1, {"cleaning_type": None}, 400),
        ),
    )
    async def test_update_cleaning_with_invalid_input_throws_error(
        self, app: FastAPI, authorized_client: AsyncClient, id: int, payload: Dict[str, Optional[str]], status_code: int
    ) -> None:
        cleaning_update = {"cleaning_update": payload}
        res = await authorized_client.put(
            app.url_path_for("cleanings:update-cleaning-by-id", cleaning_id=id), json=cleaning_update
        )
        assert res.status_code == status_code

All we've really done here is ensure that our authenticated user doesn't have permission to update another user's cleaning resource. We also make sure that a user can't change the owner of their own cleaning resource. Everything else is the same.

And on to the CleaningsRepository.

db/repositories/cleanings.py
# ...other code

UPDATE_CLEANING_BY_ID_QUERY = """
    UPDATE cleanings
    SET name          = :name,
        description   = :description,
        price         = :price,
        cleaning_type = :cleaning_type
    WHERE id = :id AND owner = :owner
    RETURNING id, name, description, price, cleaning_type, owner, created_at, updated_at;
"""


class CleaningsRepository(BaseRepository):
    # ...other code

    async def update_cleaning(
        self, *, id: int, cleaning_update: CleaningUpdate, requesting_user: UserInDB
    ) -> CleaningInDB:
        cleaning = await self.get_cleaning_by_id(id=id, requesting_user=requesting_user)
        if not cleaning:
            return None
        if cleaning.owner != requesting_user.id:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Users are only able to update cleanings that they created.",
            )
        # merge only the fields the client actually sent into the stored record
        cleaning_update_params = cleaning.copy(update=cleaning_update.dict(exclude_unset=True))
        if cleaning_update_params.cleaning_type is None:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid cleaning type. Cannot be None."
            )
        updated_cleaning = await self.db.fetch_one(
            query=UPDATE_CLEANING_BY_ID_QUERY,
            values={
                **cleaning_update_params.dict(exclude={"created_at", "updated_at"}),
                "owner": requesting_user.id,
            },
        )
        return CleaningInDB(**updated_cleaning)

Wow! Our update_cleaning method has really ballooned in size! That's ok, since we're mostly handling edge cases. If we find ourselves doing this frequently, it might make more sense to build out the BaseRepository a bit more to handle a lot of the boilerplate. We won't do that now since we're so close to finished, but keep it in mind.
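The core of that method is the merge step: take the stored record, overlay only the fields the client actually sent, and reject a None cleaning type. Here's a stripped-down sketch of that logic with dataclasses standing in for our pydantic models. The Cleaning class, UPDATABLE_FIELDS, and apply_update are hypothetical names used for illustration, and a ValueError takes the place of the HTTPException.

```python
from dataclasses import dataclass, replace
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class Cleaning:
    id: int
    name: str
    price: float
    cleaning_type: Optional[str]
    owner: int


# fields a client may change; "owner" and "id" are deliberately absent
UPDATABLE_FIELDS = {"name", "price", "cleaning_type"}


def apply_update(stored: Cleaning, sent: Dict[str, Any]) -> Cleaning:
    """Overlay the sent fields on the stored record, ignoring anything not updatable."""
    changes = {key: value for key, value in sent.items() if key in UPDATABLE_FIELDS}
    merged = replace(stored, **changes)
    if merged.cleaning_type is None:
        raise ValueError("Invalid cleaning type. Cannot be None.")
    return merged


stored = Cleaning(id=1, name="fake cleaning name", price=9.99, cleaning_type="spot_clean", owner=7)
updated = apply_update(stored, {"price": 3.14, "owner": 99})
```

The price changes, the name is left alone because it was never sent, and the attempt to reassign the owner is silently dropped, which mirrors what test_user_cant_change_ownership_of_cleaning expects.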

And for the route:

api/routes/cleanings.py
# ...other code

@router.put("/{cleaning_id}/", response_model=CleaningPublic, name="cleanings:update-cleaning-by-id")
async def update_cleaning_by_id(
    cleaning_id: int = Path(..., ge=1),
    current_user: UserInDB = Depends(get_current_active_user),
    cleaning_update: CleaningUpdate = Body(..., embed=True),
    cleanings_repo: CleaningsRepository = Depends(get_repository(CleaningsRepository)),
) -> CleaningPublic:
    updated_cleaning = await cleanings_repo.update_cleaning(
        id=cleaning_id, cleaning_update=cleaning_update, requesting_user=current_user
    )
    if not updated_cleaning:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No cleaning found with that id.")
    return updated_cleaning

Much the same as before. The biggest difference is that we're protecting the route with our get_current_active_user dependency and passing the resulting user to our repo. We won't belabor the point any further.

Run the tests again. So close.

One more to go

Delete Cleanings

Let's polish this off nice and clean by finishing our DELETE route.

Add one last test class:

tests/test_cleanings.py
# ...other code

class TestDeleteCleaning:
    async def test_can_delete_cleaning_successfully(
        self, app: FastAPI, authorized_client: AsyncClient, test_cleaning: CleaningInDB
    ) -> None:
        res = await authorized_client.delete(
            app.url_path_for("cleanings:delete-cleaning-by-id", cleaning_id=test_cleaning.id)
        )
        assert res.status_code == status.HTTP_200_OK

    async def test_user_cant_delete_other_users_cleaning(
        self, app: FastAPI, authorized_client: AsyncClient, test_cleanings_list: List[CleaningInDB],
    ) -> None:
        res = await authorized_client.delete(
            app.url_path_for("cleanings:delete-cleaning-by-id", cleaning_id=test_cleanings_list[0].id)
        )
        assert res.status_code == status.HTTP_403_FORBIDDEN

    @pytest.mark.parametrize(
        "id, status_code", ((5000000, 404), (0, 422), (-1, 422), (None, 422)),
    )
    async def test_wrong_id_throws_error(
        self, app: FastAPI, authorized_client: AsyncClient, test_cleaning: CleaningInDB, id: int, status_code: int
    ) -> None:
        res = await authorized_client.delete(app.url_path_for("cleanings:delete-cleaning-by-id", cleaning_id=id))
        assert res.status_code == status_code

Our last update to the test_cleanings file checks to make sure that users can delete their own cleaning jobs and that they can't delete other users' cleaning jobs.

Let's make that happen in our cleaning repo.

db/repositories/cleanings.py
# ...other code

DELETE_CLEANING_BY_ID_QUERY = """
    DELETE FROM cleanings
    WHERE id = :id AND owner = :owner
    RETURNING id;
"""


class CleaningsRepository(BaseRepository):
    # ...other code

    async def delete_cleaning_by_id(self, *, id: int, requesting_user: UserInDB) -> int:
        cleaning = await self.get_cleaning_by_id(id=id, requesting_user=requesting_user)
        if not cleaning:
            return None
        if cleaning.owner != requesting_user.id:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Users are only able to delete cleanings that they created.",
            )
        deleted_id = await self.db.execute(
            query=DELETE_CLEANING_BY_ID_QUERY, values={"id": id, "owner": requesting_user.id}
        )
        return deleted_id

We're doing the same thing with our delete_cleaning_by_id method that we did with all the others: pass in the requesting_user and ensure that they're only allowed to delete the cleaning jobs they own. Otherwise we raise the proper exception.
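Note that the DELETE query itself also filters on owner, so even without the explicit 403 check a non-owner's request would remove nothing. The following sketch shows that effect with sqlite3 standing in for PostgreSQL (an assumption for the sake of a runnable example; delete_cleaning is a hypothetical helper that reports the number of affected rows instead of using RETURNING).

```python
import sqlite3

DELETE_CLEANING_QUERY = "DELETE FROM cleanings WHERE id = :id AND owner = :owner"

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cleanings (id INTEGER PRIMARY KEY, name TEXT, owner INTEGER)")
conn.execute("INSERT INTO cleanings (id, name, owner) VALUES (1, 'kitchen', 10)")


def delete_cleaning(cleaning_id: int, requesting_user_id: int) -> int:
    """Return how many rows were removed (0 when the id or the owner does not match)."""
    cursor = conn.execute(DELETE_CLEANING_QUERY, {"id": cleaning_id, "owner": requesting_user_id})
    return cursor.rowcount


removed_by_stranger = delete_cleaning(1, requesting_user_id=99)
removed_by_owner = delete_cleaning(1, requesting_user_id=10)
removed_again = delete_cleaning(1, requesting_user_id=10)
```

The stranger's attempt touches zero rows, the owner's attempt removes exactly one, and a repeat delete finds nothing left to remove.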

Last, but not least, the DELETE endpoint.

api/routes/cleanings.py
# ...other code

@router.delete("/{cleaning_id}/", response_model=int, name="cleanings:delete-cleaning-by-id")
async def delete_cleaning_by_id(
    cleaning_id: int = Path(..., ge=1),
    current_user: UserInDB = Depends(get_current_active_user),
    cleanings_repo: CleaningsRepository = Depends(get_repository(CleaningsRepository)),
) -> int:
    deleted_id = await cleanings_repo.delete_cleaning_by_id(id=cleaning_id, requesting_user=current_user)
    if not deleted_id:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No cleaning found with that id.")
    return deleted_id

We run our tests and...finally! All passing.

But there's something fishy about this. We're duplicating code all over the place. We keep checking to see if a cleaning exists and raising a 404 exception if it doesn't. We're also raising 403 exceptions when the user isn't allowed to modify or delete a resource.

Is there a better way to do that?

Well, actually there is. We can use FastAPI's built-in dependency system to handle that for us. Let's first create a new dependencies file for cleanings, and then we'll refactor our code a bit.

touch backend/app/api/dependencies/cleanings.py

And add a new dependency callable:

api/dependencies/cleanings.py
from fastapi import HTTPException, Depends, Path, status

from app.models.user import UserInDB
from app.models.cleaning import CleaningInDB
from app.db.repositories.cleanings import CleaningsRepository
from app.api.dependencies.database import get_repository
from app.api.dependencies.auth import get_current_active_user


async def get_cleaning_by_id_from_path(
    cleaning_id: int = Path(..., ge=1),
    current_user: UserInDB = Depends(get_current_active_user),
    cleanings_repo: CleaningsRepository = Depends(get_repository(CleaningsRepository)),
) -> CleaningInDB:
    cleaning = await cleanings_repo.get_cleaning_by_id(id=cleaning_id, requesting_user=current_user)
    if not cleaning:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="No cleaning found with that id.",
        )
    return cleaning


def check_cleaning_modification_permissions(
    current_user: UserInDB = Depends(get_current_active_user),
    cleaning: CleaningInDB = Depends(get_cleaning_by_id_from_path),
) -> None:
    if cleaning.owner != current_user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Action forbidden. Users are only able to modify cleanings they own.",
        )

We've abstracted both of our common exceptions into dependencies that will help us manage access to any cleaning resource. Now we can simplify our routes and repository significantly. Let's see how our updated CleaningsRepository looks first.
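Stripped of the FastAPI machinery, the two dependencies form a small chain: look the cleaning up (or fail with a 404), then compare its owner to the current user (or fail with a 403). The sketch below imitates that chain with plain functions. ApiError, FAKE_DB, and status_for are hypothetical stand-ins invented for this example; in the app itself, FastAPI resolves the chain through Depends.

```python
from dataclasses import dataclass
from typing import Dict


class ApiError(Exception):
    """Hypothetical stand-in for fastapi.HTTPException."""

    def __init__(self, status_code: int, detail: str) -> None:
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


@dataclass
class Cleaning:
    id: int
    owner: int


# pretend database holding a single cleaning owned by user 10
FAKE_DB: Dict[int, Cleaning] = {1: Cleaning(id=1, owner=10)}


def get_cleaning_or_404(cleaning_id: int) -> Cleaning:
    cleaning = FAKE_DB.get(cleaning_id)
    if cleaning is None:
        raise ApiError(404, "No cleaning found with that id.")
    return cleaning


def check_modification_permissions(cleaning_id: int, current_user_id: int) -> Cleaning:
    # the lookup runs first, so a missing cleaning is reported as 404 rather than 403
    cleaning = get_cleaning_or_404(cleaning_id)
    if cleaning.owner != current_user_id:
        raise ApiError(403, "Action forbidden. Users are only able to modify cleanings they own.")
    return cleaning


def status_for(cleaning_id: int, current_user_id: int) -> int:
    try:
        check_modification_permissions(cleaning_id, current_user_id)
    except ApiError as exc:
        return exc.status_code
    return 200
```

Because the permission check depends on the lookup, the ordering of the two errors is fixed in one place instead of being repeated in every route and repository method.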

db/repositories/cleanings.py
from typing import List

from fastapi import HTTPException, status

from app.db.repositories.base import BaseRepository
from app.models.cleaning import CleaningCreate, CleaningUpdate, CleaningInDB
from app.models.user import UserInDB

CREATE_CLEANING_QUERY = """
    INSERT INTO cleanings (name, description, price, cleaning_type, owner)
    VALUES (:name, :description, :price, :cleaning_type, :owner)
    RETURNING id, name, description, price, cleaning_type, owner, created_at, updated_at;
"""

GET_CLEANING_BY_ID_QUERY = """
    SELECT id, name, description, price, cleaning_type, owner, created_at, updated_at
    FROM cleanings
    WHERE id = :id;
"""

GET_ALL_CLEANINGS_QUERY = """
    SELECT id, name, description, price, cleaning_type, owner, created_at, updated_at
    FROM cleanings
    WHERE owner = :owner;
"""

UPDATE_CLEANING_BY_ID_QUERY = """
    UPDATE cleanings
    SET name          = :name,
        description   = :description,
        price         = :price,
        cleaning_type = :cleaning_type
    WHERE id = :id
    RETURNING id, name, description, price, cleaning_type, owner, created_at, updated_at;
"""

DELETE_CLEANING_BY_ID_QUERY = """
    DELETE FROM cleanings
    WHERE id = :id
    RETURNING id;
"""


class CleaningsRepository(BaseRepository):
    """
    All database actions associated with the Cleaning resource
    """

    async def create_cleaning(self, *, new_cleaning: CleaningCreate, requesting_user: UserInDB) -> CleaningInDB:
        cleaning = await self.db.fetch_one(
            query=CREATE_CLEANING_QUERY, values={**new_cleaning.dict(), "owner": requesting_user.id}
        )
        return CleaningInDB(**cleaning)

    async def get_cleaning_by_id(self, *, id: int, requesting_user: UserInDB) -> CleaningInDB:
        cleaning = await self.db.fetch_one(query=GET_CLEANING_BY_ID_QUERY, values={"id": id})
        if not cleaning:
            return None
        return CleaningInDB(**cleaning)

    async def get_all_cleanings(self, requesting_user: UserInDB) -> List[CleaningInDB]:
        cleaning_records = await self.db.fetch_all(query=GET_ALL_CLEANINGS_QUERY, values={"owner": requesting_user.id})
        return [CleaningInDB(**l) for l in cleaning_records]

    async def update_cleaning(self, *, cleaning: CleaningInDB, cleaning_update: CleaningUpdate) -> CleaningInDB:
        # existence and ownership were already verified by the route dependencies
        cleaning_update_params = cleaning.copy(update=cleaning_update.dict(exclude_unset=True))
        if cleaning_update_params.cleaning_type is None:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid cleaning type. Cannot be None."
            )
        updated_cleaning = await self.db.fetch_one(
            query=UPDATE_CLEANING_BY_ID_QUERY,
            values=cleaning_update_params.dict(exclude={"owner", "created_at", "updated_at"}),
        )
        return CleaningInDB(**updated_cleaning)

    async def delete_cleaning_by_id(self, *, cleaning: CleaningInDB) -> int:
        return await self.db.execute(query=DELETE_CLEANING_BY_ID_QUERY, values={"id": cleaning.id})

There is significantly less code here now. The repository now accepts a CleaningInDB model instead of an id for both the update and delete actions. Our get_cleaning_by_id_from_path dependency calls the get_cleaning_by_id method and handles every 404 case, which makes our life much easier. We've also removed any references to requesting_user from the modification actions, because our check_cleaning_modification_permissions dependency handles that for us.
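
If the merge step inside update_cleaning is hard to picture, here is a minimal, framework-free sketch of the same idea. It uses a plain dataclass in place of the pydantic model, and the names StoredCleaning and merge_update, along with the sample values, are made up for illustration and are not part of the app.

```python
from dataclasses import dataclass, replace
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class StoredCleaning:
    # Hypothetical stand-in for CleaningInDB, reduced to a few fields.
    id: int
    name: str
    price: float
    cleaning_type: Optional[str]
    owner: int


def merge_update(cleaning: StoredCleaning, update_fields: Dict[str, Any]) -> StoredCleaning:
    # update_fields plays the role of cleaning_update.dict(exclude_unset=True):
    # it holds only the keys the client actually sent.
    merged = replace(cleaning, **update_fields)
    if merged.cleaning_type is None:
        # The repository raises an HTTP 400 here; ValueError keeps this sketch framework-free.
        raise ValueError("Invalid cleaning type. Cannot be None.")
    return merged


# Sample values are illustrative only.
stored = StoredCleaning(id=1, name="Kitchen", price=20.0, cleaning_type="spot_clean", owner=7)

# Only the price was sent, so every other field keeps its stored value.
updated = merge_update(stored, {"price": 25.0})
```

Because the merge starts from the record that the dependency already fetched, the repository never needs to look the cleaning up a second time or ask who is making the request.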

So how do we use these dependencies in our routes? Well, here's where things get interesting.

Let's look at the new api/routes/cleanings.py file as well.

api/routes/cleanings.py
from typing import List

from fastapi import APIRouter, Body, Depends, status

from app.models.user import UserInDB
from app.models.cleaning import CleaningCreate, CleaningUpdate, CleaningInDB, CleaningPublic
from app.db.repositories.cleanings import CleaningsRepository
from app.api.dependencies.database import get_repository
from app.api.dependencies.auth import get_current_active_user
from app.api.dependencies.cleanings import get_cleaning_by_id_from_path, check_cleaning_modification_permissions

router = APIRouter()


@router.post("/", response_model=CleaningPublic, name="cleanings:create-cleaning", status_code=status.HTTP_201_CREATED)
async def create_new_cleaning(
    new_cleaning: CleaningCreate = Body(..., embed=True),
    current_user: UserInDB = Depends(get_current_active_user),
    cleanings_repo: CleaningsRepository = Depends(get_repository(CleaningsRepository)),
) -> CleaningPublic:
    return await cleanings_repo.create_cleaning(new_cleaning=new_cleaning, requesting_user=current_user)


@router.get("/", response_model=List[CleaningPublic], name="cleanings:get-all-cleanings")
async def get_all_cleanings(
    current_user: UserInDB = Depends(get_current_active_user),
    cleanings_repo: CleaningsRepository = Depends(get_repository(CleaningsRepository)),
) -> List[CleaningPublic]:
    return await cleanings_repo.get_all_cleanings(requesting_user=current_user)


@router.get("/{cleaning_id}/", response_model=CleaningPublic, name="cleanings:get-cleaning-by-id")
async def get_cleaning_by_id(cleaning: CleaningInDB = Depends(get_cleaning_by_id_from_path)) -> CleaningPublic:
    return cleaning


@router.put(
    "/{cleaning_id}/",
    response_model=CleaningPublic,
    name="cleanings:update-cleaning-by-id",
    dependencies=[Depends(check_cleaning_modification_permissions)],
)
async def update_cleaning_by_id(
    cleaning: CleaningInDB = Depends(get_cleaning_by_id_from_path),
    cleaning_update: CleaningUpdate = Body(..., embed=True),
    cleanings_repo: CleaningsRepository = Depends(get_repository(CleaningsRepository)),
) -> CleaningPublic:
    return await cleanings_repo.update_cleaning(cleaning=cleaning, cleaning_update=cleaning_update)


@router.delete(
    "/{cleaning_id}/",
    response_model=int,
    name="cleanings:delete-cleaning-by-id",
    dependencies=[Depends(check_cleaning_modification_permissions)],
)
async def delete_cleaning_by_id(
    cleaning: CleaningInDB = Depends(get_cleaning_by_id_from_path),
    cleanings_repo: CleaningsRepository = Depends(get_repository(CleaningsRepository)),
) -> int:
    return await cleanings_repo.delete_cleaning_by_id(cleaning=cleaning)

Well, look at that. For each of our route functions, the body is a simple one-liner. Most of the work is being done by our dependencies. In fact, we've even added dependencies in the route decorator for our update and delete endpoints. The FastAPI docs provide more details on that pattern.

Our check_cleaning_modification_permissions dependency ensures that the user has sufficient permission to update or delete a cleaning, so we don't have to do that ourselves.
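
To make the decorator pattern a little more concrete, here's a rough, framework-free sketch of what is going on. FastAPI runs decorator-level dependencies before the route function and does not pass their return values to it, so a dependency like this can only let the request through or stop it by raising. Everything below (guarded, Forbidden, and the trimmed-down Cleaning and User classes) is a hypothetical stand-in rather than FastAPI's API or our app's code.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Cleaning:
    # Hypothetical, trimmed-down stand-in for CleaningInDB.
    id: int
    owner: int


@dataclass
class User:
    # Hypothetical, trimmed-down stand-in for UserInDB.
    id: int


class Forbidden(Exception):
    """Stands in for an HTTP 403 response."""


def check_modification_permissions(cleaning: Cleaning, user: User) -> None:
    # Only the owner of a cleaning may update or delete it.
    if cleaning.owner != user.id:
        raise Forbidden("Users are only able to modify cleanings they own.")


def guarded(dependencies: List[Callable[[Cleaning, User], None]]):
    # Mimics a decorator-level dependencies list: each guard runs first,
    # its return value is ignored, and any exception keeps the handler from running.
    def decorator(handler):
        def wrapper(cleaning: Cleaning, user: User):
            for dependency in dependencies:
                dependency(cleaning, user)
            return handler(cleaning, user)

        return wrapper

    return decorator


@guarded(dependencies=[check_modification_permissions])
def delete_cleaning(cleaning: Cleaning, user: User) -> int:
    # By the time we get here, the permission check has already passed.
    return cleaning.id
```

Calling delete_cleaning with the owner returns the cleaning's id, while calling it with any other user raises Forbidden before the function body ever runs. That's the same division of labor we get in the real routes: the route function stays a one-liner and the guard decides whether it runs at all.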

Run the tests now and see that they're all still passing.

It's comforting to have all these tests in place when we refactor. We can be confident that our code still works as expected, even after a large refactor such as this one. And this is a big improvement: we have much less code duplication, and we've extracted permission handling into its own system.

And there we have it! Refactoring is sometimes the least fun part of TDD, but it's essential. We'll do it a few more times before this series is over.

Wrapping Up and Resources

We've now set ourselves up to get into the meat of our application's functionality. In the next post, we'll give users the ability to offer their services for a cleaning job and let owners accept or reject a given offer.

However, now it's time for a break.

  • FastAPI docs on dependencies in path decorators
  • FastAPI Permissions repo - library that extracts permission handling into a system that mirrors the pyramid framework
  • FastAPI Contrib repo - library with built-in permissions system
  • Real World FastAPI repo - inspiration for a lot of the permissions code seen in this post