User Owned Resources in FastAPI
Welcome to Part 10 of Up and Running with FastAPI. If you missed part 9, you can find it here.
This series is focused on building a full-stack application with the FastAPI framework. The app allows users to post requests to have their residence cleaned, and other users can select a cleaning project for a given hourly rate.
part 1
part 2
part 3
part 4
part 5
part 6
part 7
part 8
part 9
part 10
In the previous post we built out user profiles and ownership, and made sure our API returned user profiles when necessary. Now we'll be getting to the meat of this application's functionality. In this post, we'll make users own cleaning resources they create and ensure that only the creators can manage their own cleanings jobs.
User-Owned Cleaning Jobs
Our database setup for the cleanings resource is rather naive. At the moment, we have no way of tracking who created a cleaning job. We're going to fix that. And in doing so, we're going to make it easy to create marketplace-style functionality.
Again, we'll begin with the migrations file.
Migrations
Just like before, let's start by rolling back our migrations.
docker psdocker exec -it [CONTAINER_ID] bashalembic downgrade base
Now go ahead and open up the file that looks like: db/migrations/versions/12345678654_create_main_tables.py
.
"""create main tablesRevision ID: 123456786543Revises:Create Date: 2020-05-05 10:41:35.468471"""from typing import Tuplefrom alembic import opimport sqlalchemy as sa# revision identifiers, used by Alembicrevision = "123456786543"down_revision = Nonebranch_labels = Nonedepends_on = Nonedef create_updated_at_trigger() -> None: op.execute( """ CREATE OR REPLACE FUNCTION update_updated_at_column() RETURNS TRIGGER AS $$ BEGIN NEW.updated_at = now(); RETURN NEW; END; $$ language 'plpgsql'; """ )def timestamps(indexed: bool = False) -> Tuple[sa.Column, sa.Column]: return ( sa.Column( "created_at", sa.TIMESTAMP(timezone=True), server_default=sa.func.now(), nullable=False, index=indexed, ), sa.Column( "updated_at", sa.TIMESTAMP(timezone=True), server_default=sa.func.now(), nullable=False, index=indexed, ), )def create_users_table() -> None: op.create_table( "users", sa.Column("id", sa.Integer, primary_key=True), sa.Column("username", sa.Text, unique=True, nullable=False, index=True), sa.Column("email", sa.Text, unique=True, nullable=False, index=True), sa.Column("email_verified", sa.Boolean, nullable=False, server_default="False"), sa.Column("salt", sa.Text, nullable=False), sa.Column("password", sa.Text, nullable=False), sa.Column("is_active", sa.Boolean(), nullable=False, server_default="True"), sa.Column("is_superuser", sa.Boolean(), nullable=False, server_default="False"), *timestamps(), ) op.execute( """ CREATE TRIGGER update_user_modtime BEFORE UPDATE ON users FOR EACH ROW EXECUTE PROCEDURE update_updated_at_column(); """ )def create_profiles_table() -> None: op.create_table( "profiles", sa.Column("id", sa.Integer, primary_key=True), sa.Column("full_name", sa.Text, nullable=True), sa.Column("phone_number", sa.Text, nullable=True), sa.Column("bio", sa.Text, nullable=True, server_default=""), sa.Column("image", sa.Text, nullable=True), sa.Column("user_id", sa.Integer, sa.ForeignKey("users.id", ondelete="CASCADE")), *timestamps(), ) op.execute( """ CREATE TRIGGER update_profiles_modtime BEFORE UPDATE ON profiles FOR EACH ROW EXECUTE PROCEDURE update_updated_at_column(); """ )def create_cleanings_table() -> None: op.create_table( "cleanings", sa.Column("id", sa.Integer, primary_key=True), sa.Column("name", sa.Text, nullable=False, index=True), sa.Column("description", sa.Text, nullable=True), sa.Column("cleaning_type", sa.Text, nullable=False, server_default="spot_clean"), sa.Column("price", sa.Numeric(10, 5), nullable=False), sa.Column("owner", sa.Integer, sa.ForeignKey("users.id", ondelete="CASCADE")), *timestamps(indexed=True), ) op.execute( """ CREATE TRIGGER update_cleanings_modtime BEFORE UPDATE ON cleanings FOR EACH ROW EXECUTE PROCEDURE update_updated_at_column(); """ )def upgrade() -> None: create_updated_at_trigger() create_users_table() create_profiles_table() create_cleanings_table()def downgrade() -> None: op.drop_table("cleanings") op.drop_table("profiles") op.drop_table("users") op.execute("DROP FUNCTION update_updated_at_column")
We've moved the creation of the cleanings
table to after we've created the users
and profiles
tables. The reason being is that we want to reference the users
table when we define an owner
column on the cleanings
table. Here we set the value of that column equal to the id of the user that created it. This has multiple repercussions, but we'll get to those in a minute.
Let's go ahead and migrate the database by entering our docker container like before and running the upgrade head
command.
docker psdocker exec -it [CONTAINER_ID] bashalembic upgrade head
Now that we've given users ownership
of a cleaning resource in our database, we'll want to refactor our code a bit. Any user should be able to access a cleaning resource, but only the user that created them should be able to update it and delete it. Users that want to list all cleanings should only recieve the ones that they themselves have created.
Usually, we'd start with testing. But since we've modified the database, let's dip into the models/cleaning.py
file first.
Modeling User Ownership
from typing import Optional, Unionfrom enum import Enumfrom app.models.core import IDModelMixin, DateTimeModelMixin, CoreModelfrom app.models.user import UserPublic# ...other codeclass CleaningInDB(IDModelMixin, DateTimeModelMixin, CleaningBase): name: str price: float cleaning_type: CleaningType owner: intclass CleaningPublic(CleaningInDB): owner: Union[int, UserPublic]
We're adding the owner
attribute to our CleaningInDB
model that will an integer representing the id of the owning user. On top of that, we're finally taking advantage of timestamps by utilizing DateTimeModelMixin
in our CleaningInDB
model. Our CleaningPublic
model simply inherits everything from CleaningInDB
, but specifies that the owner
attribute can be either an int
id of the user, or the actual UserPublic
model itself.
If we were to run our tests at this point, most of them would error out. Feel free to try it out.
Let's go ahead and fix that. We're going to make quite a few updates, so don't be concerned when our tests start breaking.
Starting with our test_cleaning
fixture in the conftest.py
file, modify it with the following code:
# ...other code@pytest.fixtureasync def test_cleaning(db: Database, test_user: UserInDB) -> CleaningInDB: cleaning_repo = CleaningsRepository(db) new_cleaning = CleaningCreate( name="fake cleaning name", description="fake cleaning description", price=9.99, cleaning_type="spot_clean" ) return await cleaning_repo.create_cleaning(new_cleaning=new_cleaning, requesting_user=test_user)# ...other code
We're taking in the test_user
fixture and sending it to the CleaningsRepository
whenever a new cleaning is created. We'll want to take that user and pass their id to the database as the owner
attribute.
Let's do that now.
Create Cleanings
Make the following changes to the CleaningsRepository
.
from typing import Listfrom fastapi import HTTPException, statusfrom app.db.repositories.base import BaseRepositoryfrom app.models.cleaning import CleaningCreate, CleaningUpdate, CleaningInDBfrom app.models.user import UserInDBCREATE_CLEANING_QUERY = """ INSERT INTO cleanings (name, description, price, cleaning_type, owner) VALUES (:name, :description, :price, :cleaning_type, :owner) RETURNING id, name, description, price, cleaning_type, owner, created_at, updated_at;"""# ...other codeclass CleaningsRepository(BaseRepository): """" All database actions associated with the Cleaning resource """ async def create_cleaning(self, *, new_cleaning: CleaningCreate, requesting_user: UserInDB) -> CleaningInDB: cleaning = await self.db.fetch_one( query=CREATE_CLEANING_QUERY, values={**new_cleaning.dict(), "owner": requesting_user.id} ) return CleaningInDB(**cleaning) # ...other code
Though we've only updated the CREATE_CLEANING_QUERY
, we're going to need to change most of the SQL. We've added in the owner
attribute, and we're returning the timestamps as well. If we were to run our tests now, we'd still error out as we're not passing the currently logged in user to the CleaningRepository
. We'll get to that in a moment.
First, let's update our tests/test_cleanings.py
file. We're going to basically start from scratch, keeping what we like and discarding what we don't need anymore.
from typing import List, Dict, Union, Optionalimport pytestfrom httpx import AsyncClientfrom fastapi import FastAPI, statusfrom databases import Databasefrom app.db.repositories.cleanings import CleaningsRepositoryfrom app.models.cleaning import CleaningCreate, CleaningInDB, CleaningPublicfrom app.models.user import UserInDBpytestmark = pytest.mark.asyncio@pytest.fixturedef new_cleaning(): return CleaningCreate( name="test cleaning", description="test description", price=10.00, cleaning_type="spot_clean", )@pytest.fixtureasync def test_cleanings_list(db: Database, test_user2: UserInDB) -> List[CleaningInDB]: cleaning_repo = CleaningsRepository(db) return [ await cleaning_repo.create_cleaning( new_cleaning=CleaningCreate( name=f"test cleaning {i}", description="test description", price=20.00, cleaning_type="full_clean" ), requesting_user=test_user2, ) for i in range(5) ]class TestCleaningsRoutes: """ Check each cleaning route to ensure none return 404s """ async def test_routes_exist(self, app: FastAPI, client: AsyncClient) -> None: res = await client.post(app.url_path_for("cleanings:create-cleaning"), json={}) assert res.status_code != status.HTTP_404_NOT_FOUND res = await client.get(app.url_path_for("cleanings:get-cleaning-by-id", cleaning_id=1)) assert res.status_code != status.HTTP_404_NOT_FOUND res = await client.get(app.url_path_for("cleanings:list-all-user-cleanings")) assert res.status_code != status.HTTP_404_NOT_FOUND res = await client.put(app.url_path_for("cleanings:update-cleaning-by-id", cleaning_id=1)) assert res.status_code != status.HTTP_404_NOT_FOUND res = await client.delete(app.url_path_for("cleanings:delete-cleaning-by-id", cleaning_id=0)) assert res.status_code != status.HTTP_404_NOT_FOUNDclass TestCreateCleaning: async def test_valid_input_creates_cleaning_belonging_to_user( self, app: FastAPI, authorized_client: AsyncClient, test_user: UserInDB, new_cleaning: CleaningCreate ) -> None: res = await authorized_client.post( app.url_path_for("cleanings:create-cleaning"), json={"new_cleaning": new_cleaning.dict()} ) assert res.status_code == status.HTTP_201_CREATED created_cleaning = CleaningPublic(**res.json()) assert created_cleaning.name == new_cleaning.name assert created_cleaning.price == new_cleaning.price assert created_cleaning.cleaning_type == new_cleaning.cleaning_type assert created_cleaning.owner == test_user.id async def test_unauthorized_user_unable_to_create_cleaning( self, app: FastAPI, client: AsyncClient, new_cleaning: CleaningCreate ) -> None: res = await client.post( app.url_path_for("cleanings:create-cleaning"), json={"new_cleaning": new_cleaning.dict()} ) assert res.status_code == status.HTTP_401_UNAUTHORIZED @pytest.mark.parametrize( "invalid_payload, status_code", ( (None, 422), ({}, 422), ({"name": "test"}, 422), ({"price": 10.00}, 422), ({"name": "test", "description": "test"}, 422), ), ) async def test_invalid_input_raises_error( self, app: FastAPI, authorized_client: AsyncClient, invalid_payload: Dict[str, Union[str, float]], test_cleaning: CleaningCreate, status_code: int, ) -> None: res = await authorized_client.post( app.url_path_for("cleanings:create-cleaning"), json={"new_cleaning": invalid_payload} ) assert res.status_code == status_code
We've added a new test to ensure that unauthenticated users can't create cleaning opportunities, and we're checking that any newly created cleanings have the currently logged in user as their owner.
Open up the api/routes/cleanings.py
file and make that happen. We'll start from scratch here too, going route by route.
from typing import Listfrom fastapi import APIRouter, Body, Path, Depends, HTTPException, statusfrom app.models.user import UserCreate, UserUpdate, UserInDB, UserPublicfrom app.models.cleaning import CleaningCreate, CleaningUpdate, CleaningPublicfrom app.db.repositories.cleanings import CleaningsRepositoryfrom app.api.dependencies.database import get_repositoryfrom app.api.dependencies.auth import get_current_active_userrouter = APIRouter()@router.post("/", response_model=CleaningPublic, name="cleanings:create-cleaning", status_code=status.HTTP_201_CREATED)async def create_new_cleaning( new_cleaning: CleaningCreate = Body(..., embed=True), current_user: UserInDB = Depends(get_current_active_user), cleanings_repo: CleaningsRepository = Depends(get_repository(CleaningsRepository)),) -> CleaningPublic: created_cleaning = await cleanings_repo.create_cleaning(new_cleaning=new_cleaning, requesting_user=current_user) return created_cleaning
We're now using the same auth dependencies we defined in one of our previous posts, and we're passing the logged in user to the CleaningsRepository
. This way any new cleaning will have the currently authenticated user attached as the owner of the resource.
Run the test again and see that we're a step closer. All the TestCreateCleaning
tests are passing.
Get Cleanings
Now we're moving on to the GET
requests.
Let's first add the next set of cleaning tests.
# ...other codeclass TestGetCleaning: async def test_get_cleaning_by_id( self, app: FastAPI, authorized_client: AsyncClient, test_cleaning: CleaningInDB ) -> None: res = await authorized_client.get(app.url_path_for("cleanings:get-cleaning-by-id", cleaning_id=test_cleaning.id)) assert res.status_code == status.HTTP_200_OK cleaning = CleaningInDB(**res.json()) assert cleaning == test_cleaning async def test_unauthorized_users_cant_access_cleanings( self, app: FastAPI, client: AsyncClient, test_cleaning: CleaningInDB ) -> None: res = await client.get(app.url_path_for("cleanings:get-cleaning-by-id", cleaning_id=test_cleaning.id)) assert res.status_code == status.HTTP_401_UNAUTHORIZED @pytest.mark.parametrize( "id, status_code", ((50000, 404), (-1, 422), (None, 422)), ) async def test_wrong_id_returns_error( self, app: FastAPI, authorized_client: AsyncClient, id: int, status_code: int ) -> None: res = await authorized_client.get(app.url_path_for("cleanings:get-cleaning-by-id", cleaning_id=id)) assert res.status_code == status_code async def test_get_all_cleanings_returns_only_user_owned_cleanings( self, app: FastAPI, authorized_client: AsyncClient, test_user: UserInDB, db: Database, test_cleaning: CleaningInDB, test_cleanings_list: List[CleaningInDB], ) -> None: res = await authorized_client.get(app.url_path_for("cleanings:list-all-user-cleanings")) assert res.status_code == status.HTTP_200_OK assert isinstance(res.json(), list) assert len(res.json()) > 0 cleanings = [CleaningInDB(**l) for l in res.json()] # check that a cleaning created by our user is returned assert test_cleaning in cleanings # test that all cleanings returned are owned by this user for cleaning in cleanings: assert cleaning.owner == test_user.id # assert all cleanings created by another user not included (redundant, but fine) assert all(c not in cleanings for c in test_cleanings_list)
We're doing a lot of the same thing here with fetching cleaning jobs. Users should only be able to get a cleaning resource if they're authenticated, and when users ask to list all cleaning jobs, we only send back those that belong to them. Otherwise things are pretty much the same.
Let's get these passing.
Open up the CleaningsRepository
and update it with the following:
# ...other codeGET_CLEANING_BY_ID_QUERY = """ SELECT id, name, description, price, cleaning_type, owner, created_at, updated_at FROM cleanings WHERE id = :id;"""LIST_ALL_USER_CLEANINGS_QUERY = """ SELECT id, name, description, price, cleaning_type, owner, created_at, updated_at FROM cleanings WHERE owner = :owner;"""class CleaningsRepository(BaseRepository): """ All database actions associated with the Cleaning resource """ async def create_cleaning(self, *, new_cleaning: CleaningCreate, requesting_user: UserInDB) -> CleaningInDB: cleaning = await self.db.fetch_one( query=CREATE_CLEANING_QUERY, values={**new_cleaning.dict(), "owner": requesting_user.id} ) return CleaningInDB(**cleaning) async def get_cleaning_by_id(self, *, id: int, requesting_user: UserInDB) -> CleaningInDB: cleaning = await self.db.fetch_one(query=GET_CLEANING_BY_ID_QUERY, values={"id": id}) if not cleaning: return None return CleaningInDB(**cleaning) async def list_all_user_cleanings(self, requesting_user: UserInDB) -> List[CleaningInDB]: cleaning_records = await self.db.fetch_all( query=LIST_ALL_USER_CLEANINGS_QUERY, values={"owner": requesting_user.id} ) return [CleaningInDB(**l) for l in cleaning_records]
We're now expecting the requesting_user
in each of our methods. Even though the get_cleaning_by_id
method doesn't use that parameter, it's there for consistency. Besides, we'll end up using it later anyway - so keep it there.
Let's modify our routes to support these changes as well.
# ...other code@router.get("/{cleaning_id}/", response_model=CleaningPublic, name="cleanings:get-cleaning-by-id")async def get_cleaning_by_id( cleaning_id: int = Path(..., ge=1), current_user: UserInDB = Depends(get_current_active_user), cleanings_repo: CleaningsRepository = Depends(get_repository(CleaningsRepository)),) -> CleaningPublic: cleaning = await cleanings_repo.get_cleaning_by_id(id=cleaning_id, requesting_user=current_user) if not cleaning: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No cleaning found with that id.") return cleaning@router.get("/", response_model=List[CleaningPublic], name="cleanings:list-all-user-cleanings")async def list_all_user_cleanings( current_user: UserInDB = Depends(get_current_active_user), cleanings_repo: CleaningsRepository = Depends(get_repository(CleaningsRepository)),) -> List[CleaningPublic]: return await cleanings_repo.list_all_user_cleanings(requesting_user=current_user)
In both of these routes, we're using the get_current_active_user
dependency to protect the route. On top of that, we pass the user to our CleaningsRepository
for all relevant database activity.
Only two more to go.
Update Cleanings
The tests for updating cleanings need only a few modifications.
# ...other codeclass TestUpdateCleaning: @pytest.mark.parametrize( "attrs_to_change, values", ( (["name"], ["new fake cleaning name"]), (["description"], ["new fake cleaning description"]), (["price"], [3.14]), (["cleaning_type"], ["full_clean"]), (["name", "description"], ["extra new fake cleaning name", "extra new fake cleaning description"]), (["price", "cleaning_type"], [42.00, "dust_up"]), ), ) async def test_update_cleaning_with_valid_input( self, app: FastAPI, authorized_client: AsyncClient, test_cleaning: CleaningInDB, attrs_to_change: List[str], values: List[str], ) -> None: cleaning_update = {"cleaning_update": {attrs_to_change[i]: values[i] for i in range(len(attrs_to_change))}} res = await authorized_client.put( app.url_path_for("cleanings:update-cleaning-by-id", cleaning_id=test_cleaning.id), json=cleaning_update ) assert res.status_code == status.HTTP_200_OK updated_cleaning = CleaningInDB(**res.json()) assert updated_cleaning.id == test_cleaning.id # make sure it's the same cleaning # make sure that any attribute we updated has changed to the correct value for i in range(len(attrs_to_change)): assert getattr(updated_cleaning, attrs_to_change[i]) != getattr(test_cleaning, attrs_to_change[i]) assert getattr(updated_cleaning, attrs_to_change[i]) == values[i] # make sure that no other attributes' values have changed for attr, value in updated_cleaning.dict().items(): if attr not in attrs_to_change and attr != "updated_at": assert getattr(test_cleaning, attr) == value async def test_user_recieves_error_if_updating_other_users_cleaning( self, app: FastAPI, authorized_client: AsyncClient, test_cleanings_list: List[CleaningInDB], ) -> None: res = await authorized_client.put( app.url_path_for("cleanings:update-cleaning-by-id", cleaning_id=test_cleanings_list[0].id), json={"cleaning_update": {"price": 99.99}}, ) assert res.status_code == status.HTTP_403_FORBIDDEN async def test_user_cant_change_ownership_of_cleaning( self, app: FastAPI, authorized_client: AsyncClient, test_cleaning: CleaningInDB, test_user: UserInDB, test_user2: UserInDB, ) -> None: res = await authorized_client.put( app.url_path_for("cleanings:update-cleaning-by-id", cleaning_id=test_cleaning.id), json={"cleaning_update": {"owner": test_user2.id}}, ) assert res.status_code == status.HTTP_200_OK cleaning = CleaningPublic(**res.json()) assert cleaning.owner == test_user.id @pytest.mark.parametrize( "id, payload, status_code", ( (-1, {"name": "test"}, 422), (0, {"name": "test2"}, 422), (500, {"name": "test3"}, 404), (1, None, 422), (1, {"cleaning_type": "invalid cleaning type"}, 422), (1, {"cleaning_type": None}, 400), ), ) async def test_update_cleaning_with_invalid_input_throws_error( self, app: FastAPI, authorized_client: AsyncClient, id: int, payload: Dict[str, Optional[str]], status_code: int ) -> None: cleaning_update = {"cleaning_update": payload} res = await authorized_client.put( app.url_path_for("cleanings:update-cleaning-by-id", cleaning_id=id), json=cleaning_update ) assert res.status_code == status_code
All we've really done here is ensure that our authenticated user doesn't have permission to update another user's cleaning resource. We also make sure that a user can't change the owner of their own cleanig resource. Everything else is the same.
And on to the CleaningsRepository
.
# ...other codeUPDATE_CLEANING_BY_ID_QUERY = """ UPDATE cleanings SET name = :name, description = :description, price = :price, cleaning_type = :cleaning_type WHERE id = :id AND owner = :owner RETURNING id, name, description, price, cleaning_type, owner, created_at, updated_at;"""class CleaningsRepository(BaseRepository): # ...other code async def update_cleaning( self, *, id: int, cleaning_update: CleaningUpdate, requesting_user: UserInDB ) -> CleaningInDB: cleaning = await self.get_cleaning_by_id(id=id, requesting_user=requesting_user) if not cleaning: return None if cleaning.owner != requesting_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Users are only able to update cleanings that they created.", ) cleaning_update_params = cleaning.copy(update=cleaning_update.dict(exclude_unset=True)) if cleaning_update_params.cleaning_type is None: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid cleaning type. Cannot be None." ) updated_cleaning = await self.db.fetch_one( query=UPDATE_CLEANING_BY_ID_QUERY, values={ **cleaning_update_params.dict(exclude={"created_at", "updated_at"}), "owner": requesting_user.id, }, ) return CleaningInDB(**updated_cleaning)
Wow! Our update_cleaning
method has really ballooned in size! That's ok, since we're mostly handling edge cases. If we find ourselves doing this frequently, it might make more sense to build out the BaseRepository
a bit more to handle a lot of the boilerplate. We won't do that now since we're so close to finished, but keep it in mind.
And for the route:
# ...other code@router.put("/{cleaning_id}/", response_model=CleaningPublic, name="cleanings:update-cleaning-by-id")async def update_cleaning_by_id( cleaning_id: int = Path(..., ge=1), current_user: UserInDB = Depends(get_current_active_user), cleaning_update: CleaningUpdate = Body(..., embed=True), cleanings_repo: CleaningsRepository = Depends(get_repository(CleaningsRepository)),) -> CleaningPublic: updated_cleaning = await cleanings_repo.update_cleaning( id=cleaning_id, cleaning_update=cleaning_update, requesting_user=current_user ) if not updated_cleaning: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No cleaning found with that id.") return updated_cleaning
Much of the same old stuff here. The biggest difference is that we're protecting the route with our get_current_active_user
dependency and passing it to our repo. We won't belabor the point anymore.
Run the tests again. So close.
One more to go
Delete Cleanings
Let's polish this off nice and clean by finishing our DELETE
route.
Add one last test class:
# ...other codeclass TestDeleteCleaning: async def test_can_delete_cleaning_successfully( self, app: FastAPI, authorized_client: AsyncClient, test_cleaning: CleaningInDB ) -> None: res = await authorized_client.delete( app.url_path_for("cleanings:delete-cleaning-by-id", cleaning_id=test_cleaning.id) ) assert res.status_code == status.HTTP_200_OK async def test_user_cant_delete_other_users_cleaning( self, app: FastAPI, authorized_client: AsyncClient, test_cleanings_list: List[CleaningInDB], ) -> None: res = await authorized_client.delete( app.url_path_for("cleanings:delete-cleaning-by-id", cleaning_id=test_cleanings_list[0].id) ) assert res.status_code == status.HTTP_403_FORBIDDEN @pytest.mark.parametrize( "id, status_code", ((5000000, 404), (0, 422), (-1, 422), (None, 422)), ) async def test_wrong_id_throws_error( self, app: FastAPI, authorized_client: AsyncClient, test_cleaning: CleaningInDB, id: int, status_code: int ) -> None: res = await authorized_client.delete(app.url_path_for("cleanings:delete-cleaning-by-id", cleaning_id=id)) assert res.status_code == status_code
Our last update to the test_cleanings
file checks to make sure that users can delete their own cleaning jobs and that they can't delete other users' cleaning jobs.
Let's make that happen in our cleaning repo.
# ...other codeDELETE_CLEANING_BY_ID_QUERY = """ DELETE FROM cleanings WHERE id = :id AND owner = :owner RETURNING id;"""class CleaningsRepository(BaseRepository): # ...other code async def delete_cleaning_by_id(self, *, id: int, requesting_user: UserInDB) -> int: cleaning = await self.get_cleaning_by_id(id=id, requesting_user=requesting_user) if not cleaning: return None if cleaning.owner != requesting_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Users are only able to delete cleanings that they created.", ) deleted_id = await self.db.execute( query=DELETE_CLEANING_BY_ID_QUERY, values={"id": id, "owner": requesting_user.id} ) return deleted_id
We're doing the same thing with our delete_cleaning_by_id
methods that we did with all the others. Pass in the requesting_user
and ensure that they're only allowed to delete the cleaning jobs they own. Otherwise we raise the proper exception.
Last, but not least, the DELETE
endpoint.
# ...other code@router.delete("/{cleaning_id}/", response_model=int, name="cleanings:delete-cleaning-by-id")async def delete_cleaning_by_id( cleaning_id: int = Path(..., ge=1), current_user: UserInDB = Depends(get_current_active_user), cleanings_repo: CleaningsRepository = Depends(get_repository(CleaningsRepository)),) -> int: deleted_id = await cleanings_repo.delete_cleaning_by_id(id=cleaning_id, requesting_user=current_user) if not deleted_id: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No cleaning found with that id.") return deleted_id
We run our tests and...finally! All passing.
But there's something fishy about this. We're duplicating code all over the place. We keep checking to see if a cleaning exists and raising a 404 exception if it doesn't. We're also raising 403 exceptions when the user isn't allowed to modify or delete a resource.
Is there a better way to do that?
Well, actually there is. We can use FastAPI's built-in dependency system to handle that for us. Let's first create a new dependencies file for cleanings, and then we'll refactor our code a bit.
touch backend/app/api/dependencies/cleanings.py
And add a new dependency callable:
from fastapi import HTTPException, Depends, Path, statusfrom app.models.user import UserInDBfrom app.models.cleaning import CleaningInDBfrom app.db.repositories.cleanings import CleaningsRepositoryfrom app.api.dependencies.database import get_repositoryfrom app.api.dependencies.auth import get_current_active_userasync def get_cleaning_by_id_from_path( cleaning_id: int = Path(..., ge=1), current_user: UserInDB = Depends(get_current_active_user), cleanings_repo: CleaningsRepository = Depends(get_repository(CleaningsRepository)),) -> CleaningInDB: cleaning = await cleanings_repo.get_cleaning_by_id(id=cleaning_id, requesting_user=current_user) if not cleaning: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="No cleaning found with that id.", ) return cleaningdef check_cleaning_modification_permissions( current_user: UserInDB = Depends(get_current_active_user), cleaning: CleaningInDB = Depends(get_cleaning_by_id_from_path),) -> None: if cleaning.owner != current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Action forbidden. Users are only able to modify cleanings they own.", )
We've abstracted both of our common exceptions into dependencies that will help us manage access to any cleaning resource. Now we can simplify our routes and repository significantly. Let's see how our updated CleaningsRepository
looks first.
from typing import Listfrom fastapi import HTTPException, statusfrom app.db.repositories.base import BaseRepositoryfrom app.models.cleaning import CleaningCreate, CleaningUpdate, CleaningInDBfrom app.models.user import UserInDBCREATE_CLEANING_QUERY = """ INSERT INTO cleanings (name, description, price, cleaning_type, owner) VALUES (:name, :description, :price, :cleaning_type, :owner) RETURNING id, name, description, price, cleaning_type, owner, created_at, updated_at;"""GET_CLEANING_BY_ID_QUERY = """ SELECT id, name, description, price, cleaning_type, owner, created_at, updated_at FROM cleanings WHERE id = :id;"""LIST_ALL_USER_CLEANINGS_QUERY = """ SELECT id, name, description, price, cleaning_type, owner, created_at, updated_at FROM cleanings WHERE owner = :owner;"""UPDATE_CLEANING_BY_ID_QUERY = """ UPDATE cleanings SET name = :name, description = :description, price = :price, cleaning_type = :cleaning_type WHERE id = :id RETURNING id, name, description, price, cleaning_type, owner, created_at, updated_at;"""DELETE_CLEANING_BY_ID_QUERY = """ DELETE FROM cleanings WHERE id = :id RETURNING id;"""class CleaningsRepository(BaseRepository): """" All database actions associated with the Cleaning resource """ async def create_cleaning(self, *, new_cleaning: CleaningCreate, requesting_user: UserInDB) -> CleaningInDB: cleaning = await self.db.fetch_one( query=CREATE_CLEANING_QUERY, values={**new_cleaning.dict(), "owner": requesting_user.id} ) return CleaningInDB(**cleaning) async def get_cleaning_by_id(self, *, id: int, requesting_user: UserInDB) -> CleaningInDB: cleaning = await self.db.fetch_one(query=GET_CLEANING_BY_ID_QUERY, values={"id": id}) if not cleaning: return None return CleaningInDB(**cleaning) async def list_all_user_cleanings(self, requesting_user: UserInDB) -> List[CleaningInDB]: cleaning_records = await self.db.fetch_all( query=LIST_ALL_USER_CLEANINGS_QUERY, values={"owner": requesting_user.id} ) return [CleaningInDB(**l) for l in cleaning_records] async def update_cleaning(self, *, cleaning: CleaningInDB, cleaning_update: CleaningUpdate) -> CleaningInDB: cleaning_update_params = cleaning.copy(update=cleaning_update.dict(exclude_unset=True)) if cleaning_update_params.cleaning_type is None: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid cleaning type. Cannot be None." ) updated_cleaning = await self.db.fetch_one( query=UPDATE_CLEANING_BY_ID_QUERY, values=cleaning_update_params.dict(exclude={"owner", "created_at", "updated_at"}), ) return CleaningInDB(**updated_cleaning) async def delete_cleaning_by_id(self, *, cleaning: CleaningInDB) -> int: return await self.db.execute(query=DELETE_CLEANING_BY_ID_QUERY, values={"id": cleaning.id})
There is significantly less code here now. The repository is now accepting a CleaningInDB
model instead of an id
for both the update and delete actions. Our dependency uses the get_cleaning_by_id
method to handle all 404 issues, making our life much easier. We've also removed any references to requesting_user
for our modification actions because our check_cleaning_modification_permissions
dependency is handling that for us.
So how do we use them in our routes? Well, here's where things get interesting.
We'll look at new api/routes/cleanings.py
file here as well.
from typing import Listfrom fastapi import APIRouter, Body, Depends, statusfrom app.models.user import UserInDBfrom app.models.cleaning import CleaningCreate, CleaningUpdate, CleaningInDB, CleaningPublicfrom app.db.repositories.cleanings import CleaningsRepositoryfrom app.api.dependencies.database import get_repositoryfrom app.api.dependencies.auth import get_current_active_userfrom app.api.dependencies.cleanings import get_cleaning_by_id_from_path, check_cleaning_modification_permissionsrouter = APIRouter()@router.post("/", response_model=CleaningPublic, name="cleanings:create-cleaning", status_code=status.HTTP_201_CREATED)async def create_new_cleaning( new_cleaning: CleaningCreate = Body(..., embed=True), current_user: UserInDB = Depends(get_current_active_user), cleanings_repo: CleaningsRepository = Depends(get_repository(CleaningsRepository)),) -> CleaningPublic: return await cleanings_repo.create_cleaning(new_cleaning=new_cleaning, requesting_user=current_user)@router.get("/", response_model=List[CleaningPublic], name="cleanings:list-all-user-cleanings")async def list_all_user_cleanings( current_user: UserInDB = Depends(get_current_active_user), cleanings_repo: CleaningsRepository = Depends(get_repository(CleaningsRepository)),) -> List[CleaningPublic]: return await cleanings_repo.list_all_user_cleanings(requesting_user=current_user)@router.get("/{cleaning_id}/", response_model=CleaningPublic, name="cleanings:get-cleaning-by-id")async def get_cleaning_by_id(cleaning: CleaningInDB = Depends(get_cleaning_by_id_from_path)) -> CleaningPublic: return cleaning@router.put( "/{cleaning_id}/", response_model=CleaningPublic, name="cleanings:update-cleaning-by-id", dependencies=[Depends(check_cleaning_modification_permissions)],)async def update_cleaning_by_id( cleaning: CleaningInDB = Depends(get_cleaning_by_id_from_path), cleaning_update: CleaningUpdate = Body(..., embed=True), cleanings_repo: CleaningsRepository = Depends(get_repository(CleaningsRepository)),) -> CleaningPublic: return await cleanings_repo.update_cleaning(cleaning=cleaning, cleaning_update=cleaning_update)@router.delete( "/{cleaning_id}/", response_model=int, name="cleanings:delete-cleaning-by-id", dependencies=[Depends(check_cleaning_modification_permissions)],)async def delete_cleaning_by_id( cleaning: CleaningInDB = Depends(get_cleaning_by_id_from_path), cleanings_repo: CleaningsRepository = Depends(get_repository(CleaningsRepository)),) -> int: return await cleanings_repo.delete_cleaning_by_id(cleaning=cleaning)
Well, look at that. For each of our route functions, the body is a simple one-liner. Most of the work is being done by our dependencies. In fact, we've even added dependencies in the route decorator for our update and delete endpoints. The FastAPI docs provide more details on that pattern.
Our check_cleaning_modification_permissions
dependency ensures that the user has sufficient permission to update or delete a cleaning, so we don't have to do that ourselves.
Run the tests now and see that they're all still passing.
It's comforting to have all these tests in place when we refactor. Now we can be confident that our code is working as we expect it to even when we make large refactors such as this one. And this is a big improvement. We have much less code duplication and we've extracted permissions into its own system.
And there we have it! Refactoring is sometimes the least fun part of TDD, but it's essential. We'll do it a few more times before this series is over.
Wrapping Up and Resources
We've now set ourselves up to get into the meat of our application's functionality. In the next post, we'll give users the ability to offer their services for a cleaning job and let owners accept or reject a given offer.
However, now it's time for a break.
- FastAPI docs on dependecies in path decorators
- FastAPI Permissions repo - library that extracts permission handling into a system that mirrors the pyramid framework
- FastAPI Contrib repo - library with built-in permissions system
- Real World FastAPI repo - inspiration for a lot of the permissions code seen in this post
Github Repo
All code up to this point can be found here: