Populating Cleaning Jobs with User Offers in FastAPI

undraw svg icon

Welcome to Part 25 of Up and Running with FastAPI. If you missed part 24, you can find it here.

This series is focused on building a full-stack application with the FastAPI framework. The app allows users to post requests to have their residence cleaned, and other users can select a cleaning project for a given hourly rate.

Up And Running With FastAPI

Our last post saw us finish transitioning our UI logic into custom hooks. With much of the tedious labor out of the way, our React app is nearing it's final form. That's right. We're in the endgame now.

But we shouldn't count our chickens before they hatch. There's still work to be done. In particular, cleaning jobs being sent from our FastAPI backend are missing offers. Along with that, we've also removed most of the offer-related UI logic. We'll need to patch both of those issues. The UI stuff we'll put off for another day.

On the API side of things, we could return to our old method of fetching offers for a cleaning job when a given component is mounted. However, it probably makes more sense to populate cleaning jobs with their associated offers when they're sent to the frontend. So for the time being, we'll leave the world of React hooks behind and head to FastAPI land for some backend updates.

Updating the CleaningPublic Model

We've populated models with associated data before, so let's start by naively assuming that previously established pattern will work as is.

Since we're adding offers to a cleaning job, go ahead and import the OfferPublic model into the models/cleaning.py file.

models/cleaning.py
from typing import Optional, Union, List
from enum import Enum
from app.models.core import IDModelMixin, DateTimeModelMixin, CoreModel
from app.models.user import UserPublic
from app.models.offer import OfferPublic
# ...other code
class CleaningPublic(CleaningInDB):
owner: Union[int, UserPublic]
total_offers: Optional[int]
offers: List[OfferPublic] = []

Uh oh. As soon as we save the file, we see an error message printed to the terminal:

server_1 | ImportError: cannot import name 'CleaningPublic' from partially initialized module 'app.models.cleaning' (most likely due to a circular import) (./app/models/cleaning.py)

This is the well-documented circular import problem with Pydantic models (for the uninitiated, this is actually a python thing and not simply a hurdle specific to Pydantic). See discussions about this issue here, here, and here.

The reason we're seeing the error is that we're importing the CleaningPublic model in our models/offer.py file and importing the OfferPublic model into the models/cleaning.py file. A dependency graph generated for our project would now have a cycle that looks like this:

There are a few valid ways to address this issue. Let's see a few of them in action and then make a decision about which one makes the most sense.

For the first approach, we'll use the Pydantic docs on self-referencing models to guide our thinking. The page sheds some insight on our predicament and provides us with postponed-annotations as the core solution.

Let's see how that would look:

models/cleaning.py
class CleaningPublic(CleaningInDB):
owner: Union[int, UserPublic]
total_offers: Optional[int]
offers: List["OfferPublic"] = []
from app.models.offer import OfferPublic
CleaningPublic.update_forward_refs()

That's pretty yucky, but it works. Beyond the fact that moving the import statement to the bottom of the file violates PEP-8, we're also using we're refering to the not-yet-constructed OfferPublic model using a string. Then, we import the OfferPublic model itself and call update_forward_refs so that Pydantic knows to prepare any ForwardRef types existing in the model. The string syntax seen here - List["OfferPublic"] - signals to Pydantic to internally resolve the field (string or ForwardRef) into a type object.

Now, with python 3.7, we can get rid of that ugly string syntax as long as we import annotations from the __future__.

models/cleaning.py
from __future__ import annotations
from typing import List, Optional, Union
from enum import Enum
from app.models.core import IDModelMixin, DateTimeModelMixin, CoreModel
from app.models.user import UserPublic
# ... other code
class CleaningPublic(CleaningInDB):
owner: Union[int, UserPublic]
total_offers: Optional[int]
offers: List[OfferPublic] = []
from app.models.offer import OfferPublic # noqa
CleaningPublic.update_forward_refs()

Better, but still not great.

Another solution would involve importing the non-public models from models/cleaning.py and models/offer.py into a single file (like models/public.py or something) and then handling interrelated references there. That will solve the circular import problem, but doesn't seem very clean and would require updating imports in most of our files.

models/public.py
from __future__ import annotations
from typing import List, Optional, Union
from app.models.user import UserPublic
from app.models.cleaning import CleaningInDB
from app.models.offer import OfferInDB
class CleaningPublic(CleaningInDB):
owner: Union[int, UserPublic]
total_offers: Optional[int]
offers: List[OfferPublic] = []
class OfferPublic(OfferInDB):
user: Optional[UserPublic]
cleaning: Optional[CleaningPublic]
CleaningPublic.update_forward_refs()

Not the biggest fan of this approach either, but it also works just fine.

The last proposal we'll consider is simply removing the cleaning attribute containing our CleaningPublic model from the OfferPublic model. Does an offer really need to have the full cleaning object attached to it?

This approach would fix our circular import problem, but it forces us to change the way our application works. There's definitely an argument to be made about removing circular dependencies whenever possible, though at the end of the day it's really up to us to decide what best fits our use case.

For this series, we're going to stick with our modified first approach using postponed-annotations:

models/cleaning.py
from __future__ import annotations
from typing import List, Optional, Union
from enum import Enum
from app.models.core import IDModelMixin, DateTimeModelMixin, CoreModel
from app.models.user import UserPublic
# ... other code
class CleaningPublic(CleaningInDB):
owner: Union[int, UserPublic]
total_offers: Optional[int]
offers: List[OfferPublic] = []
from app.models.offer import OfferPublic # noqa E402
CleaningPublic.update_forward_refs()

Unfortunately, that small change broke our tests! Run them and see what's broken.

docker ps
docker exec -it [CONTAINER_ID] bash
pytest -vv

We'll need to go in and modify a few things to support these extra attributes on our CleaningPublic model.

First, modify the repositories/cleanings.py file with a simple fix to the update_cleaning method:

repositories/cleanings.py
# ...other code
class CleaningsRepository(BaseRepository):
"""
All database actions associated with the Cleaning resource
"""
# ...other code
async def update_cleaning(
self,
*,
cleaning: CleaningInDB,
cleaning_update: CleaningUpdate,
) -> CleaningInDB:
cleaning_update_params = cleaning.copy(
update=cleaning_update.dict(exclude_unset=True),
)
if cleaning_update_params.cleaning_type is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid cleaning type. Cannot be None.",
)
updated_cleaning = await self.db.fetch_one(
query=UPDATE_CLEANING_BY_ID_QUERY,
values=cleaning_update_params.dict(
exclude={
"owner",
"offers",
"total_offers",
"created_at",
"updated_at",
},
),
)
return CleaningInDB(**updated_cleaning)
# ...other code

We're excluding the two new attributes - offers and total_offers - from the dictionary we use to update a cleaning record, which should fix most of the broken tests. Run them again and watch them pass.

Great, now all that's left is to fix the test_get_cleaning_by_id test as well.

test_cleanings.py
# ...other code
class TestGetCleaning:
async def test_get_cleaning_by_id(
self,
app: FastAPI,
authorized_client: AsyncClient,
test_cleaning: CleaningInDB,
) -> None:
res = await authorized_client.get(
app.url_path_for(
"cleanings:get-cleaning-by-id",
cleaning_id=test_cleaning.id,
)
)
assert res.status_code == status.HTTP_200_OK
cleaning = CleaningPublic(**res.json()).dict(
exclude={"owner", "offers", "total_offers"},
)
assert cleaning == test_cleaning.dict(exclude={"owner"})
# ...other code

And all our tests should now be passing! With that exploration out of the way, it's time to move on to actually populating cleaning jobs with the proper offers.

Populating Cleaning Jobs with their Offers

We'll start with some new tests. Add this fixture to the tests/conftest.py file:

test_cleanings.py
# ...other code
@pytest.fixture
async def test_list_of_cleanings_with_pending_offers(
db: Database, test_user: UserInDB, test_user_list: List[UserInDB]
) -> List[CleaningInDB]:
cleaning_repo = CleaningsRepository(db)
offers_repo = OffersRepository(db)
cleanings = []
for i in range(5):
created_cleaning = await cleaning_repo.create_cleaning(
new_cleaning=CleaningCreate(
name=f"test cleaning with offers - {i}",
description=f"test desc for cleaning with offers - {i}",
price=float(f"{i}9.99"),
cleaning_type="spot_clean",
),
requesting_user=test_user,
)
for user in test_user_list:
await offers_repo.create_offer_for_cleaning(
new_offer=OfferCreate(cleaning_id=created_cleaning.id, user_id=user.id)
)
cleanings.append(created_cleaning)
return cleanings
# ...other code

Next, let's add two new test cases in our tests/test_cleanings.py file that use this new fixture:

test_cleanings.py
from typing import List, Dict, Union, Optional, Callable
# ...other code
class TestPopulatedCleanings:
async def test_user_owned_cleanings_are_populated_with_correct_offers(
self,
app: FastAPI,
create_authorized_client: Callable,
test_user: UserInDB,
test_user_list: List[UserInDB],
test_list_of_cleanings_with_pending_offers: List[CleaningInDB],
) -> None:
authorized_client = create_authorized_client(user=test_user)
test_user_ids = [u.id for u in test_user_list]
test_cleaning_ids = [c.id for c in test_list_of_cleanings_with_pending_offers]
res = await authorized_client.get(
app.url_path_for("cleanings:list-all-user-cleanings"),
)
assert res.status_code == status.HTTP_200_OK
cleanings = [CleaningPublic(**c) for c in res.json()]
for c in cleanings:
if c.id in test_cleaning_ids:
# ensure that there are the correct number of offers
assert len(c.offers) == len(test_user_list)
assert c.total_offers == len(test_user_list)
# ensure that the offers are valid
for offer in c.offers:
assert offer.user_id in test_user_ids
assert offer.user_id != c.owner
assert offer.cleaning_id == c.id
async def test_public_cleaning_jobs_list_number_of_total_offers(
self,
app: FastAPI,
create_authorized_client: Callable,
test_user2: UserInDB, # not owner or offer maker
test_list_of_cleanings_with_pending_offers: List[CleaningInDB],
test_user_list: List[UserInDB],
) -> None:
authorized_client = create_authorized_client(user=test_user2)
test_cleaning = test_list_of_cleanings_with_pending_offers[0]
res = await authorized_client.get(
app.url_path_for(
"cleanings:get-cleaning-by-id",
cleaning_id=test_cleaning.id,
)
)
assert res.status_code == status.HTTP_200_OK
cleaning = CleaningPublic(**res.json())
assert cleaning.total_offers > 0
# one offer for each user
assert cleaning.total_offers == len(test_user_list)
# but no actual offers are included
assert cleaning.offers == []
# ...other code

Here we're first testing that when a user requests their own cleanings, each cleaning comes back populated with all of its associated offers. We check that the numbers of offers is correct, and that they are associated with the proper user and cleaning job.

The second test case looks at requesting info about a single cleaning job from a user who is not the owner of the cleaning job, nor one of the users making an offer for that job. We ensure that the cleaning job comes back with information about the total number of offers made for the job, but none of the actual offers themselves.

Run those tests and watch them fail.

To get them passing, we can now follow the same strategy we've used before when populating models.

Start by adding the OffersRepository to the CleaningsRepository:

repositories/cleanings.py
# ...other code
from app.db.repositories.base import BaseRepository
from app.db.repositories.users import UsersRepository
from app.db.repositories.offers import OffersRepository
# ...other code
class CleaningsRepository(BaseRepository):
"""
All database actions associated with the Cleaning resource
"""
def __init__(self, db: Database) -> None:
super().__init__(db)
self.users_repo = UsersRepository(db)
self.offers_repo = OffersRepository(db)
# ...other code

Then, add a populate parameter to the list_all_user_cleanings method and populate each cleaning if that parameter is True.

repositories/cleanings.py
# ...other code
class CleaningsRepository(BaseRepository):
"""
All database actions associated with the Cleaning resource
"""
# ...other code
async def list_all_user_cleanings(
self,
*,
requesting_user: UserInDB,
populate: bool = True,
) -> List[Union[CleaningInDB, CleaningPublic]]:
cleaning_records = await self.db.fetch_all(
query=LIST_ALL_USER_CLEANINGS_QUERY,
values={"owner": requesting_user.id},
)
cleanings = [CleaningInDB(**l) for l in cleaning_records]
if populate:
return [
await self.populate_cleaning(
cleaning=cleaning,
requesting_user=requesting_user,
populate_offers=True,
)
for cleaning in cleanings
]
return cleanings
# ...other code

Now you'll notice here that we're calling the populate_cleaning method with a new parameter called populate_offers. When True, this should signify that we want to add the actual offer objects themselves to the cleaning.

Let's update the populate_cleaning method now to support our new feature.

repositories/cleanings.py
# ...other code
class CleaningsRepository(BaseRepository):
"""
All database actions associated with the Cleaning resource
"""
# ...other code
async def populate_cleaning(
self,
*,
cleaning: CleaningInDB,
requesting_user: UserInDB = None,
populate_offers: bool = False,
) -> CleaningPublic:
"""
Cleaning models are populated with the owner
and total number of offers made for it.
If the user is the owner of the cleaning, offers are included by default.
Otherwise, only include an offer made by the requesting user - if it exists
"""
offers = await self.offers_repo.list_offers_for_cleaning(
cleaning=cleaning,
populate=populate_offers,
requesting_user=requesting_user,
)
return CleaningPublic(
**cleaning.dict(exclude={"owner"}),
owner=await self.users_repo.get_user_by_id(user_id=cleaning.owner),
total_offers=len(offers),
# full offers if `populate_offers` is specified,
# otherwise only the offer from the authed user
offers=offers if populate_offers else [
offer for offer in [
await self.offers_repo.get_offer_for_cleaning_from_user(
cleaning=cleaning,
user=requesting_user,
)
] if offer
],
# any other populated fields for cleaning public would be tacked on here
)
# ...other code

We're now fetching offers for a cleaning job whenever we want to populate one. Doing so makes it easy to set the total_offers attribute on our cleaning job, and include the full offers list itself if populate_offers is True. If populate_offers is not specified, then we only include the offer made by the authenticated user - if one exists.

This is mildly inefficient, since we only need to know the number of offers when populate_offers is False. At the moment, it's not too concerning. When the time comes, we'll add that to the list of things we want to optimize.

Run those tests again and the should all be passing! Except one.

We'll also need to update our existing test case for users requesting their own cleanings now that we're populating them by default and returning full CleaningPublic models populate with their owners.

Make the following updates:

test_cleanings.py
# ...other code
class TestGetCleaning:
# ...other code
async def test_get_all_cleanings_returns_only_user_owned_cleanings(
self,
app: FastAPI,
authorized_client: AsyncClient,
test_user: UserInDB,
test_cleaning: CleaningInDB,
test_cleanings_list: List[CleaningInDB],
) -> None:
res = await authorized_client.get(
app.url_path_for("cleanings:list-all-user-cleanings")
)
assert res.status_code == status.HTTP_200_OK
assert isinstance(res.json(), list)
assert len(res.json()) > 0
cleanings = [CleaningPublic(**c) for c in res.json()]
cleaning_ids = [c.id for c in cleanings]
# check that a cleaning created by our user is returned
assert test_cleaning.id in cleaning_ids
# test that all cleanings returned are owned by this user
for cleaning in cleanings:
assert cleaning.owner.id == test_user.id
# assert all cleanings created by another user not included (redundant, but fine)
assert all(c.id not in cleaning_ids for c in test_cleanings_list)
# ...other code

Ahh much better. All green tests now!

We're almost there. There are still a few changes we should make on the backend to ensure that offers owned by the authenticated user are being populated as well. Starting with the OffersRepository, make the following updates.

repositories/offers.py
# ...other code
MARK_OFFER_COMPLETED_QUERY = """
UPDATE user_offers_for_cleanings
SET status = 'completed'
WHERE cleaning_id = :cleaning_id
AND user_id = :user_id
RETURNING *;
"""
class OffersRepository(BaseRepository):
def __init__(self, db: Database) -> None:
super().__init__(db)
self.users_repo = UsersRepository(db)
async def create_offer_for_cleaning(
self,
*,
new_offer: OfferCreate,
requesting_user: UserInDB = None,
) -> OfferPublic:
created_offer = await self.db.fetch_one(
query=CREATE_OFFER_FOR_CLEANING_QUERY,
values={**new_offer.dict(), "status": "pending"},
)
return OfferPublic(**created_offer, user=requesting_user)
async def list_offers_for_cleaning(
self,
*,
cleaning: CleaningInDB,
populate: bool = True,
) -> Union[List[OfferInDB], List[OfferPublic]]:
offer_records = await self.db.fetch_all(
query=LIST_OFFERS_FOR_CLEANING_QUERY,
values={"cleaning_id": cleaning.id},
)
offers = [OfferInDB(**o) for o in offer_records]
if populate:
return [await self.populate_offer(offer=offer) for offer in offers]
return offers
async def get_offer_for_cleaning_from_user(
self,
*,
cleaning: CleaningInDB,
user: UserInDB,
) -> Optional[OfferPublic]:
offer_record = await self.db.fetch_one(
query=GET_OFFER_FOR_CLEANING_FROM_USER_QUERY,
values={"cleaning_id": cleaning.id, "user_id": user.id},
)
if not offer_record:
return None
return OfferPublic(**offer_record, user=user)
async def accept_offer(
self,
*,
offer: OfferInDB,
offer_update: OfferUpdate,
) -> OfferPublic:
async with self.db.transaction():
accepted_offer = await self.db.fetch_one(
query=ACCEPT_OFFER_QUERY, # accept current offer
values={"cleaning_id": offer.cleaning_id, "user_id": offer.user_id},
)
await self.db.execute(
query=REJECT_ALL_OTHER_OFFERS_QUERY, # reject all other offers
values={"cleaning_id": offer.cleaning_id, "user_id": offer.user_id},
)
return await self.populate_offer(offer=OfferInDB(**accepted_offer))
async def cancel_offer(
self,
*,
offer: OfferInDB,
offer_update: OfferUpdate,
) -> OfferPublic:
async with self.db.transaction():
cancelled_offer = await self.db.fetch_one(
query=CANCEL_OFFER_QUERY, # cancel current offer
values={"cleaning_id": offer.cleaning_id, "user_id": offer.user_id},
)
await self.db.execute(
query=SET_ALL_OTHER_OFFERS_AS_PENDING_QUERY, # set all other offers to pending again
values={"cleaning_id": offer.cleaning_id, "user_id": offer.user_id},
)
return await self.populate_offer(offer=OfferInDB(**cancelled_offer))
async def rescind_offer(self, *, offer: OfferInDB) -> int:
return await self.db.execute(
query=RESCIND_OFFER_QUERY, # rescinding an offer deletes it as long as it's pending
values={"cleaning_id": offer.cleaning_id, "user_id": offer.user_id},
)
async def mark_offer_completed(
self,
*,
cleaning: CleaningInDB,
cleaner: UserInDB,
) -> OfferPublic:
offer_record = await self.db.fetch_one(
query=MARK_OFFER_COMPLETED_QUERY, # owner of cleaning marks job status as completed
values={"cleaning_id": cleaning.id, "user_id": cleaner.id},
)
return OfferPublic(**offer_record, user=cleaner)
async def populate_offer(self, *, offer: OfferInDB) -> OfferPublic:
return OfferPublic(
**offer.dict(),
user=await self.users_repo.get_user_by_id(user_id=offer.user_id),
# could populate cleaning here as well if needed
)
# ...other code

This might look like a lot of changes, but all we're really doing is returning OfferPublic instance in places where we originally had an OfferInDB instance and then adding a full user instance to it. In places where we already have access to authenticated user we simply add them to the OfferPublic instance. Otherwise, we call the populate_offer method to handle that for us.

We'll need to make one small change to the routes/offers.py file as well, as at the moment we aren't passing the anything to the requesting_user parameter in our create_offer_for_cleaning method.

routes/offers.py
# ...other code
@router.post(
"/",
response_model=OfferPublic,
name="offers:create-offer",
status_code=status.HTTP_201_CREATED,
dependencies=[Depends(check_offer_create_permissions)],
)
async def create_offer(
cleaning: CleaningInDB = Depends(get_cleaning_by_id_from_path),
current_user: UserInDB = Depends(get_current_active_user),
offers_repo: OffersRepository = Depends(get_repository(OffersRepository)),
) -> OfferPublic:
return await offers_repo.create_offer_for_cleaning(
new_offer=OfferCreate(cleaning_id=cleaning.id, user_id=current_user.id),
requesting_user=current_user,
)
# ...other code

Nice! Now we have populated offers pretty much everywhere we expect them to be.

As usual, with modifications to our repositories come broken tests. And of course, running our tests show exactly that. Make the following changes to the tests/conftest.py file to fix things most of them temporarily.

conftest.py
# ...other code
@pytest.fixture
async def test_cleaning_with_offers(
db: Database,
test_user2: UserInDB,
test_user_list: List[UserInDB]
) -> CleaningInDB:
cleaning_repo = CleaningsRepository(db)
offers_repo = OffersRepository(db)
new_cleaning = CleaningCreate(
name="cleaning with offers",
description="desc for cleaning",
price=9.99,
cleaning_type="full_clean",
)
created_cleaning = await cleaning_repo.create_cleaning(
new_cleaning=new_cleaning, requesting_user=test_user2
)
for user in test_user_list:
await offers_repo.create_offer_for_cleaning(
new_offer=OfferCreate(cleaning_id=created_cleaning.id, user_id=user.id),
requesting_user=user,
)
return created_cleaning
@pytest.fixture
async def test_cleaning_with_accepted_offer(
db: Database,
test_user2: UserInDB,
test_user3: UserInDB,
test_user_list: List[UserInDB],
) -> CleaningInDB:
cleaning_repo = CleaningsRepository(db)
offers_repo = OffersRepository(db)
new_cleaning = CleaningCreate(
name="cleaning with offers",
description="desc for cleaning",
price=9.99,
cleaning_type="full_clean",
)
created_cleaning = await cleaning_repo.create_cleaning(
new_cleaning=new_cleaning, requesting_user=test_user2
)
offers = []
for user in test_user_list:
offers.append(
await offers_repo.create_offer_for_cleaning(
new_offer=OfferCreate(
cleaning_id=created_cleaning.id,
user_id=user.id,
),
requesting_user=user,
)
)
await offers_repo.accept_offer(
offer=[
o for o in offers
if o.user_id == test_user3.id
][0],
offer_update=OfferUpdate(status="accepted"),
)
return created_cleaning
@pytest.fixture
async def test_list_of_cleanings_with_pending_offers(
db: Database,
test_user: UserInDB,
test_user_list: List[UserInDB],
) -> List[CleaningInDB]:
cleaning_repo = CleaningsRepository(db)
offers_repo = OffersRepository(db)
cleanings = []
for i in range(5):
created_cleaning = await cleaning_repo.create_cleaning(
new_cleaning=CleaningCreate(
name=f"test cleaning with offers - {i}",
description=f"test desc for cleaning with offers - {i}",
price=float(f"{i}9.99"),
cleaning_type="spot_clean",
),
requesting_user=test_user,
)
for user in test_user_list:
await offers_repo.create_offer_for_cleaning(
new_offer=OfferCreate(
cleaning_id=created_cleaning.id,
user_id=user.id,
),
requesting_user=user,
)
cleanings.append(created_cleaning)
return cleanings
async def create_cleaning_with_evaluated_offer_helper(
db: Database,
owner: UserInDB,
cleaner: UserInDB,
cleaning_create: CleaningCreate,
evaluation_create: EvaluationCreate,
) -> CleaningInDB:
cleaning_repo = CleaningsRepository(db)
offers_repo = OffersRepository(db)
evals_repo = EvaluationsRepository(db)
created_cleaning = await cleaning_repo.create_cleaning(
new_cleaning=cleaning_create,
requesting_user=owner,
)
offer = await offers_repo.create_offer_for_cleaning(
new_offer=OfferCreate(cleaning_id=created_cleaning.id, user_id=cleaner.id),
requesting_user=cleaner,
)
await offers_repo.accept_offer(
offer=offer, offer_update=OfferUpdate(status="accepted"),
)
await evals_repo.create_evaluation_for_cleaner(
evaluation_create=evaluation_create,
cleaning=created_cleaning,
cleaner=cleaner,
)
return created_cleaning
# ...other code

Ah. That wasn't so bad. All we did was make sure that every call to offers_repo.create_offer_for_cleaning also included the user making the request.

And those tests should now be passing again.

Let's finish up this post by also populating any cleaning jobs owned by the authenticated user.

Open up the repositories/cleanings.py file and update the CleaningsRepository like so:

cleanings.py
# ...other code
class CleaningsRepository(BaseRepository):
"""
All database actions associated with the Cleaning resource
"""
# ...other code
async def create_cleaning(
self,
*,
new_cleaning: CleaningCreate,
requesting_user: UserInDB,
) -> CleaningPublic:
cleaning_record = await self.db.fetch_one(
query=CREATE_CLEANING_QUERY,
values={**new_cleaning.dict(), "owner": requesting_user.id},
)
return CleaningPublic(**cleaning_record, total_offers=0)
# ...other code
async def update_cleaning(
self,
*,
cleaning: CleaningInDB,
cleaning_update: CleaningUpdate,
) -> CleaningPublic:
cleaning_update_params = cleaning.copy(
update=cleaning_update.dict(exclude_unset=True)
)
if cleaning_update_params.cleaning_type is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid cleaning type. Cannot be None.",
)
updated_cleaning = await self.db.fetch_one(
query=UPDATE_CLEANING_BY_ID_QUERY,
values=cleaning_update_params.dict(
exclude={
"owner",
"offers",
"total_offers",
"created_at",
"updated_at",
},
),
)
return await self.populate_cleaning(
cleaning=CleaningInDB(**updated_cleaning),
populate_offers=True,
)
# ...other code
# ...other code

Not too much to see here either. We're returning a CleaningPublic instance from our create_cleaning method and explicitly setting total_offers to 0, as it's not possible for a new cleaning job to already have offers. And in the update_cleaning method, we call our populate_cleaning method to attach offers to it. That should be totally fine, as we know the authenticated user is the owner of this cleaning job.

Just like before, our tests are breaking again. All that's left is to make some simple fixes to tests/test_cleanings.py and we can be on merry way:

cleanings.py
# ...other code
class TestGetCleaning:
async def test_get_cleaning_by_id(
self,
app: FastAPI,
authorized_client: AsyncClient,
test_cleaning: CleaningInDB,
) -> None:
res = await authorized_client.get(
app.url_path_for(
"cleanings:get-cleaning-by-id",
cleaning_id=test_cleaning.id,
)
)
assert res.status_code == status.HTTP_200_OK
cleaning = CleaningPublic(**res.json()).dict(
exclude={"owner", "offers", "total_offers"},
)
assert cleaning == test_cleaning.dict(
exclude={"owner", "offers", "total_offers"},
)
# ...other code
# ...other code
class TestUpdateCleaning:
@pytest.mark.parametrize(
"attrs_to_change, values",
(
(["name"], ["new fake cleaning name"]),
(["description"], ["new fake cleaning description"]),
(["price"], [3.14]),
(["cleaning_type"], ["full_clean"]),
(
["name", "description"],
[
"extra new fake cleaning name",
"extra new fake cleaning description",
]
),
(["price", "cleaning_type"], [42.00, "dust_up"]),
),
)
async def test_update_cleaning_with_valid_input(
self,
app: FastAPI,
authorized_client: AsyncClient,
test_cleaning: CleaningInDB,
attrs_to_change: List[str],
values: List[str],
) -> None:
cleaning_update = {
"cleaning_update": {
attrs_to_change[i]: values[i] for i in range(len(attrs_to_change))
}
}
res = await authorized_client.put(
app.url_path_for(
"cleanings:update-cleaning-by-id",
cleaning_id=test_cleaning.id,
),
json=cleaning_update,
)
assert res.status_code == status.HTTP_200_OK
updated_cleaning = CleaningPublic(**res.json())
assert updated_cleaning.id == test_cleaning.id # make sure it's the same cleaning
# make sure that any attribute we updated has changed to the correct value
for i in range(len(attrs_to_change)):
updated_attr = getattr(updated_cleaning, attrs_to_change[i])
assert updated_attr != getattr(test_cleaning, attrs_to_change[i])
assert updated_attr == values[i]
# make sure that no other attributes' values have changed
for attr, value in updated_cleaning.dict(
exclude={"owner", "offers", "total_offers"}
).items():
if attr not in attrs_to_change and attr != "updated_at":
assert getattr(test_cleaning, attr) == value
async def test_user_recieves_error_if_updating_other_users_cleaning(
self,
app: FastAPI,
authorized_client: AsyncClient,
test_cleanings_list: List[CleaningInDB],
) -> None:
res = await authorized_client.put(
app.url_path_for(
"cleanings:update-cleaning-by-id",
cleaning_id=test_cleanings_list[0].id,
),
json={"cleaning_update": {"price": 99.99}},
)
assert res.status_code == status.HTTP_403_FORBIDDEN
async def test_user_cant_change_ownership_of_cleaning(
self,
app: FastAPI,
authorized_client: AsyncClient,
test_cleaning: CleaningInDB,
test_user: UserInDB,
test_user2: UserInDB,
) -> None:
res = await authorized_client.put(
app.url_path_for(
"cleanings:update-cleaning-by-id",
cleaning_id=test_cleaning.id,
),
json={"cleaning_update": {"owner": test_user2.id}},
)
assert res.status_code == status.HTTP_200_OK
cleaning = CleaningPublic(**res.json())
assert cleaning.owner == test_user
# ...other code
# ...other code

Three changes in total:

  • First, we once again update our test_get_cleaning_by_id test, this time ensuring that we exclude owner, offers, and total_offers from both dictionaries.
  • Next, since we're returning a CleaningPublic instance from our update_cleaning method, we modify the test_update_cleaning_with_valid_input test to parse the response with the CleaningPublic model. We then make sure to compare all attributes of our updated cleaning resource, except we also exclude owner, offers, and total_offers.
  • Finally, we update our test_user_cant_change_ownership_of_cleaning test to check equality for cleaning.owner and test_user, as they're both user objects instead of ids.

And if we run our tests again, they should all be passing.

Now we can head back to the UI and see how things are looking. But not today.

Wrapping Up and Resources

In this post we finished populating cleaning resources with offers that other users have made for those jobs. To do so, we had to explore the classic circular imports problem and make substantial updates to our OffersRepository and CleaningRepository classes. Fortunately, we had a robust testing suite that informed us when things were breaking and we made the necessary updates to remedy each of our issues.

With that out the way, the next step will be to update our frontend to take advantage of our new structure.

  • Pydantic discussion on circular imports
  • Another discussion about circular imports in Pydantic
  • One more Github discussion about handling circular imports in Pydantic
  • Pydantic docs on self-referencing models and postponed annotations