Marketplace Functionality in FastAPI


Welcome to Part 11 of Up and Running with FastAPI. If you missed part 10, you can find it here.

This series is focused on building a full-stack application with the FastAPI framework. The app allows users to post requests to have their residence cleaned, and other users can select a cleaning project for a given hourly rate.


Previously, we set up our database to give users ownership over the cleaning resources they create. Now, we want to allow users to interact by giving them the ability to bid on jobs. We also want the owners to be able to approve and reject bids, just like a marketplace.

To do that, we're going to need to update our database quite a bit.

Migrations

Before we make any changes, let's roll back our migrations.

docker ps
docker exec -it [CONTAINER_ID] bash
alembic downgrade base

And now for our changes.

# ...other code


def create_offers_table() -> None:
    op.create_table(
        "user_offers_for_cleanings",
        sa.Column(
            "user_id",  # 'user' is a reserved word in postgres, so going with user_id instead
            sa.Integer,
            sa.ForeignKey("users.id", ondelete="CASCADE"),
            nullable=False,
            index=True,
        ),
        sa.Column(
            "cleaning_id",  # going with `cleaning_id` for consistency
            sa.Integer,
            sa.ForeignKey("cleanings.id", ondelete="CASCADE"),
            nullable=False,
            index=True,
        ),
        sa.Column("status", sa.Text, nullable=False, server_default="pending", index=True),
        *timestamps(),
    )
    op.create_primary_key("pk_user_offers_for_cleanings", "user_offers_for_cleanings", ["user_id", "cleaning_id"])
    op.execute(
        """
        CREATE TRIGGER update_user_offers_for_cleanings_modtime
            BEFORE UPDATE
            ON user_offers_for_cleanings
            FOR EACH ROW
        EXECUTE PROCEDURE update_updated_at_column();
        """
    )


def upgrade() -> None:
    create_updated_at_trigger()
    create_users_table()
    create_profiles_table()
    create_cleanings_table()
    create_offers_table()


def downgrade() -> None:
    op.drop_table("user_offers_for_cleanings")
    op.drop_table("cleanings")
    op.drop_table("profiles")
    op.drop_table("users")
    op.execute("DROP FUNCTION update_updated_at_column")

Our new user_offers_for_cleanings table connects a user who wants to offer their services and the cleaning they're willing to work on. We also add a status column to indicate the state of the offer. If the owner of the cleaning has approved the cleaner for this job, the status will be "accepted". If another cleaner gets the job, the status will be "rejected". Until a decision has been made, the status will be "pending".

Then we create a primary key composed of the cleaning id and the cleaner's id. By using these two columns to create a primary key, we are ensuring that only unique combinations are allowed. So a user can only offer their services for a particular cleaning job once.
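
To see the composite key at work without spinning up postgres, here's a minimal sketch that uses Python's built-in sqlite3 module as a stand-in. The table is trimmed to the columns that matter, and the try_insert helper is a hypothetical name used only for this illustration.

```python
import sqlite3

# In-memory SQLite stands in for postgres; only the key columns and status are kept.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE user_offers_for_cleanings (
        user_id INTEGER NOT NULL,
        cleaning_id INTEGER NOT NULL,
        status TEXT NOT NULL DEFAULT 'pending',
        PRIMARY KEY (user_id, cleaning_id)
    )
    """
)


def try_insert(user_id: int, cleaning_id: int) -> bool:
    """Return True if the offer row was inserted, False if the pair already exists."""
    try:
        conn.execute(
            "INSERT INTO user_offers_for_cleanings (user_id, cleaning_id) VALUES (?, ?)",
            (user_id, cleaning_id),
        )
        return True
    except sqlite3.IntegrityError:
        # The composite primary key rejects a repeated (user_id, cleaning_id) pair.
        return False


first = try_insert(1, 10)       # new pair
duplicate = try_insert(1, 10)   # same user, same cleaning
other_job = try_insert(1, 11)   # same user, different cleaning
other_user = try_insert(2, 10)  # different user, same cleaning
```

Only the exact repeat is rejected. The same user can still bid on other jobs, and other users can still bid on the same job.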

Many-to-Many relationships model situations where any number of items from the first table could be paired with any number of items from the second table. So how should we structure our endpoints? Does this live under the cleanings router or the users router? Does it get its own router?

We'll get to all that in a minute. First, let's tee up some models for our offers.

Offer Models

Go ahead and create a new file.

touch backend/app/models/offer.py

And then create a few new models.

models/offer.py
from typing import Optional, Union
from enum import Enum, auto

from pydantic import Field

from app.models.core import DateTimeModelMixin, CoreModel
from app.models.user import UserPublic
from app.models.cleaning import CleaningPublic


class OfferStatus(str, Enum):
    accepted = "accepted"
    rejected = "rejected"
    pending = "pending"
    cancelled = "cancelled"


class OfferBase(CoreModel):
    user_id: Optional[int]
    cleaning_id: Optional[int]
    status: Optional[OfferStatus] = OfferStatus.pending


class OfferCreate(OfferBase):
    user_id: int
    cleaning_id: int


class OfferUpdate(CoreModel):
    status: OfferStatus


class OfferInDB(DateTimeModelMixin, OfferBase):
    pass


class OfferPublic(OfferInDB):
    user: Union[int, UserPublic] = Field(..., alias="user_id")
    cleaning: Union[int, CleaningPublic] = Field(..., alias="cleaning_id")

    class Config:
        allow_population_by_field_name = True

Ooh. Lots of fun stuff happening here. We're specifying that the status of the offer can be either pending, accepted, rejected, or cancelled. We'll talk more about each state later on in the post. When an offer is created, we're specifying that only the user_id and cleaning_id should be set. We'll default the status to "pending" regardless. The status is also the only item we're allowing users to update.
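
Because OfferStatus mixes in str, its members compare equal to plain strings, which is why an assertion like offer.status == "pending" can work in tests. Here's a small standalone sketch using only the standard library. The parse_status helper is a hypothetical name added for illustration and isn't part of the app.

```python
from enum import Enum
from typing import Optional


class OfferStatus(str, Enum):
    accepted = "accepted"
    rejected = "rejected"
    pending = "pending"
    cancelled = "cancelled"


def parse_status(raw: str) -> Optional[OfferStatus]:
    """Hypothetical helper: return the matching member, or None for unknown values."""
    try:
        return OfferStatus(raw)
    except ValueError:
        return None


# Members are real strings, so comparison with a plain str succeeds.
equals_plain_string = OfferStatus.pending == "pending"

# Looking a member up by value returns the single shared member object.
looked_up = parse_status("accepted")

# Anything outside the four allowed states is rejected.
unknown = parse_status("maybe")
```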

Note that the OfferInDB does not have the IDModelMixin, as we're using a combination of the cleaning_id and user_id to construct the primary key in postgres. So we don't actually ever have an ID for this model. Don't worry, it won't change anything.

In the OfferPublic, we're also doing something interesting. The Config subclass specifies the allow_population_by_field_name property as True. So what does this do? It's probably easier to show than tell.

>>> offer = OfferPublic(user=5, cleaning=4)
>>> offer
OfferPublic(user=5, cleaning=4, status='pending')
>>> other_offer = OfferPublic(user_id=2, cleaning_id=5)
>>> other_offer
OfferPublic(user=2, cleaning=5, status='pending')

So we're indicating that it's ok to create an OfferPublic model using either user or user_id for specifying a user. Same goes for cleaning and cleaning_id. Optionally, we can attach the actual UserPublic or CleaningPublic models if we see fit.

Great. Now we can start writing some fancy new tests.

Setting Up the Offers Routes

Create a new test file.

touch backend/tests/test_offers.py

Then add a test class to the file to check that our offers routes exist.

tests/test_offers.py
from typing import List, Callable

import pytest
from httpx import AsyncClient
from fastapi import FastAPI, status
from databases import Database

from app.models.cleaning import CleaningCreate, CleaningInDB
from app.models.user import UserInDB
from app.models.offer import OfferCreate, OfferUpdate, OfferInDB, OfferPublic
from app.db.repositories.offers import OffersRepository

pytestmark = pytest.mark.asyncio


class TestOffersRoutes:
    """
    Make sure all offers routes don't return 404s
    """

    async def test_routes_exist(self, app: FastAPI, client: AsyncClient) -> None:
        res = await client.post(app.url_path_for("offers:create-offer", cleaning_id=1))
        assert res.status_code != status.HTTP_404_NOT_FOUND
        res = await client.get(app.url_path_for("offers:list-offers-for-cleaning", cleaning_id=1))
        assert res.status_code != status.HTTP_404_NOT_FOUND
        res = await client.get(app.url_path_for("offers:get-offer-from-user", cleaning_id=1, username="bradpitt"))
        assert res.status_code != status.HTTP_404_NOT_FOUND
        res = await client.put(app.url_path_for("offers:accept-offer-from-user", cleaning_id=1, username="bradpitt"))
        assert res.status_code != status.HTTP_404_NOT_FOUND
        res = await client.put(app.url_path_for("offers:cancel-offer-from-user", cleaning_id=1))
        assert res.status_code != status.HTTP_404_NOT_FOUND
        res = await client.delete(app.url_path_for("offers:rescind-offer-from-user", cleaning_id=1))
        assert res.status_code != status.HTTP_404_NOT_FOUND

We have some standard REST endpoints, except we have two PUT routes. We'll see why in a moment.

We run our tests and watch them fail. Getting these passing isn't too much trouble.

Let's make some dummy routes.

First, create the new file.

touch backend/app/api/routes/offers.py

Then add a few routes that return None.

api/routes/offers.py
from typing import List

from fastapi import APIRouter, Path, Body, status

from app.models.offer import OfferCreate, OfferUpdate, OfferInDB, OfferPublic

router = APIRouter()


@router.post("/", response_model=OfferPublic, name="offers:create-offer", status_code=status.HTTP_201_CREATED)
async def create_offer(offer_create: OfferCreate = Body(..., embed=True)) -> OfferPublic:
    return None


# the return annotation matches the List[OfferPublic] response model
@router.get("/", response_model=List[OfferPublic], name="offers:list-offers-for-cleaning")
async def list_offers_for_cleaning() -> List[OfferPublic]:
    return None


@router.get("/{username}/", response_model=OfferPublic, name="offers:get-offer-from-user")
async def get_offer_from_user(username: str = Path(..., min_length=3)) -> OfferPublic:
    return None


@router.put("/{username}/", response_model=OfferPublic, name="offers:accept-offer-from-user")
async def accept_offer_from_user(username: str = Path(..., min_length=3)) -> OfferPublic:
    return None


@router.put("/", response_model=OfferPublic, name="offers:cancel-offer-from-user")
async def cancel_offer_from_user() -> OfferPublic:
    return None


# the return annotation matches the int response model
@router.delete("/", response_model=int, name="offers:rescind-offer-from-user")
async def rescind_offer_from_user() -> int:
    return None

Now, the question here is how should we namespace our endpoints? We might think it'd be fine to just put them under /offers, but that's probably not the best idea.

A better approach might look something like this:

api/routes/__init__.py
from fastapi import APIRouter
from app.api.routes.cleanings import router as cleanings_router
from app.api.routes.users import router as users_router
from app.api.routes.profiles import router as profiles_router
from app.api.routes.offers import router as offers_router
router = APIRouter()
router.include_router(cleanings_router, prefix="/cleanings", tags=["cleanings"])
router.include_router(users_router, prefix="/users", tags=["users"])
router.include_router(profiles_router, prefix="/profiles", tags=["profiles"])
router.include_router(offers_router, prefix="/cleanings/{cleaning_id}/offers", tags=["offers"])

Notice how we're namespacing our offers_router under the prefix "/cleanings/{cleaning_id}/offers"? For every route in the offers_router, we'll have the cleaning_id path parameter available representing the id of a cleaning resource. By doing so, we've made it easy to reuse our get_cleaning_by_id_from_path dependency we created in our previous post.

Run the tests again and they should pass.

Time for more tests.

Creating Offers

An offer is when a user wishes to provide their services for another user's cleaning job. To test that this functionality works properly, we'll need a user with a cleaning job and a few different users who will bid on the job.

Head into the conftest.py file and update it with a few additional fixtures, along with a helper method for creating users.

tests/conftest.py
from typing import List, Callable

# ...other code


async def user_fixture_helper(*, db: Database, new_user: UserCreate) -> UserInDB:
    user_repo = UsersRepository(db)
    existing_user = await user_repo.get_user_by_email(email=new_user.email)
    if existing_user:
        return existing_user
    return await user_repo.register_new_user(new_user=new_user)


@pytest.fixture
async def test_user(db: Database) -> UserInDB:
    new_user = UserCreate(email="lebron@james.io", username="lebronjames", password="heatcavslakers")
    return await user_fixture_helper(db=db, new_user=new_user)


@pytest.fixture
async def test_user2(db: Database) -> UserInDB:
    new_user = UserCreate(email="serena@williams.io", username="serenawilliams", password="tennistwins")
    return await user_fixture_helper(db=db, new_user=new_user)


@pytest.fixture
async def test_user3(db: Database) -> UserInDB:
    new_user = UserCreate(email="brad@pitt.io", username="bradpitt", password="adastra")
    return await user_fixture_helper(db=db, new_user=new_user)


@pytest.fixture
async def test_user4(db: Database) -> UserInDB:
    new_user = UserCreate(email="jennifer@lopez.io", username="jlo", password="jennyfromtheblock")
    return await user_fixture_helper(db=db, new_user=new_user)


@pytest.fixture
async def test_user5(db: Database) -> UserInDB:
    new_user = UserCreate(email="bruce@lee.io", username="brucelee", password="martialarts")
    return await user_fixture_helper(db=db, new_user=new_user)


@pytest.fixture
async def test_user6(db: Database) -> UserInDB:
    new_user = UserCreate(email="kal@penn.io", username="kalpenn", password="haroldandkumar")
    return await user_fixture_helper(db=db, new_user=new_user)


@pytest.fixture
async def test_user_list(
    test_user3: UserInDB, test_user4: UserInDB, test_user5: UserInDB, test_user6: UserInDB,
) -> List[UserInDB]:
    return [test_user3, test_user4, test_user5, test_user6]


# ...other code

We're extracting the logic required for fetching or creating a user from our database into its own helper and making 6 test users in total.
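
The fetch-or-create idea behind user_fixture_helper can be sketched without a database. Everything below is a stand-in built for illustration: the User dataclass, the InMemoryUsersRepo class, and the get_or_create_user function are hypothetical names, not part of our app.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass
class User:
    id: int
    email: str
    username: str


class InMemoryUsersRepo:
    """Hypothetical stand-in for UsersRepository, backed by a dict."""

    def __init__(self) -> None:
        self._by_email: Dict[str, User] = {}

    async def get_user_by_email(self, *, email: str) -> Optional[User]:
        return self._by_email.get(email)

    async def register_new_user(self, *, email: str, username: str) -> User:
        user = User(id=len(self._by_email) + 1, email=email, username=username)
        self._by_email[email] = user
        return user


async def get_or_create_user(repo: InMemoryUsersRepo, *, email: str, username: str) -> User:
    # Reuse the existing record when there is one, otherwise register a new user.
    existing = await repo.get_user_by_email(email=email)
    if existing:
        return existing
    return await repo.register_new_user(email=email, username=username)


async def main() -> Tuple[User, User]:
    repo = InMemoryUsersRepo()
    first = await get_or_create_user(repo, email="brad@pitt.io", username="bradpitt")
    second = await get_or_create_user(repo, email="brad@pitt.io", username="bradpitt")
    return first, second


first, second = asyncio.run(main())
```

Calling the helper twice with the same email hands back the same record, so fixtures stay safe to reuse across tests that share a database.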

On top of that, we should create a helper fixture that constructs an authorized_client for any user we pass in. This will help us test authorized requests from multiple different users.

tests/conftest.py
@pytest.fixture
def create_authorized_client(client: AsyncClient) -> Callable:
    def _create_authorized_client(*, user: UserInDB) -> AsyncClient:
        access_token = auth_service.create_access_token_for_user(user=user, secret_key=str(SECRET_KEY))
        client.headers = {
            **client.headers,
            "Authorization": f"{JWT_TOKEN_PREFIX} {access_token}",
        }
        return client

    return _create_authorized_client
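
One thing to keep in mind with this fixture as written: it sets headers on the shared client and returns that same object, so calling it for a second user within one test replaces the first user's token. The sketch below shows the effect with a hypothetical FakeClient stand-in and made-up token strings, so no HTTP client or auth service is needed to run it.

```python
from typing import Callable, Dict


class FakeClient:
    """Hypothetical stand-in for the test client; only the headers matter here."""

    def __init__(self) -> None:
        self.headers: Dict[str, str] = {}


def make_client_factory(client: FakeClient) -> Callable[..., FakeClient]:
    def _create_authorized_client(*, token: str) -> FakeClient:
        # Same shape as the fixture: merge in an Authorization header, return the client.
        client.headers = {**client.headers, "Authorization": f"Bearer {token}"}
        return client

    return _create_authorized_client


shared = FakeClient()
create_authorized_client = make_client_factory(shared)

client_a = create_authorized_client(token="token-for-user-a")
client_b = create_authorized_client(token="token-for-user-b")
```

Both names point at the same client, which now carries the second token. That's fine for tests that act as one user at a time, which is how the tests in this post use it.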

Now in our test_offers file we'll be able to have users create offers for a given cleaning resource.

Let's start with the TestCreateOffers class.

tests/test_offers.py
# ...other code


class TestCreateOffers:
    async def test_user_can_successfully_create_offer_for_other_users_cleaning_job(
        self, app: FastAPI, create_authorized_client: Callable, test_cleaning: CleaningInDB, test_user3: UserInDB,
    ) -> None:
        authorized_client = create_authorized_client(user=test_user3)
        res = await authorized_client.post(app.url_path_for("offers:create-offer", cleaning_id=test_cleaning.id))
        assert res.status_code == status.HTTP_201_CREATED
        offer = OfferPublic(**res.json())
        assert offer.user_id == test_user3.id
        assert offer.cleaning_id == test_cleaning.id
        assert offer.status == "pending"

    async def test_user_cant_create_duplicate_offers(
        self, app: FastAPI, create_authorized_client: Callable, test_cleaning: CleaningInDB, test_user4: UserInDB,
    ) -> None:
        authorized_client = create_authorized_client(user=test_user4)
        res = await authorized_client.post(app.url_path_for("offers:create-offer", cleaning_id=test_cleaning.id))
        assert res.status_code == status.HTTP_201_CREATED
        res = await authorized_client.post(app.url_path_for("offers:create-offer", cleaning_id=test_cleaning.id))
        assert res.status_code == status.HTTP_400_BAD_REQUEST

    async def test_user_unable_to_create_offer_for_their_own_cleaning_job(
        self, app: FastAPI, authorized_client: AsyncClient, test_user: UserInDB, test_cleaning: CleaningInDB,
    ) -> None:
        res = await authorized_client.post(app.url_path_for("offers:create-offer", cleaning_id=test_cleaning.id))
        assert res.status_code == status.HTTP_400_BAD_REQUEST

    async def test_unauthenticated_users_cant_create_offers(
        self, app: FastAPI, client: AsyncClient, test_cleaning: CleaningInDB,
    ) -> None:
        res = await client.post(app.url_path_for("offers:create-offer", cleaning_id=test_cleaning.id))
        assert res.status_code == status.HTTP_401_UNAUTHORIZED

    @pytest.mark.parametrize(
        "id, status_code", ((5000000, 404), (-1, 422), (None, 422)),
    )
    async def test_wrong_id_gives_proper_error_status(
        self, app: FastAPI, create_authorized_client: Callable, test_user5: UserInDB, id: int, status_code: int,
    ) -> None:
        authorized_client = create_authorized_client(user=test_user5)
        res = await authorized_client.post(app.url_path_for("offers:create-offer", cleaning_id=id))
        assert res.status_code == status_code

Five new tests here.

We're ensuring that users can't create offers for their own cleaning jobs, that unauthenticated users can't create offers, and that users can't create more than one offer for a given cleaning job. We also make sure the expected response is returned for properly structured requests and that we get the right error code when things are off.

To get those passing, open up the routes file again.

api/routes/offers.py
from typing import List

from fastapi import APIRouter, HTTPException, Depends, Path, status

from app.models.offer import OfferCreate, OfferUpdate, OfferInDB, OfferPublic
from app.models.user import UserInDB
from app.models.cleaning import CleaningInDB
from app.db.repositories.offers import OffersRepository
from app.api.dependencies.database import get_repository
from app.api.dependencies.auth import get_current_active_user
from app.api.dependencies.cleanings import get_cleaning_by_id_from_path

router = APIRouter()


@router.post("/", response_model=OfferPublic, name="offers:create-offer", status_code=status.HTTP_201_CREATED)
async def create_offer(
    cleaning: CleaningInDB = Depends(get_cleaning_by_id_from_path),
    current_user: UserInDB = Depends(get_current_active_user),
    offers_repo: OffersRepository = Depends(get_repository(OffersRepository)),
) -> OfferPublic:
    if cleaning.owner == current_user.id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Users are unable to create offers for cleaning jobs they own.",
        )
    new_offer = OfferCreate(cleaning_id=cleaning.id, user_id=current_user.id)
    return await offers_repo.create_offer_for_cleaning(new_offer=new_offer)


# ...other code

Ok - this makes sense. We check that a user is logged in and we check to make sure that user isn't the owner of the cleaning resource that an offer is being created for. This'll work, but whenever we start implementing permissions in the route like this, it's a clear sign to use dependencies. Let's refactor.

api/routes/offers.py
# ...other code
from app.api.dependencies.offers import check_offer_create_permissions

# ...other code


@router.post(
    "/",
    response_model=OfferPublic,
    name="offers:create-offer",
    status_code=status.HTTP_201_CREATED,
    dependencies=[Depends(check_offer_create_permissions)],
)
async def create_offer(
    cleaning: CleaningInDB = Depends(get_cleaning_by_id_from_path),
    current_user: UserInDB = Depends(get_current_active_user),
    offers_repo: OffersRepository = Depends(get_repository(OffersRepository)),
) -> OfferPublic:
    return await offers_repo.create_offer_for_cleaning(
        new_offer=OfferCreate(cleaning_id=cleaning.id, user_id=current_user.id)
    )

Much cleaner. We're extracting our permissions checking into the api/dependencies/offers.py file and adding the check_offer_create_permissions function to our path decorator's dependencies list. Readers who don't recognize this pattern are encouraged to read the previous post on owning resources in FastAPI.

Let's go ahead and create the offers dependency file.

touch backend/app/api/dependencies/offers.py

Then add this code:

api/dependencies/offers.py
from fastapi import HTTPException, Depends, status

from app.models.user import UserInDB
from app.models.cleaning import CleaningInDB
from app.db.repositories.offers import OffersRepository
from app.api.dependencies.database import get_repository
from app.api.dependencies.auth import get_current_active_user
from app.api.dependencies.cleanings import get_cleaning_by_id_from_path


async def check_offer_create_permissions(
    current_user: UserInDB = Depends(get_current_active_user),
    cleaning: CleaningInDB = Depends(get_cleaning_by_id_from_path),
    offers_repo: OffersRepository = Depends(get_repository(OffersRepository)),
) -> None:
    if cleaning.owner == current_user.id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Users are unable to create offers for cleaning jobs they own.",
        )

There we go. Simple as pie. We're leveraging our previously defined get_cleaning_by_id_from_path which does most of the hard work for us. We'll be returning to this file a lot, so keep it open.

We still haven't created our OffersRepository yet, so let's do that next.

touch backend/app/db/repositories/offers.py

And in the file:

db/repositories/offers.py
from typing import List

from fastapi import HTTPException, status
from asyncpg.exceptions import UniqueViolationError

from app.db.repositories.base import BaseRepository
from app.models.offer import OfferCreate, OfferUpdate, OfferInDB

CREATE_OFFER_FOR_CLEANING_QUERY = """
    INSERT INTO user_offers_for_cleanings (cleaning_id, user_id, status)
    VALUES (:cleaning_id, :user_id, :status)
    RETURNING cleaning_id, user_id, status, created_at, updated_at;
"""


class OffersRepository(BaseRepository):
    async def create_offer_for_cleaning(self, *, new_offer: OfferCreate) -> OfferInDB:
        try:
            created_offer = await self.db.fetch_one(
                query=CREATE_OFFER_FOR_CLEANING_QUERY, values={**new_offer.dict(), "status": "pending"}
            )
            return OfferInDB(**created_offer)
        except UniqueViolationError:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Users aren't allowed to create more than one offer for a cleaning job.",
            )

Now this works. Run the tests and they should all pass. However...this should again give us pause. Catching a unique violation error as a way to prevent duplicate entries is another permissions issue. When we see this, our thinking should go directly to dependencies.

We'll refactor it in a minute.

First, let's make sure we can fetch offers.

Getting Offers

Before we do anything else, we're going to add a new fixture that'll make our testing lives easier.

Add the following to tests/conftest.py:

tests/conftest.py
# ...other code
from app.models.offer import OfferCreate, OfferUpdate
from app.db.repositories.offers import OffersRepository

# ...other code


@pytest.fixture
async def test_cleaning_with_offers(db: Database, test_user2: UserInDB, test_user_list: List[UserInDB]) -> CleaningInDB:
    cleaning_repo = CleaningsRepository(db)
    offers_repo = OffersRepository(db)
    new_cleaning = CleaningCreate(
        name="cleaning with offers", description="desc for cleaning", price=9.99, cleaning_type="full_clean",
    )
    created_cleaning = await cleaning_repo.create_cleaning(new_cleaning=new_cleaning, requesting_user=test_user2)
    for user in test_user_list:
        await offers_repo.create_offer_for_cleaning(
            new_offer=OfferCreate(cleaning_id=created_cleaning.id, user_id=user.id)
        )
    return created_cleaning

Pretty self-explanatory. The test_cleaning_with_offers fixture creates a cleaning resource owned by test_user2 and then creates an offer from each user in the test_user_list.

Back to the test_offers.py file:

tests/test_offers.py
# ...other code
import random

# ...other code


class TestGetOffers:
    async def test_cleaning_owner_can_get_offer_from_user(
        self,
        app: FastAPI,
        create_authorized_client: Callable,
        test_user2: UserInDB,
        test_user_list: List[UserInDB],
        test_cleaning_with_offers: CleaningInDB,
    ) -> None:
        authorized_client = create_authorized_client(user=test_user2)
        selected_user = random.choice(test_user_list)
        res = await authorized_client.get(
            app.url_path_for(
                "offers:get-offer-from-user",
                cleaning_id=test_cleaning_with_offers.id,
                username=selected_user.username,
            )
        )
        assert res.status_code == status.HTTP_200_OK
        offer = OfferPublic(**res.json())
        assert offer.user_id == selected_user.id

    async def test_offer_owner_can_get_own_offer(
        self,
        app: FastAPI,
        create_authorized_client: Callable,
        test_user_list: List[UserInDB],
        test_cleaning_with_offers: CleaningInDB,
    ) -> None:
        first_test_user = test_user_list[0]
        authorized_client = create_authorized_client(user=first_test_user)
        res = await authorized_client.get(
            app.url_path_for(
                "offers:get-offer-from-user",
                cleaning_id=test_cleaning_with_offers.id,
                username=first_test_user.username,
            )
        )
        assert res.status_code == status.HTTP_200_OK
        offer = OfferPublic(**res.json())
        assert offer.user_id == first_test_user.id

    async def test_other_authenticated_users_cant_view_offer_from_user(
        self,
        app: FastAPI,
        create_authorized_client: Callable,
        test_user_list: List[UserInDB],
        test_cleaning_with_offers: CleaningInDB,
    ) -> None:
        first_test_user = test_user_list[0]
        second_test_user = test_user_list[1]
        authorized_client = create_authorized_client(user=first_test_user)
        res = await authorized_client.get(
            app.url_path_for(
                "offers:get-offer-from-user",
                cleaning_id=test_cleaning_with_offers.id,
                username=second_test_user.username,
            )
        )
        assert res.status_code == status.HTTP_403_FORBIDDEN

    async def test_cleaning_owner_can_get_all_offers_for_cleanings(
        self,
        app: FastAPI,
        create_authorized_client: Callable,
        test_user2: UserInDB,
        test_user_list: List[UserInDB],
        test_cleaning_with_offers: CleaningInDB,
    ) -> None:
        authorized_client = create_authorized_client(user=test_user2)
        res = await authorized_client.get(
            app.url_path_for("offers:list-offers-for-cleaning", cleaning_id=test_cleaning_with_offers.id)
        )
        assert res.status_code == status.HTTP_200_OK
        for offer in res.json():
            assert offer["user_id"] in [user.id for user in test_user_list]

    async def test_non_owners_forbidden_from_fetching_all_offers_for_cleaning(
        self, app: FastAPI, authorized_client: AsyncClient, test_cleaning_with_offers: CleaningInDB,
    ) -> None:
        res = await authorized_client.get(
            app.url_path_for("offers:list-offers-for-cleaning", cleaning_id=test_cleaning_with_offers.id)
        )
        assert res.status_code == status.HTTP_403_FORBIDDEN

There's a lot going on here, but it's all connected. We're testing five conditions:

  1. We make sure that the owner of a cleaning job can successfully fetch a single offer made for that cleaning job.
  2. On top of that, the creator of the offer should be able to fetch their own offer.
  3. We ensure non-owners are forbidden from fetching an offer made for other users' cleaning jobs.
  4. We also make sure cleaning owners can fetch a list of all offers made for their own cleaning job.
  5. Finally, non-owners are forbidden from fetching a list of offers made for a cleaning resource they don't own.

Now let's get them passing.

Dependency Fiesta

First off, recall that we'll be fetching offers according to the id of the cleaning job and the username of the user who made the offer. So, let's start by creating a dependency to fetch users by username from a path parameter.

touch backend/app/api/dependencies/users.py

And in the new file:

api/dependencies/users.py
from fastapi import HTTPException, Depends, Path, status

from app.models.user import UserInDB
from app.db.repositories.users import UsersRepository
from app.api.dependencies.database import get_repository
from app.api.dependencies.auth import get_current_active_user


async def get_user_by_username_from_path(
    username: str = Path(..., min_length=3, regex="^[a-zA-Z0-9_-]+$"),
    current_user: UserInDB = Depends(get_current_active_user),
    users_repo: UsersRepository = Depends(get_repository(UsersRepository)),
) -> UserInDB:
    user = await users_repo.get_user_by_username(username=username, populate=False)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="No user found with that username.",
        )
    return user

Tada! That's an easy one. We already had our get_user_by_username method on our UsersRepository. All we needed to do was validate the username path parameter using the same rules as our UserCreate model and look up the user in the database.
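
To get a feel for which usernames the path validation lets through, here's a small sketch that applies the same pattern and minimum length with the standard re module. The is_valid_username function is a hypothetical helper for illustration. In the app, FastAPI performs this check for us from the Path(...) declaration.

```python
import re

# Same constraints as the Path(...) declaration: at least 3 characters,
# limited to letters, digits, underscores, and hyphens.
USERNAME_PATTERN = re.compile(r"^[a-zA-Z0-9_-]+$")
MIN_LENGTH = 3


def is_valid_username(username: str) -> bool:
    """Hypothetical helper mirroring the path parameter's constraints."""
    return len(username) >= MIN_LENGTH and USERNAME_PATTERN.match(username) is not None


results = {name: is_valid_username(name) for name in ["bradpitt", "jlo", "ab", "brad pitt", "brad/pitt"]}
```

Short names, spaces, and slashes fail the check, so a malformed username is turned away with a validation error before we ever query the database.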

Now update the api/dependencies/offers.py file like so:

api/dependencies/offers.py
from fastapi import HTTPException, Depends, status

from app.models.user import UserInDB
from app.models.cleaning import CleaningInDB
from app.models.offer import OfferInDB
from app.db.repositories.offers import OffersRepository
from app.api.dependencies.database import get_repository
from app.api.dependencies.auth import get_current_active_user
from app.api.dependencies.users import get_user_by_username_from_path
from app.api.dependencies.cleanings import get_cleaning_by_id_from_path


async def get_offer_for_cleaning_from_user(
    *, user: UserInDB, cleaning: CleaningInDB, offers_repo: OffersRepository,
) -> OfferInDB:
    offer = await offers_repo.get_offer_for_cleaning_from_user(cleaning=cleaning, user=user)
    if not offer:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Offer not found.")
    return offer


async def get_offer_for_cleaning_from_user_by_path(
    user: UserInDB = Depends(get_user_by_username_from_path),
    cleaning: CleaningInDB = Depends(get_cleaning_by_id_from_path),
    offers_repo: OffersRepository = Depends(get_repository(OffersRepository)),
) -> OfferInDB:
    return await get_offer_for_cleaning_from_user(user=user, cleaning=cleaning, offers_repo=offers_repo)


async def check_offer_create_permissions(
    current_user: UserInDB = Depends(get_current_active_user),
    cleaning: CleaningInDB = Depends(get_cleaning_by_id_from_path),
    offers_repo: OffersRepository = Depends(get_repository(OffersRepository)),
) -> None:
    if cleaning.owner == current_user.id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Users are unable to create offers for cleaning jobs they own.",
        )
    if await offers_repo.get_offer_for_cleaning_from_user(cleaning=cleaning, user=current_user):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Users aren't allowed to create more than one offer for a cleaning job.",
        )


def check_offer_list_permissions(
    current_user: UserInDB = Depends(get_current_active_user),
    cleaning: CleaningInDB = Depends(get_cleaning_by_id_from_path),
) -> None:
    if cleaning.owner != current_user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Unable to access offers.",
        )


def check_offer_get_permissions(
    current_user: UserInDB = Depends(get_current_active_user),
    cleaning: CleaningInDB = Depends(get_cleaning_by_id_from_path),
    offer: OfferInDB = Depends(get_offer_for_cleaning_from_user_by_path),
) -> None:
    if cleaning.owner != current_user.id and offer.user_id != current_user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Unable to access offer.",
        )

That's a lot of code.

First we have a get_offer_for_cleaning_from_user utility function that returns an OfferInDB object or a 404 response. It uses the yet-to-be-created method on OffersRepository which is also named get_offer_for_cleaning_from_user to look up that record in the database.

That utility function powers our get_offer_for_cleaning_from_user_by_path dependency that is used all over. It combines the get_user_by_username_from_path dependency we defined a second ago and the get_cleaning_by_id_from_path dependency we defined in our previous post. By doing so, it can pass all the required data to our utility function and look up the offer in the database.

We also update the check_offer_create_permissions with an additional check for uniqueness. Now we can remove that clunky try...except UniqueViolationError... block from our OffersRepository.
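
A caveat worth keeping in mind: checking first and inserting second are two separate steps, so two simultaneous requests could both pass the check. The composite primary key still guards against that case at the database level. Here's a rough sketch of how the two layers interact, again using sqlite3 as a stand-in for postgres. The offers table, the function names, and the skip_precheck flag (which simulates a request that slipped past the check) are all assumptions made for this illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE offers (
        user_id INTEGER NOT NULL,
        cleaning_id INTEGER NOT NULL,
        PRIMARY KEY (user_id, cleaning_id)
    )
    """
)


def offer_exists(user_id: int, cleaning_id: int) -> bool:
    row = conn.execute(
        "SELECT 1 FROM offers WHERE user_id = :user_id AND cleaning_id = :cleaning_id",
        {"user_id": user_id, "cleaning_id": cleaning_id},
    ).fetchone()
    return row is not None


def create_offer(user_id: int, cleaning_id: int, *, skip_precheck: bool = False) -> str:
    # Layer 1: the friendly permission-style check, as in our dependency.
    if not skip_precheck and offer_exists(user_id, cleaning_id):
        return "rejected by pre-check"
    # Layer 2: the primary key constraint, which catches anything that slips through.
    try:
        conn.execute(
            "INSERT INTO offers (user_id, cleaning_id) VALUES (:user_id, :cleaning_id)",
            {"user_id": user_id, "cleaning_id": cleaning_id},
        )
    except sqlite3.IntegrityError:
        return "rejected by constraint"
    return "created"


outcomes = [
    create_offer(1, 10),
    create_offer(1, 10),
    create_offer(1, 10, skip_precheck=True),  # simulates losing a race with another request
]
```

In our app, the rare request that slips through would surface as an unhandled database error rather than a clean 400, which may be an acceptable trade-off for a tutorial project.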

Last, but not least, we define permission checkers for listing offers and getting a single offer. In check_offer_list_permissions we ensure that only the cleaning owner can list offers for their own cleaning. In check_offer_get_permissions we forbid anyone except the cleaning owner and offer creator from getting a particular offer.
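
Stripped of the FastAPI plumbing, the two access rules boil down to a pair of boolean checks. The sketch below restates them as plain functions so they can be reasoned about in isolation. The function names and the example ids are hypothetical and chosen only for illustration.

```python
def can_list_offers(*, cleaning_owner_id: int, current_user_id: int) -> bool:
    """Only the owner of the cleaning job may list every offer made for it."""
    return cleaning_owner_id == current_user_id


def can_get_offer(*, cleaning_owner_id: int, offer_user_id: int, current_user_id: int) -> bool:
    """The cleaning owner and the user who made the offer may view a single offer."""
    return current_user_id in (cleaning_owner_id, offer_user_id)


OWNER, BIDDER, STRANGER = 2, 3, 4  # example ids

owner_lists = can_list_offers(cleaning_owner_id=OWNER, current_user_id=OWNER)
bidder_lists = can_list_offers(cleaning_owner_id=OWNER, current_user_id=BIDDER)

owner_views = can_get_offer(cleaning_owner_id=OWNER, offer_user_id=BIDDER, current_user_id=OWNER)
bidder_views = can_get_offer(cleaning_owner_id=OWNER, offer_user_id=BIDDER, current_user_id=BIDDER)
stranger_views = can_get_offer(cleaning_owner_id=OWNER, offer_user_id=BIDDER, current_user_id=STRANGER)
```

These line up with the five test conditions listed earlier: the owner can list and view, the bidder can view only their own offer, and everyone else gets a 403.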

As tedious as this may seem, it moves permissions logic out of the routes and keeps them nice and tidy. Update the routes like so:

api/routes/offers.py
# ...other code
from app.api.dependencies.offers import (
    check_offer_create_permissions,
    check_offer_get_permissions,
    check_offer_list_permissions,
    get_offer_for_cleaning_from_user_by_path,
)

# ...other code


@router.get(
    "/",
    response_model=List[OfferPublic],
    name="offers:list-offers-for-cleaning",
    dependencies=[Depends(check_offer_list_permissions)],
)
async def list_offers_for_cleaning(
    cleaning: CleaningInDB = Depends(get_cleaning_by_id_from_path),
    offers_repo: OffersRepository = Depends(get_repository(OffersRepository)),
) -> List[OfferPublic]:
    return await offers_repo.list_offers_for_cleaning(cleaning=cleaning)


@router.get(
    "/{username}/",
    response_model=OfferPublic,
    name="offers:get-offer-from-user",
    dependencies=[Depends(check_offer_get_permissions)],
)
async def get_offer_from_user(offer: OfferInDB = Depends(get_offer_for_cleaning_from_user_by_path)) -> OfferPublic:
    return offer

Look at that! The function body for both of our routes is a single line. Not bad.

All we're missing is the database interface.

db/repositories/offers.py
from typing import List, Optional

from fastapi import HTTPException, status
from asyncpg.exceptions import UniqueViolationError

from app.db.repositories.base import BaseRepository
from app.models.cleaning import CleaningInDB
from app.models.user import UserInDB
from app.models.offer import OfferCreate, OfferUpdate, OfferInDB

CREATE_OFFER_FOR_CLEANING_QUERY = """
    INSERT INTO user_offers_for_cleanings (cleaning_id, user_id, status)
    VALUES (:cleaning_id, :user_id, :status)
    RETURNING cleaning_id, user_id, status, created_at, updated_at;
"""

LIST_OFFERS_FOR_CLEANING_QUERY = """
    SELECT cleaning_id, user_id, status, created_at, updated_at
    FROM user_offers_for_cleanings
    WHERE cleaning_id = :cleaning_id;
"""

GET_OFFER_FOR_CLEANING_FROM_USER_QUERY = """
    SELECT cleaning_id, user_id, status, created_at, updated_at
    FROM user_offers_for_cleanings
    WHERE cleaning_id = :cleaning_id AND user_id = :user_id;
"""


class OffersRepository(BaseRepository):
    async def create_offer_for_cleaning(self, *, new_offer: OfferCreate) -> OfferInDB:
        created_offer = await self.db.fetch_one(
            query=CREATE_OFFER_FOR_CLEANING_QUERY,
            values={**new_offer.dict(), "status": "pending"},
        )
        return OfferInDB(**created_offer)

    async def list_offers_for_cleaning(self, *, cleaning: CleaningInDB) -> List[OfferInDB]:
        offers = await self.db.fetch_all(
            query=LIST_OFFERS_FOR_CLEANING_QUERY,
            values={"cleaning_id": cleaning.id},
        )
        return [OfferInDB(**o) for o in offers]

    # returns None when no matching offer exists, hence the Optional return type
    async def get_offer_for_cleaning_from_user(self, *, cleaning: CleaningInDB, user: UserInDB) -> Optional[OfferInDB]:
        offer_record = await self.db.fetch_one(
            query=GET_OFFER_FOR_CLEANING_FROM_USER_QUERY,
            values={"cleaning_id": cleaning.id, "user_id": user.id},
        )
        if not offer_record:
            return None
        return OfferInDB(**offer_record)

By removing all permission checking from our repository, this becomes simply an exercise in SQL.
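To poke at these three queries outside the app, here's a minimal sketch that swaps Postgres and the databases package for Python's built-in sqlite3 module. The table is a simplified stand-in (no timestamps, no foreign keys), and the helper function names are made up for illustration. Conveniently, sqlite3 accepts the same :name placeholder style.

```python
import sqlite3
from typing import List, Optional, Tuple

# Assumption: a simplified stand-in for user_offers_for_cleanings
# (no timestamps, no foreign keys).
SCHEMA = """
CREATE TABLE user_offers_for_cleanings (
    cleaning_id INTEGER NOT NULL,
    user_id INTEGER NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending',
    PRIMARY KEY (user_id, cleaning_id)
);
"""
CREATE_OFFER = """
INSERT INTO user_offers_for_cleanings (cleaning_id, user_id, status)
VALUES (:cleaning_id, :user_id, :status);
"""
LIST_OFFERS = """
SELECT cleaning_id, user_id, status
FROM user_offers_for_cleanings
WHERE cleaning_id = :cleaning_id
ORDER BY user_id;
"""
GET_OFFER = """
SELECT cleaning_id, user_id, status
FROM user_offers_for_cleanings
WHERE cleaning_id = :cleaning_id AND user_id = :user_id;
"""

Row = Tuple[int, int, str]


def create_offer(conn: sqlite3.Connection, cleaning_id: int, user_id: int) -> None:
    # New offers always start out as pending.
    conn.execute(CREATE_OFFER, {"cleaning_id": cleaning_id, "user_id": user_id, "status": "pending"})


def list_offers(conn: sqlite3.Connection, cleaning_id: int) -> List[Row]:
    return conn.execute(LIST_OFFERS, {"cleaning_id": cleaning_id}).fetchall()


def get_offer(conn: sqlite3.Connection, cleaning_id: int, user_id: int) -> Optional[Row]:
    # fetchone() returns None when no row matches, mirroring the repository method.
    return conn.execute(GET_OFFER, {"cleaning_id": cleaning_id, "user_id": user_id}).fetchone()


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
create_offer(conn, cleaning_id=1, user_id=10)
create_offer(conn, cleaning_id=1, user_id=11)
```

Calling get_offer for a user who never made an offer returns None, which is the case the dependency layer has to turn into an error response.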

Run the tests and they should all pass.

Accepting and Rejecting Offers

Now onto a slightly trickier part. To fulfill our marketplace requirements, cleaning owners should be able to accept and reject offers. Testing this properly is key to ensuring that our application functions the way we intend it to, so we'll pay special attention to this next part.

Here's what we want to accomplish:

  1. The owner of a cleaning resource should be able to accept an offer, and no one else should be able to.
  2. Only one offer should be accepted.
  3. Once an offer is accepted, all others should be set to "rejected".
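Before touching the database, it can help to see rules 2 and 3 as plain Python. The sketch below is not part of the app: it assumes a hypothetical in-memory dict that maps each offering user's id to the status of their offer for a single cleaning job, and it leaves the ownership check from rule 1 out entirely.

```python
from typing import Dict

# Hypothetical in-memory model: offering user's id -> offer status for one cleaning job.
Offers = Dict[int, str]


def accept_offer(offers: Offers, user_id: int) -> Offers:
    """Return a new mapping with one offer accepted and every other pending offer rejected."""
    # Only a pending offer can be accepted. Once one offer is accepted, the rest
    # are rejected, so a second accept attempt fails this check.
    if offers.get(user_id) != "pending":
        raise ValueError("Can only accept offers that are currently pending.")
    updated: Offers = {}
    for uid, status in offers.items():
        if uid == user_id:
            updated[uid] = "accepted"
        elif status == "pending":
            updated[uid] = "rejected"
        else:
            updated[uid] = status
    return updated


offers = {10: "pending", 11: "pending", 12: "pending"}
after_accept = accept_offer(offers, 11)
```

Trying to accept a second offer from after_accept raises a ValueError, which is the behavior the real endpoint will express as a 400 response.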

Add the following test case to our tests/test_offers.py file:

tests/test_offers.py
# ...other code


class TestAcceptOffers:
    async def test_cleaning_owner_can_accept_offer_successfully(
        self,
        app: FastAPI,
        create_authorized_client: Callable,
        test_user2: UserInDB,
        test_user_list: List[UserInDB],
        test_cleaning_with_offers: CleaningInDB,
    ) -> None:
        selected_user = random.choice(test_user_list)
        authorized_client = create_authorized_client(user=test_user2)
        res = await authorized_client.put(
            app.url_path_for(
                "offers:accept-offer-from-user",
                cleaning_id=test_cleaning_with_offers.id,
                username=selected_user.username,
            )
        )
        assert res.status_code == status.HTTP_200_OK
        accepted_offer = OfferPublic(**res.json())
        assert accepted_offer.status == "accepted"
        assert accepted_offer.user == selected_user.id
        assert accepted_offer.cleaning == test_cleaning_with_offers.id

    async def test_non_owner_forbidden_from_accepting_offer_for_cleaning(
        self,
        app: FastAPI,
        authorized_client: AsyncClient,
        test_user_list: List[UserInDB],
        test_cleaning_with_offers: CleaningInDB,
    ) -> None:
        selected_user = random.choice(test_user_list)
        res = await authorized_client.put(
            app.url_path_for(
                "offers:accept-offer-from-user",
                cleaning_id=test_cleaning_with_offers.id,
                username=selected_user.username,
            )
        )
        assert res.status_code == status.HTTP_403_FORBIDDEN

    async def test_cleaning_owner_cant_accept_multiple_offers(
        self,
        app: FastAPI,
        create_authorized_client: Callable,
        test_user2: UserInDB,
        test_user_list: List[UserInDB],
        test_cleaning_with_offers: CleaningInDB,
    ) -> None:
        authorized_client = create_authorized_client(user=test_user2)
        res = await authorized_client.put(
            app.url_path_for(
                "offers:accept-offer-from-user",
                cleaning_id=test_cleaning_with_offers.id,
                username=test_user_list[0].username,
            )
        )
        assert res.status_code == status.HTTP_200_OK
        res = await authorized_client.put(
            app.url_path_for(
                "offers:accept-offer-from-user",
                cleaning_id=test_cleaning_with_offers.id,
                username=test_user_list[1].username,
            )
        )
        assert res.status_code == status.HTTP_400_BAD_REQUEST

    async def test_accepting_one_offer_rejects_all_other_offers(
        self,
        app: FastAPI,
        create_authorized_client: Callable,
        test_user2: UserInDB,
        test_user_list: List[UserInDB],
        test_cleaning_with_offers: CleaningInDB,
    ) -> None:
        selected_user = random.choice(test_user_list)
        authorized_client = create_authorized_client(user=test_user2)
        res = await authorized_client.put(
            app.url_path_for(
                "offers:accept-offer-from-user",
                cleaning_id=test_cleaning_with_offers.id,
                username=selected_user.username,
            )
        )
        assert res.status_code == status.HTTP_200_OK
        res = await authorized_client.get(
            app.url_path_for("offers:list-offers-for-cleaning", cleaning_id=test_cleaning_with_offers.id)
        )
        assert res.status_code == status.HTTP_200_OK
        offers = [OfferPublic(**o) for o in res.json()]
        for offer in offers:
            if offer.user == selected_user.id:
                assert offer.status == "accepted"
            else:
                assert offer.status == "rejected"

Hopefully by now these tests don't look foreign. Feel free to add more if they don't feel sufficient.

Regardless, we'll get them passing in the same fashion as before. Start by adding a permission checker to the api/dependencies/offers.py file:

api/dependencies/offers.py
# ...other code


def check_offer_acceptance_permissions(
    current_user: UserInDB = Depends(get_current_active_user),
    cleaning: CleaningInDB = Depends(get_cleaning_by_id_from_path),
    offer: OfferInDB = Depends(get_offer_for_cleaning_from_user_by_path),
) -> None:
    if cleaning.owner != current_user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Only the owner of the cleaning may accept offers."
        )
    if offer.status != "pending":
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Can only accept offers that are currently pending."
        )

Nothing new here. We're checking that the currently authenticated user is the owner of the cleaning resource, since only the owner may accept offers for it, and we're ensuring that the current state of the offer is "pending".
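The order of the two checks matters: a non-owner should get a 403 even when the offer is no longer pending. Here's a small standalone sketch of that ordering. The PermissionProblem exception and the two dataclasses are hypothetical stand-ins for HTTPException and our Pydantic models, so this runs without FastAPI installed.

```python
from dataclasses import dataclass


class PermissionProblem(Exception):
    """Hypothetical stand-in for fastapi.HTTPException: carries a status code and a message."""

    def __init__(self, status_code: int, detail: str) -> None:
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


@dataclass
class Cleaning:  # simplified stand-in for CleaningInDB
    id: int
    owner: int


@dataclass
class Offer:  # simplified stand-in for OfferInDB
    cleaning_id: int
    user_id: int
    status: str


def check_acceptance(current_user_id: int, cleaning: Cleaning, offer: Offer) -> None:
    # Ownership is checked first, so non-owners learn nothing about the offer's state.
    if cleaning.owner != current_user_id:
        raise PermissionProblem(403, "Only the owner of the cleaning may accept offers.")
    if offer.status != "pending":
        raise PermissionProblem(400, "Can only accept offers that are currently pending.")


def status_for(current_user_id: int, cleaning: Cleaning, offer: Offer) -> int:
    """Return the status code a route guarded by check_acceptance would respond with."""
    try:
        check_acceptance(current_user_id, cleaning, offer)
    except PermissionProblem as exc:
        return exc.status_code
    return 200


cleaning = Cleaning(id=1, owner=2)
pending_offer = Offer(cleaning_id=1, user_id=10, status="pending")
accepted_offer = Offer(cleaning_id=1, user_id=10, status="accepted")
```

These cases line up with three of the tests above: the owner accepting a pending offer, a non-owner being forbidden, and the owner being stopped from accepting a second time.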

Now let's use it in the api/routes/offers.py file:

api/routes/offers.py
# ...other code
from app.api.dependencies.offers import (
    check_offer_create_permissions,
    check_offer_get_permissions,
    check_offer_list_permissions,
    check_offer_acceptance_permissions,
    get_offer_for_cleaning_from_user_by_path,
)

# ...other code


@router.put(
    "/{username}/",
    response_model=OfferPublic,
    name="offers:accept-offer-from-user",
    dependencies=[Depends(check_offer_acceptance_permissions)],
)
async def accept_offer(
    offer: OfferInDB = Depends(get_offer_for_cleaning_from_user_by_path),
    offers_repo: OffersRepository = Depends(get_repository(OffersRepository)),
) -> OfferPublic:
    return await offers_repo.accept_offer(offer=offer, offer_update=OfferUpdate(status="accepted"))

More of the same. This pattern has emerged in every one of our routes. Use the dependency injection system for supplying the database interface, gathering the correct resources, and checking permissions. Then modify database records and return the proper response.

In our OffersRepository:

db/repositories/offers.py
# ...other code
ACCEPT_OFFER_QUERY = """
    UPDATE user_offers_for_cleanings
    SET status = 'accepted'
    WHERE cleaning_id = :cleaning_id AND user_id = :user_id
    RETURNING cleaning_id, user_id, status, created_at, updated_at;
"""

REJECT_ALL_OTHER_OFFERS_QUERY = """
    UPDATE user_offers_for_cleanings
    SET status = 'rejected'
    WHERE cleaning_id = :cleaning_id
    AND user_id != :user_id
    AND status = 'pending';
"""


class OffersRepository(BaseRepository):
    # ...other code

    async def accept_offer(self, *, offer: OfferInDB, offer_update: OfferUpdate) -> OfferInDB:
        async with self.db.transaction():
            accepted_offer = await self.db.fetch_one(
                query=ACCEPT_OFFER_QUERY,  # accept current offer
                values={"cleaning_id": offer.cleaning_id, "user_id": offer.user_id},
            )
            await self.db.execute(
                query=REJECT_ALL_OTHER_OFFERS_QUERY,  # reject all other offers
                values={"cleaning_id": offer.cleaning_id, "user_id": offer.user_id},
            )
            return OfferInDB(**accepted_offer)

Ah! Something new. In our accept_offer method, we're managing a transaction through an async context block as specified in the encode databases documentation. The Postgres documentation explains transactions like so:

The essential point of a transaction is that it bundles multiple steps into a single, all-or-nothing operation. The intermediate states between the steps are not visible to other concurrent transactions, and if some failure occurs that prevents the transaction from completing, then none of the steps affect the database at all.

So, what's the reason for using a transaction in our accept_offer method?

Well, we're executing multiple queries that depend on each other. The crucial thing to understand about transactions is that either every database action succeeds, or they all fail. If there is an error in the ACCEPT_OFFER_QUERY or in the REJECT_ALL_OTHER_OFFERS_QUERY, then the entire transaction block stops executing and all database changes are rolled back.

When an owner accepts an offer for their cleaning job, we change the status of the offer to "accepted" and change all others to "rejected". We need all of that to happen or none of it. Any state in between would leave our database out of sync. By executing both operations inside a transaction, we ensure that we're left with a state that's manageable. We could definitely add additional error handling here, but for the time being, we'll leave it as is.
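To watch the all-or-nothing behavior in action without spinning up Postgres, here's a minimal sketch using Python's built-in sqlite3 module. It is not how our app talks to the database: the offers table is a simplified stand-in, and the reject_status parameter exists only so we can force the second query to fail. In sqlite3, using the connection as a context manager commits when the block finishes and rolls back when it raises.

```python
import sqlite3
from typing import Dict

conn = sqlite3.connect(":memory:")
# Simplified stand-in table; the CHECK constraint lets us provoke a failure on purpose.
conn.execute(
    "CREATE TABLE offers (cleaning_id INTEGER, user_id INTEGER, status TEXT, "
    "CHECK (status IN ('pending', 'accepted', 'rejected', 'cancelled')))"
)
conn.executemany("INSERT INTO offers VALUES (1, ?, 'pending')", [(10,), (11,), (12,)])
conn.commit()


def accept_offer(conn: sqlite3.Connection, cleaning_id: int, user_id: int, reject_status: str = "rejected") -> None:
    # `with conn:` commits if the block completes and rolls back if it raises.
    with conn:
        conn.execute(
            "UPDATE offers SET status = 'accepted' WHERE cleaning_id = :c AND user_id = :u",
            {"c": cleaning_id, "u": user_id},
        )
        conn.execute(
            "UPDATE offers SET status = :s "
            "WHERE cleaning_id = :c AND user_id != :u AND status = 'pending'",
            {"s": reject_status, "c": cleaning_id, "u": user_id},
        )


def statuses(conn: sqlite3.Connection) -> Dict[int, str]:
    return dict(conn.execute("SELECT user_id, status FROM offers ORDER BY user_id").fetchall())


# First attempt: the second UPDATE violates the CHECK constraint, so the
# first UPDATE is rolled back along with it.
try:
    accept_offer(conn, 1, 11, reject_status="bogus")
except sqlite3.IntegrityError:
    pass
after_failure = statuses(conn)

# Second attempt: both statements succeed and are committed together.
accept_offer(conn, 1, 11)
after_success = statuses(conn)
```

After the failed attempt every offer is still pending, even though the "accept" statement itself ran without error. That's exactly the in-between state we want to avoid persisting.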

Also notice how we don't really need the offer_update parameter? Feel free to remove it here and in the route later on.

Run the tests and watch them all pass! Fantastic.

Rescinding and Cancelling Offers

Finally. We can see the finish line.

For this last part, we want two things. First, users who have made offers should be able to rescind their own offer if it is still pending. Second, if the offer has already been accepted, we want users to be able to cancel it. Maybe they already accepted another offer, or they realize they just don't have enough time.

We'll start with cancelling.

This will look similar to our "accept" functionality, so let's get to it.

Cancelling

Functionality allowing a user to cancel an offer will follow these guidelines:

  1. Only the user who made the offer can cancel it
  2. The offer must have already been accepted
  3. All "rejected" offers for that cleaning job resource need to be set to "pending"

We'll need to adequately test that.

To let users cancel their offer we're going to need another new fixture. So add this to the tests/conftest.py file:

tests/conftest.py
# ...other code


@pytest.fixture
async def test_cleaning_with_accepted_offer(
    db: Database, test_user2: UserInDB, test_user3: UserInDB, test_user_list: List[UserInDB]
) -> CleaningInDB:
    cleaning_repo = CleaningsRepository(db)
    offers_repo = OffersRepository(db)
    new_cleaning = CleaningCreate(
        name="cleaning with offers", description="desc for cleaning", price=9.99, cleaning_type="full_clean",
    )
    created_cleaning = await cleaning_repo.create_cleaning(new_cleaning=new_cleaning, requesting_user=test_user2)
    offers = []
    for user in test_user_list:
        offers.append(
            await offers_repo.create_offer_for_cleaning(
                new_offer=OfferCreate(cleaning_id=created_cleaning.id, user_id=user.id)
            )
        )
    await offers_repo.accept_offer(
        offer=[o for o in offers if o.user_id == test_user3.id][0], offer_update=OfferUpdate(status="accepted")
    )
    return created_cleaning

We're creating a cleaning with offers from everyone in the test_user_list and having test_user2 accept the offer from test_user3.

Now, we add some tests.

tests/test_offers.py
# ...other code


class TestCancelOffers:
    async def test_user_can_cancel_offer_after_it_has_been_accepted(
        self,
        app: FastAPI,
        create_authorized_client: Callable,
        test_user3: UserInDB,
        test_cleaning_with_accepted_offer: CleaningInDB,
    ) -> None:
        accepted_user_client = create_authorized_client(user=test_user3)
        res = await accepted_user_client.put(
            app.url_path_for("offers:cancel-offer-from-user", cleaning_id=test_cleaning_with_accepted_offer.id)
        )
        assert res.status_code == status.HTTP_200_OK
        cancelled_offer = OfferPublic(**res.json())
        assert cancelled_offer.status == "cancelled"
        assert cancelled_offer.user == test_user3.id
        assert cancelled_offer.cleaning == test_cleaning_with_accepted_offer.id

    async def test_only_accepted_offers_can_be_cancelled(
        self,
        app: FastAPI,
        create_authorized_client: Callable,
        test_user4: UserInDB,
        test_cleaning_with_accepted_offer: CleaningInDB,
    ) -> None:
        selected_user_client = create_authorized_client(user=test_user4)
        res = await selected_user_client.put(
            app.url_path_for("offers:cancel-offer-from-user", cleaning_id=test_cleaning_with_accepted_offer.id)
        )
        assert res.status_code == status.HTTP_400_BAD_REQUEST

    async def test_cancelling_offer_sets_all_others_to_pending(
        self,
        app: FastAPI,
        create_authorized_client: Callable,
        test_user3: UserInDB,
        test_cleaning_with_accepted_offer: CleaningInDB,
    ) -> None:
        accepted_user_client = create_authorized_client(user=test_user3)
        res = await accepted_user_client.put(
            app.url_path_for("offers:cancel-offer-from-user", cleaning_id=test_cleaning_with_accepted_offer.id)
        )
        assert res.status_code == status.HTTP_200_OK
        offers_repo = OffersRepository(app.state._db)
        offers = await offers_repo.list_offers_for_cleaning(cleaning=test_cleaning_with_accepted_offer)
        for offer in offers:
            if offer.user_id == test_user3.id:
                assert offer.status == "cancelled"
            else:
                assert offer.status == "pending"

Look familiar? That's because it's almost identical to accepting an offer. The biggest difference will be in the dependencies, both in how we fetch the offer in question and in how we specify permissions.

Open the api/dependencies/offers.py file and add two new dependencies.

api/dependencies/offers.py
# ...other code


async def get_offer_for_cleaning_from_current_user(
    current_user: UserInDB = Depends(get_current_active_user),
    cleaning: CleaningInDB = Depends(get_cleaning_by_id_from_path),
    offers_repo: OffersRepository = Depends(get_repository(OffersRepository)),
) -> OfferInDB:
    return await get_offer_for_cleaning_from_user(user=current_user, cleaning=cleaning, offers_repo=offers_repo)


# ...other code


def check_offer_cancel_permissions(offer: OfferInDB = Depends(get_offer_for_cleaning_from_current_user)) -> None:
    if offer.status != "accepted":
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Can only cancel offers that have been accepted.",
        )

We're no longer fetching the offer using the username and id path parameters like we did with the get_offer_for_cleaning_from_user_by_path dependency. Instead, we're combining the currently authenticated user with the cleaning resource injected by the get_cleaning_by_id_from_path dependency to fetch the offer. Then we're simply ensuring that the offer has already been accepted in the check_offer_cancel_permissions function.

And in the api/routes/offers.py file:

api/routes/offers.py
# ...other code
from app.api.dependencies.offers import (
    check_offer_create_permissions,
    check_offer_get_permissions,
    check_offer_list_permissions,
    check_offer_acceptance_permissions,
    check_offer_cancel_permissions,
    get_offer_for_cleaning_from_current_user,
    get_offer_for_cleaning_from_user_by_path,
)

# ...other code


@router.put(
    "/",
    response_model=OfferPublic,
    name="offers:cancel-offer-from-user",
    dependencies=[Depends(check_offer_cancel_permissions)],
)
async def cancel_offer(
    offer: OfferInDB = Depends(get_offer_for_cleaning_from_current_user),
    offers_repo: OffersRepository = Depends(get_repository(OffersRepository)),
) -> OfferPublic:
    return await offers_repo.cancel_offer(offer=offer, offer_update=OfferUpdate(status="cancelled"))

This route looks almost identical to the "accept" one. The biggest difference is that we don't need a username path parameter, since a currently authenticated user can only cancel their own offer.

The database interface isn't much different either.

In the OffersRepository:

db/repositories/offers.py
# ...other code
CANCEL_OFFER_QUERY = """
    UPDATE user_offers_for_cleanings
    SET status = 'cancelled'
    WHERE cleaning_id = :cleaning_id AND user_id = :user_id
    RETURNING cleaning_id, user_id, status, created_at, updated_at;
"""

SET_ALL_OTHER_OFFERS_AS_PENDING_QUERY = """
    UPDATE user_offers_for_cleanings
    SET status = 'pending'
    WHERE cleaning_id = :cleaning_id
    AND user_id != :user_id
    AND status = 'rejected';
"""


class OffersRepository(BaseRepository):
    # ...other code

    async def cancel_offer(self, *, offer: OfferInDB, offer_update: OfferUpdate) -> OfferInDB:
        async with self.db.transaction():
            cancelled_offer = await self.db.fetch_one(
                query=CANCEL_OFFER_QUERY,  # cancel current offer
                values={"cleaning_id": offer.cleaning_id, "user_id": offer.user_id},
            )
            await self.db.execute(
                query=SET_ALL_OTHER_OFFERS_AS_PENDING_QUERY,  # set all other offers to pending again
                values={"cleaning_id": offer.cleaning_id, "user_id": offer.user_id},
            )
            return OfferInDB(**cancelled_offer)

Again, we're using a transaction here to cancel the current offer and set all other offers to pending again. By this point we're so close, we can smell it.

Run the tests and they should pass.

Rescinding

Now head into the test_offers.py file and add the last test class.

tests/test_offers.py
# ...other code


class TestRescindOffers:
    async def test_user_can_successfully_rescind_pending_offer(
        self,
        app: FastAPI,
        create_authorized_client: Callable,
        test_user4: UserInDB,
        test_user_list: List[UserInDB],
        test_cleaning_with_offers: CleaningInDB,
    ) -> None:
        authorized_client = create_authorized_client(user=test_user4)
        res = await authorized_client.delete(
            app.url_path_for("offers:rescind-offer-from-user", cleaning_id=test_cleaning_with_offers.id)
        )
        assert res.status_code == status.HTTP_200_OK
        offers_repo = OffersRepository(app.state._db)
        offers = await offers_repo.list_offers_for_cleaning(cleaning=test_cleaning_with_offers)
        user_ids = [user.id for user in test_user_list]
        for offer in offers:
            assert offer.user_id in user_ids
            assert offer.user_id != test_user4.id

    async def test_users_cannot_rescind_accepted_offers(
        self,
        app: FastAPI,
        create_authorized_client: Callable,
        test_user3: UserInDB,
        test_cleaning_with_accepted_offer: CleaningInDB,
    ) -> None:
        authorized_client = create_authorized_client(user=test_user3)
        res = await authorized_client.delete(
            app.url_path_for("offers:rescind-offer-from-user", cleaning_id=test_cleaning_with_accepted_offer.id)
        )
        assert res.status_code == status.HTTP_400_BAD_REQUEST

    async def test_users_cannot_rescind_cancelled_offers(
        self,
        app: FastAPI,
        create_authorized_client: Callable,
        test_user3: UserInDB,
        test_cleaning_with_accepted_offer: CleaningInDB,
    ) -> None:
        authorized_client = create_authorized_client(user=test_user3)
        res = await authorized_client.put(
            app.url_path_for("offers:cancel-offer-from-user", cleaning_id=test_cleaning_with_accepted_offer.id)
        )
        assert res.status_code == status.HTTP_200_OK
        res = await authorized_client.delete(
            app.url_path_for("offers:rescind-offer-from-user", cleaning_id=test_cleaning_with_accepted_offer.id)
        )
        assert res.status_code == status.HTTP_400_BAD_REQUEST

    async def test_users_cannot_rescind_rejected_offers(
        self,
        app: FastAPI,
        create_authorized_client: Callable,
        test_user4: UserInDB,
        test_cleaning_with_accepted_offer: CleaningInDB,
    ) -> None:
        authorized_client = create_authorized_client(user=test_user4)
        res = await authorized_client.delete(
            app.url_path_for("offers:rescind-offer-from-user", cleaning_id=test_cleaning_with_accepted_offer.id)
        )
        assert res.status_code == status.HTTP_400_BAD_REQUEST

Here are the conditions we're testing:

  1. If a user submits an offer, they should be able to rescind it successfully if it is pending. It should no longer appear when listing all offers for a cleaning job.
  2. If the offer is already accepted, raise a 400 exception. They should be forced to cancel it instead.
  3. If the offer is already cancelled, they shouldn't be able to rescind it.
  4. If the offer has been rejected, they shouldn't be able to rescind it. We'll keep the rejection as a record.
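Taken together with the accept and cancel rules from earlier, the user-triggered status changes fit in a tiny lookup table. This is only a summary sketch: the TRANSITIONS dict and the apply_action helper are hypothetical and don't exist anywhere in our app, and the automatic side effects (rejecting or re-opening the other offers) are left out.

```python
from typing import Dict, Optional, Tuple

# Hypothetical summary of the rules: (action, current status) -> resulting status.
# A result of None means the offer row is deleted.
TRANSITIONS: Dict[Tuple[str, str], Optional[str]] = {
    ("accept", "pending"): "accepted",    # performed by the cleaning owner
    ("cancel", "accepted"): "cancelled",  # performed by the user who made the offer
    ("rescind", "pending"): None,         # performed by the user who made the offer
}


def is_allowed(action: str, status: str) -> bool:
    return (action, status) in TRANSITIONS


def apply_action(action: str, status: str) -> Optional[str]:
    """Return the new status, or None if the offer is deleted. Raise if the action is not allowed."""
    if not is_allowed(action, status):
        raise ValueError(f"Cannot {action} an offer that is {status}.")
    return TRANSITIONS[(action, status)]


# Which statuses allow a rescind? Only "pending".
rescindable = [s for s in ("pending", "accepted", "rejected", "cancelled") if is_allowed("rescind", s)]
```

Anything missing from the table corresponds to a 400 response in the real routes.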

To get these passing, we'll add one last dependency:

api/dependencies/offers.py
# ...other code


def check_offer_rescind_permissions(offer: OfferInDB = Depends(get_offer_for_cleaning_from_current_user)) -> None:
    if offer.status != "pending":
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Can only rescind currently pending offers."
        )

Same as before. We grab the offer in question and ensure that its status is pending; otherwise, we raise an exception.

Top it off by using it in the proper route:

api/routes/offers.py
# ...other code
from app.api.dependencies.offers import (
    check_offer_create_permissions,
    check_offer_get_permissions,
    check_offer_list_permissions,
    check_offer_acceptance_permissions,
    check_offer_cancel_permissions,
    check_offer_rescind_permissions,
    get_offer_for_cleaning_from_current_user,
    get_offer_for_cleaning_from_user_by_path,
)

# ...other code


@router.delete(
    "/",
    response_model=int,
    name="offers:rescind-offer-from-user",
    dependencies=[Depends(check_offer_rescind_permissions)],
)
async def rescind_offer(
    offer: OfferInDB = Depends(get_offer_for_cleaning_from_current_user),
    offers_repo: OffersRepository = Depends(get_repository(OffersRepository)),
) -> int:  # matches response_model=int and the repository method's return type
    return await offers_repo.rescind_offer(offer=offer)

More of the same, just with a DELETE action this time. All that's left is to define the rescind_offer method.

And we polish off the OffersRepository with:

db/repositories/offers.py
# ...other code
RESCIND_OFFER_QUERY = """
    DELETE FROM user_offers_for_cleanings
    WHERE cleaning_id = :cleaning_id
    AND user_id = :user_id;
"""


class OffersRepository(BaseRepository):
    # ...other code

    async def rescind_offer(self, *, offer: OfferInDB) -> int:
        return await self.db.execute(
            query=RESCIND_OFFER_QUERY,  # rescinding an offer deletes it as long as it's pending
            values={"cleaning_id": offer.cleaning_id, "user_id": offer.user_id},
        )

Run the tests again and they should all pass.
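One thing worth noticing: RESCIND_OFFER_QUERY itself doesn't look at the status, so the "only pending" rule lives entirely in the dependency. If you'd like the database to enforce it as well, one option is to add the status to the WHERE clause and report how many rows were deleted. The sketch below shows the idea with Python's built-in sqlite3 module and a simplified offers table. It's an alternative under those assumptions, not what our repository does, and what the databases package's execute returns may differ by backend.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in table with one pending and one accepted offer.
conn.execute("CREATE TABLE offers (cleaning_id INTEGER, user_id INTEGER, status TEXT)")
conn.executemany(
    "INSERT INTO offers VALUES (1, ?, ?)",
    [(10, "pending"), (11, "accepted")],
)
conn.commit()

# Assumption: the status guard lives in the SQL itself, unlike RESCIND_OFFER_QUERY above.
RESCIND_PENDING_OFFER = """
DELETE FROM offers
WHERE cleaning_id = :cleaning_id
  AND user_id = :user_id
  AND status = 'pending';
"""


def rescind_offer(conn: sqlite3.Connection, cleaning_id: int, user_id: int) -> int:
    """Delete the offer only if it is still pending and return the number of rows removed."""
    with conn:  # commit on success, roll back on error
        cursor = conn.execute(RESCIND_PENDING_OFFER, {"cleaning_id": cleaning_id, "user_id": user_id})
    return cursor.rowcount


deleted_pending = rescind_offer(conn, 1, 10)   # the pending offer is removed
deleted_accepted = rescind_offer(conn, 1, 11)  # the accepted offer is left alone
remaining = conn.execute("SELECT user_id, status FROM offers").fetchall()
```

A return value of 0 tells the caller that nothing was rescinded, which could be mapped to a 400 response if the dependency check were ever bypassed.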

Wrapping Up and Resources

I'm exhausted, but happy. A functional marketplace has begun to emerge from our API and we have tested the entire thing. There's more we could do here, but not without a proper rest.

In the next post, we'll add an evaluation system for jobs that have been completed.

  • Postgresql transaction docs
  • Encode databases docs
  • FastAPI path parameters docs
  • FastAPI route decorator dependency docs
  • FastAPI SQL databases docs
  • Toptal: High-performing Apps with Python – blog post focused on building a todo app from scratch with FastAPI using the SQLAlchemy ORM.

Github Repo

All code up to this point can be found here:

Special thanks to Valon Januzaj for correcting errors in the original code.