Serving A Paginated Activity Feed From FastAPI

Welcome to Part 21 of Up and Running with FastAPI. If you missed part 20, you can find it here.

This series is focused on building a full-stack application with the FastAPI framework. The app allows users to post requests to have their residence cleaned, and other users can select a cleaning project for a given hourly rate.

Up And Running With FastAPI

Last time we met, we beefed up our frontend and put together an interface for accepting and rejecting user offers. Prospective cleaners can now offer their services for a cleaning job and owners can choose which one to select.

On the backend side of things, we upgraded our FastAPI and pydantic versions, refactored our offer and evaluation models, and populated OfferPublic objects with the profile of the owner making the offer. We also spent a bit of time fixing tests that were broken in the refactor.

We'll stay in the backend realm for this post, as there will be quite a bit of work to do there. We're going to present authenticated users with an activity feed for available cleaning jobs, as well as updates on jobs where an offer has been accepted.

On to the code.

Creating A Feed Model

Our goal here is to create an endpoint that will serve a naive activity feed displaying recently updated and created cleaning jobs. We're using the term "naive" to describe our end result because it's a simple solution to a potentially complex problem.

Big companies like Facebook, Twitter, and Snapchat use rather intricate algorithms to determine what content users should see. We won't be doing that. Instead, we'll approach this problem with efficiency as our primary objective. We want to pump out the best activity feed possible with the resources we currently have on hand.

That means we won't be adding a new table to our db, and we won't be architecting our own feed algorithm. We'll simply use recency as an ordering mechanism and vary our content based on whether the cleaning resource was created or updated recently.

To start, we'll need to create a new model that will represent a single item in our cleaning feed.

Create a new feed.py file in the models directory.

touch backend/app/models/feed.py

And add the following to it:

models/feed.py
from typing import Optional, Literal
import datetime

from app.models.core import CoreModel
from app.models.cleaning import CleaningPublic


class FeedItem(CoreModel):
    row_number: Optional[int]
    event_timestamp: Optional[datetime.datetime]


class CleaningFeedItem(CleaningPublic, FeedItem):
    event_type: Optional[Literal["is_update", "is_create"]]

Two new models here. The first is the FeedItem model which has row_number and event_timestamp attributes. The row number will represent the order for each item within a single page, while the event_timestamp will refer to the order of the item compared to all items in our database. If that distinction doesn't make sense yet, don't worry. We'll get to it in a moment.

We're then importing our CleaningPublic model and creating a CleaningFeedItem model that inherits from it and the new FeedItem model. We've tacked on an additional event_type attribute that will differentiate between events where a new cleaning job was created and events where an existing cleaning job was modified. For this reason, we assign the Literal type to this field so that it only allows the values is_update and is_create. This is a slightly more lightweight approach than using an Enum.
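To see what the Literal constraint buys us, here's a minimal stdlib sketch of the membership check pydantic performs on event_type (pydantic raises a ValidationError instead of returning a bool; the helper function here is illustrative, not part of our app):

```python
from typing import Literal, Optional, get_args

EventType = Literal["is_update", "is_create"]


def is_valid_event_type(value: Optional[str]) -> bool:
    # pydantic applies an equivalent membership check when validating a Literal field
    return value is None or value in get_args(EventType)


print(is_valid_event_type("is_create"))  # True
print(is_valid_event_type("is_delete"))  # False - not one of the allowed literals
```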

That's actually all we'll need on the data side for now. Next, we'll set up our tests that will ensure our new code is working and satisfies all of our feed needs.

Kickstarting Our Tests

Make a new file called test_feed.py.

touch backend/app/tests/test_feed.py

And add the following to it:

tests/test_feed.py
from typing import List

import pytest
from httpx import AsyncClient
from fastapi import FastAPI, status

from app.models.cleaning import CleaningInDB

pytestmark = pytest.mark.asyncio


class TestFeedRoutes:
    async def test_routes_exist(self, app: FastAPI, client: AsyncClient) -> None:
        res = await client.get(app.url_path_for("feed:get-cleaning-feed-for-user"))
        assert res.status_code != status.HTTP_404_NOT_FOUND

We start, as always, by testing that the endpoint exists, and that it doesn't return a 404 response.

Head into the docker container, and run that test suite.

docker ps
docker exec -it [CONTAINER_ID] bash
pytest tests/test_feed.py -v

It should fail. Time to fix that.

Setting Up A Dummy Endpoint

On to our routes. Create a new file called feed.py inside the api/routes directory.

touch backend/app/api/routes/feed.py

And get it set up like so:

routes/feed.py
from typing import List

from fastapi import APIRouter, Depends

from app.models.feed import CleaningFeedItem
from app.api.dependencies.auth import get_current_active_user

router = APIRouter()


@router.get(
    "/cleanings/",
    response_model=List[CleaningFeedItem],
    name="feed:get-cleaning-feed-for-user",
    dependencies=[Depends(get_current_active_user)],
)
async def get_cleaning_feed_for_user() -> List[CleaningFeedItem]:
    return []

That was simple. Not much going on here with the /cleanings/ endpoint. We're returning an empty list because we're doing the bare minimum to make this test pass. We're also using the get_current_active_user dependency just to protect this route from any unauthenticated requests.

Now we make sure to include our new feed router in the app router:

routes/__init__.py
from fastapi import APIRouter
from app.api.routes.cleanings import router as cleanings_router
from app.api.routes.users import router as users_router
from app.api.routes.profiles import router as profiles_router
from app.api.routes.offers import router as offers_router
from app.api.routes.evaluations import router as evaluations_router
from app.api.routes.feed import router as feed_router
router = APIRouter()
router.include_router(cleanings_router, prefix="/cleanings", tags=["cleanings"])
router.include_router(users_router, prefix="/users", tags=["users"])
router.include_router(profiles_router, prefix="/profiles", tags=["profiles"])
router.include_router(offers_router, prefix="/cleanings/{cleaning_id}/offers", tags=["offers"])
router.include_router(evaluations_router, prefix="/users/{username}/evaluations", tags=["evaluations"])
router.include_router(feed_router, prefix="/feed", tags=["feed"])

We prefix the new router under /feed, so that our cleaning feed route looks like /api/feed/cleanings/. This should make it easy to add other feed endpoints in the future.

Run the tests again and they should pass.

S'more Tests

With all our tests now passing, the honor of writing a few more tests is bestowed upon us.

Before we do that, let's create a fixture that provides us with 50 cleaning jobs and updates 1 out of every 4 of them or so.

tests/conftest.py
# ...other code

from app.models.cleaning import CleaningCreate, CleaningUpdate, CleaningInDB

# ...other code


@pytest.fixture
async def test_list_of_new_and_updated_cleanings(
    db: Database, test_user_list: List[UserInDB]
) -> List[CleaningInDB]:
    cleanings_repo = CleaningsRepository(db)
    new_cleanings = [
        await cleanings_repo.create_cleaning(
            new_cleaning=CleaningCreate(
                name=f"feed item cleaning job - {i}",
                description=f"test description for feed item cleaning: {i}",
                price=float(f"{i}9.99"),
                cleaning_type=["full_clean", "spot_clean", "dust_up"][i % 3],
            ),
            requesting_user=test_user_list[i % len(test_user_list)],
        )
        for i in range(50)
    ]
    # update every fourth cleaning
    for i, cleaning in enumerate(new_cleanings):
        if i % 4 == 0:
            updated_cleaning = await cleanings_repo.update_cleaning(
                cleaning=cleaning,
                cleaning_update=CleaningUpdate(
                    description=f"Updated {cleaning.description}",
                    price=cleaning.price + 100.0,
                ),
            )
            new_cleanings[i] = updated_cleaning
    return new_cleanings

That'll do the trick. We start by creating a list of new cleanings and attaching different attributes to each one. We also mix up the ownership between users in our test_user_list just for good measure.

Then we loop over the new cleaning jobs and update every fourth one with a new description and increased price.

Since we're not requiring any of the cleaning attributes to be unique, we can use this fixture as often as we want, and it'll create 50 new cleaning jobs each time.

Let's put it to good use.

tests/test_feed.py
# ...other code


class TestCleaningFeed:
    async def test_cleaning_feed_returns_valid_response(
        self,
        *,
        app: FastAPI,
        authorized_client: AsyncClient,
        test_list_of_new_and_updated_cleanings: List[CleaningInDB],
    ) -> None:
        cleaning_ids = [cleaning.id for cleaning in test_list_of_new_and_updated_cleanings]
        res = await authorized_client.get(app.url_path_for("feed:get-cleaning-feed-for-user"))
        assert res.status_code == status.HTTP_200_OK
        cleaning_feed = res.json()
        assert isinstance(cleaning_feed, list)
        assert len(cleaning_feed) == 20
        assert set(feed_item["id"] for feed_item in cleaning_feed).issubset(set(cleaning_ids))

Ah there we are. Let's break down what we're expecting from this test.

  1. The endpoint should return a valid 200 response.
  2. The JSON response should be a list with 20 items in it.
  3. Each item in the list should have an id attribute that exists within the set of cleaning_ids we compose from the test_list_of_new_and_updated_cleanings fixture that we just created.

Using the issubset method provided by python sets makes this test very easy. It simply confirms that each unique id in our endpoint response is also a member of the cleaning_ids set we create. No need for loops. A+.
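The pattern in miniature, with made-up ids standing in for our fixture data:

```python
# Every id in the endpoint response must come from the fixture's known ids.
cleaning_ids = {1, 3, 5, 7, 9, 11}
response_ids = {3, 7, 9}

print(response_ids.issubset(cleaning_ids))  # True - all response ids are known
print({3, 99}.issubset(cleaning_ids))       # False - 99 was never created
```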

We should probably clarify why we're expecting our endpoint to return exactly 20 items. Because we're going to implement a mechanism for pagination at this endpoint, we'll also need to specify a default page size. In our case, we're going with 20 items for now. Feel free to change that as needed. Unless we provide the endpoint with additional parameters, the JSON returned from it should contain exactly 20 items - provided at least 20 cleaning jobs exist in our database.

If we run this test, it should fail at the 20 items assertion. We'll need to do a few things to get this passing and a few more to get the next assertion passing.

The Feed Repository

First things first, let's create a new database interface for any feed-related requests.

touch backend/app/db/repositories/feed.py

Then create a simple FeedRepository class with an extremely naive SQL query to match.

db/repositories/feed.py
from typing import List

from app.db.repositories.base import BaseRepository
from app.models.feed import CleaningFeedItem

FETCH_CLEANING_JOBS_FOR_FEED_QUERY = """
    SELECT id,
           name,
           description,
           price,
           cleaning_type,
           owner,
           created_at,
           updated_at,
           'is_create' AS event_type,
           created_at AS event_timestamp,
           ROW_NUMBER() OVER ( ORDER BY created_at DESC ) AS row_number
    FROM cleanings
    ORDER BY created_at DESC
    LIMIT :page_chunk_size;
"""


class FeedRepository(BaseRepository):
    async def fetch_cleaning_jobs_feed(self, *, page_chunk_size: int = 20) -> List[CleaningFeedItem]:
        cleaning_feed_item_records = await self.db.fetch_all(
            query=FETCH_CLEANING_JOBS_FOR_FEED_QUERY,
            values={"page_chunk_size": page_chunk_size},
        )
        return [CleaningFeedItem(**item) for item in cleaning_feed_item_records]

We start by importing the new CleaningFeedItem model and our standard BaseRepository, and then using them in our FeedRepository class. At the moment our feed repo has a single method - fetch_cleaning_jobs_feed - that is responsible for executing the SQL query we defined above as FETCH_CLEANING_JOBS_FOR_FEED_QUERY and converting the result of that query into a list of CleaningFeedItem models.

There's a few interesting things happening with our SQL. It starts by selecting attributes pertaining to a standard CleaningPublic model: id, name, and so on. Nearing the bottom of the SELECT statement, we create fields for event_type, event_timestamp, and row_number associated with our FeedItem and CleaningFeedItem models.

To get the tests passing, we simply set event_type to is_create and event_timestamp to the created_at field. Don't spend much time on these two, as we'll be changing them shortly. For row_number we employ the postgres ROW_NUMBER() window function to assign each cleaning resource a position in the query that corresponds with the job's creation date. Using the OVER ( ORDER BY created_at DESC ) syntax ensures that jobs that were most recently created will have the lowest row numbers. Therefore, our feed should show current events before past events.
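If the window function is new to you, here's a tiny self-contained demo of the same ROW_NUMBER() ordering, run against an in-memory SQLite database standing in for Postgres (SQLite 3.25+ supports window functions; the sample rows are made up for illustration):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cleanings (id INTEGER, name TEXT, created_at TEXT)")
conn.executemany(
    "INSERT INTO cleanings VALUES (?, ?, ?)",
    [
        (1, "old job", "2020-12-01"),
        (2, "newer job", "2020-12-10"),
        (3, "newest job", "2020-12-18"),
    ],
)
# Most recently created job gets row_number 1, just like in our feed query.
rows = conn.execute(
    """
    SELECT id, ROW_NUMBER() OVER ( ORDER BY created_at DESC ) AS row_number
    FROM cleanings
    ORDER BY created_at DESC
    LIMIT 2
    """
).fetchall()
print(rows)  # [(3, 1), (2, 2)]
```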

At the bottom of the query, we order the response appropriately and grab only the first N items, where N is the :page_chunk_size parameter passed to our query.

This seems like it could work! And it should. Sort of. We'll see why we need to refactor it in a second.

For now let's hook it up to our endpoint.

Filling Out the Cleaning Feed Endpoint

Start by adding a few imports and using the FeedRepository in our route handler.

routes/feed.py
from typing import List

from fastapi import APIRouter, Depends

from app.models.feed import CleaningFeedItem
from app.db.repositories.feed import FeedRepository
from app.api.dependencies.database import get_repository
from app.api.dependencies.auth import get_current_active_user

router = APIRouter()


@router.get(
    "/cleanings/",
    response_model=List[CleaningFeedItem],
    name="feed:get-cleaning-feed-for-user",
    dependencies=[Depends(get_current_active_user)],
)
async def get_cleaning_feed_for_user(
    feed_repository: FeedRepository = Depends(get_repository(FeedRepository)),
) -> List[CleaningFeedItem]:
    return await feed_repository.fetch_cleaning_jobs_feed()

Magical. We await the response from our fetch_cleaning_jobs_feed method and return it in our endpoint.

This should be all we need to get the first test passing.

Before we get too excited, let's add another test case to highlight the work that still needs to be done here.

Test Proper Feed Order

Add another test case to our TestCleaningFeed class.

tests/test_feed.py
# ...other code


class TestCleaningFeed:
    async def test_cleaning_feed_returns_valid_response(
        self,
        *,
        app: FastAPI,
        authorized_client: AsyncClient,
        test_list_of_new_and_updated_cleanings: List[CleaningInDB],
    ) -> None:
        cleaning_ids = [cleaning.id for cleaning in test_list_of_new_and_updated_cleanings]
        res = await authorized_client.get(app.url_path_for("feed:get-cleaning-feed-for-user"))
        assert res.status_code == status.HTTP_200_OK
        cleaning_feed = res.json()
        assert isinstance(cleaning_feed, list)
        assert len(cleaning_feed) == 20
        assert set(feed_item["id"] for feed_item in cleaning_feed).issubset(set(cleaning_ids))

    async def test_cleaning_feed_response_is_ordered_correctly(
        self,
        *,
        app: FastAPI,
        authorized_client: AsyncClient,
        test_list_of_new_and_updated_cleanings: List[CleaningInDB],
    ) -> None:
        res = await authorized_client.get(app.url_path_for("feed:get-cleaning-feed-for-user"))
        assert res.status_code == status.HTTP_200_OK
        cleaning_feed = res.json()
        # the first 13 should be updated and the rest should not be updated
        for feed_item in cleaning_feed[:13]:
            assert feed_item["event_type"] == "is_update"
        for feed_item in cleaning_feed[13:]:
            assert feed_item["event_type"] == "is_create"

Ok, things are now a bit more interesting. We're expecting the first 13 items in our response to have their event_type attribute set to is_update, and the rest to be set to is_create.

How did we arrive at the number 13?

Well, we used an i % 4 == 0 condition to determine which cleaning jobs to update. Since 0 % 4 == 0, we updated the first job and then every fourth job after it. Doing that across 50 jobs gives us 13 updated jobs.
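Quick sanity check on that arithmetic:

```python
# Which of the 50 jobs does the i % 4 == 0 condition update?
updated_indices = [i for i in range(50) if i % 4 == 0]
print(updated_indices[:4], "...", updated_indices[-1])  # [0, 4, 8, 12] ... 48
print(len(updated_indices))  # 13
```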

Why should they come first in our query?

The answer to that is simple. The timestamp representing when the event took place - event_timestamp - should be more recent than the created_at timestamps present in every other cleaning feed item. We're not handling that at the moment, but we'll need to.

Run the tests again and this one should fail.

Time to go about fixing that.

First, we'll need a way of identifying any cleaning jobs that were updated. Then we'll need to use that information to modify their event_type and event_timestamp accordingly. Finally, we'll order things properly and celebrate our success.

Refactor The Repository

Head back into the FeedRepository and make a few adjustments.

db/repositories/feed.py
from typing import List

from app.db.repositories.base import BaseRepository
from app.models.feed import CleaningFeedItem

FETCH_CLEANING_JOBS_FOR_FEED_QUERY = """
    SELECT id,
           name,
           description,
           price,
           cleaning_type,
           owner,
           created_at,
           updated_at,
           CASE
               WHEN created_at = updated_at THEN 'is_create'
               ELSE 'is_update'
           END AS event_type,
           GREATEST(created_at, updated_at) AS event_timestamp,
           ROW_NUMBER() OVER ( ORDER BY GREATEST(created_at, updated_at) DESC ) AS row_number
    FROM cleanings
    ORDER BY GREATEST(created_at, updated_at) DESC
    LIMIT :page_chunk_size;
"""


class FeedRepository(BaseRepository):
    async def fetch_cleaning_jobs_feed(self, *, page_chunk_size: int = 20) -> List[CleaningFeedItem]:
        cleaning_feed_item_records = await self.db.fetch_all(
            query=FETCH_CLEANING_JOBS_FOR_FEED_QUERY,
            values={"page_chunk_size": page_chunk_size},
        )
        return [CleaningFeedItem(**item) for item in cleaning_feed_item_records]

Some good ol' fashioned SQL to the rescue.

Our CASE...WHEN statement checks to see if the created_at timestamp is equal to the updated_at timestamp. If they are, we know the cleaning job has not been updated at all, and therefore we label this as an is_create event. Otherwise, we assign event_type the value is_update.

Below that, we use the GREATEST conditional expression to assess which value, created_at or updated_at, contains the most recent timestamp. Whichever one is greater is used to determine ordering in our response.
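Here's a small self-contained demo of both expressions against in-memory SQLite. Note the one assumption: SQLite has no GREATEST function, so its two-argument MAX(a, b) scalar function stands in for it; the sample rows are invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cleanings (id INTEGER, created_at TEXT, updated_at TEXT)")
conn.executemany(
    "INSERT INTO cleanings VALUES (?, ?, ?)",
    [
        (1, "2020-12-01", "2020-12-01"),  # timestamps equal -> never updated
        (2, "2020-12-02", "2020-12-15"),  # updated_at is later -> was updated
    ],
)
rows = conn.execute(
    """
    SELECT id,
           CASE WHEN created_at = updated_at THEN 'is_create' ELSE 'is_update' END,
           MAX(created_at, updated_at)  -- stands in for Postgres GREATEST()
    FROM cleanings
    ORDER BY id
    """
).fetchall()
print(rows)  # [(1, 'is_create', '2020-12-01'), (2, 'is_update', '2020-12-15')]
```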

Run the tests again and they should pass.

However, our refactor has introduced a troubling problem. 10 imaginary points to any reader who already knows what it is. For those that haven't identified it, hold that thought. We'll dig out this issue in a bit.

Before we get to that, let's go ahead and write a new test to check that we're able to paginate properly.

Implementing Pagination Without Offset

Add a new test case to our tests/test_feed.py file:

tests/test_feed.py
# ...other code


class TestCleaningFeed:
    # ...other code

    async def test_cleaning_feed_can_paginate_correctly(
        self,
        *,
        app: FastAPI,
        authorized_client: AsyncClient,
        test_list_of_new_and_updated_cleanings: List[CleaningInDB],
    ) -> None:
        res_page_1 = await authorized_client.get(app.url_path_for("feed:get-cleaning-feed-for-user"))
        assert res_page_1.status_code == status.HTTP_200_OK
        cleaning_feed_page_1 = res_page_1.json()
        assert len(cleaning_feed_page_1) == 20
        ids_page_1 = set(feed_item["id"] for feed_item in cleaning_feed_page_1)
        new_starting_date = cleaning_feed_page_1[-1]["event_timestamp"]
        res_page_2 = await authorized_client.get(
            app.url_path_for("feed:get-cleaning-feed-for-user"),
            params={"starting_date": new_starting_date, "page_chunk_size": 20},
        )
        assert res_page_2.status_code == status.HTTP_200_OK
        cleaning_feed_page_2 = res_page_2.json()
        assert len(cleaning_feed_page_2) == 20
        ids_page_2 = set(feed_item["id"] for feed_item in cleaning_feed_page_2)
        assert ids_page_1 != ids_page_2

This test is more involved than the previous two.

We make two requests to the "feed:get-cleaning-feed-for-user" endpoint. The first request is almost identical to the one in our first test case. After we create a set of ids from the response for page 1, we grab the event_timestamp of the last item in that response and store it in the new_starting_date variable.

We then make an additional request to the same endpoint, this time passing the starting_date and page_chunk_size parameters in our request. What this will do is tack on query parameters to the url, transforming it into something like this:

"http://localhost:8000/api/feed/cleanings/?starting_date=2020-12-18%2019:33:29.115465&page_chunk_size=20"

Using query parameters in FastAPI endpoints is pretty straightforward. We just treat them the same as if they were path parameters. Even though our endpoint isn't currently set up to handle query parameters, the base url hasn't changed and so our route should operate fine whether or not we include them.
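If you're curious what the client does with that params dict, here's a stdlib sketch using urlencode (httpx does the equivalent internally; note that spaces may be encoded as + or %20 depending on the client):

```python
from urllib.parse import urlencode

params = {"starting_date": "2020-12-18 19:33:29.115465", "page_chunk_size": 20}
# The params dict becomes a URL-encoded query string appended after "?".
url = "/api/feed/cleanings/?" + urlencode(params)
print(url)
```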

Upon receiving res_page_2 for that HTTP request, we parse the response as JSON and create a set of ids from it.

At the bottom of our test, we ensure that set of ids from the first response is not equivalent to the set of ids from the second response. Meaning, we have different results depending on the query params passed to our endpoint.

Run that test and watch it fail magnificently. Both sets of ids are identical. And they should be, since we haven't done anything to change that.

First, let's modify our route to accept the appropriate query params.

In the api/routes/feed.py file, add the following:

routes/feed.py
from typing import List
import datetime

from fastapi import APIRouter, Depends

from app.models.feed import CleaningFeedItem
from app.db.repositories.feed import FeedRepository
from app.api.dependencies.database import get_repository
from app.api.dependencies.auth import get_current_active_user

router = APIRouter()


@router.get(
    "/cleanings/",
    response_model=List[CleaningFeedItem],
    name="feed:get-cleaning-feed-for-user",
    dependencies=[Depends(get_current_active_user)],
)
async def get_cleaning_feed_for_user(
    page_chunk_size: int = 20,
    starting_date: datetime.datetime = datetime.datetime.now() + datetime.timedelta(minutes=10),
    feed_repository: FeedRepository = Depends(get_repository(FeedRepository)),
) -> List[CleaningFeedItem]:
    return await feed_repository.fetch_cleaning_jobs_feed(
        starting_date=starting_date, page_chunk_size=page_chunk_size,
    )

That's actually all that's needed to make query params work in FastAPI. They're basically indistinguishable from path params. In fact, almost too indistinguishable. We'd probably benefit from making their role more explicit. Fortunately, FastAPI provides a couple easy ways to do that.

Let's try out the most straightforward approach:

routes/feed.py
from typing import List
import datetime

from fastapi import APIRouter, Depends, Query

from app.models.feed import CleaningFeedItem
from app.db.repositories.feed import FeedRepository
from app.api.dependencies.database import get_repository
from app.api.dependencies.auth import get_current_active_user

router = APIRouter()


@router.get(
    "/cleanings/",
    response_model=List[CleaningFeedItem],
    name="feed:get-cleaning-feed-for-user",
    dependencies=[Depends(get_current_active_user)],
)
async def get_cleaning_feed_for_user(
    page_chunk_size: int = Query(
        20,
        ge=1,
        le=50,
        description="Used to determine how many cleaning feed item objects to return in the response",
    ),
    starting_date: datetime.datetime = Query(
        datetime.datetime.now() + datetime.timedelta(minutes=10),
        description="Used to determine the timestamp at which to begin querying for cleaning feed items.",
    ),
    feed_repository: FeedRepository = Depends(get_repository(FeedRepository)),
) -> List[CleaningFeedItem]:
    return await feed_repository.fetch_cleaning_jobs_feed(
        starting_date=starting_date, page_chunk_size=page_chunk_size,
    )

We bring in FastAPI's Query class and use it to add some validation and metadata to each query parameter. For page_chunk_size, we give it a default value of 20, ensure that it's an integer between 1 and 50, and add a nice little description. Our starting_date query param is simpler, as we just add a default value and a description.

What's cool about this approach is that both of these descriptions will now be included in the free documentation supplied by OpenAPI.

Next, we'll go into our FeedRepository and update it to use our new query params.

db/repositories/feed.py
from typing import List
import datetime

from app.db.repositories.base import BaseRepository
from app.models.feed import CleaningFeedItem

FETCH_CLEANING_JOBS_FOR_FEED_QUERY = """
    SELECT id,
           name,
           description,
           price,
           cleaning_type,
           owner,
           created_at,
           updated_at,
           CASE
               WHEN created_at = updated_at THEN 'is_create'
               ELSE 'is_update'
           END AS event_type,
           GREATEST(created_at, updated_at) AS event_timestamp,
           ROW_NUMBER() OVER ( ORDER BY GREATEST(created_at, updated_at) DESC ) AS row_number
    FROM cleanings
    WHERE GREATEST(created_at, updated_at) < :starting_date
    ORDER BY GREATEST(created_at, updated_at) DESC
    LIMIT :page_chunk_size;
"""


class FeedRepository(BaseRepository):
    async def fetch_cleaning_jobs_feed(
        self, *, starting_date: datetime.datetime, page_chunk_size: int = 20
    ) -> List[CleaningFeedItem]:
        cleaning_feed_item_records = await self.db.fetch_all(
            query=FETCH_CLEANING_JOBS_FOR_FEED_QUERY,
            values={"starting_date": starting_date, "page_chunk_size": page_chunk_size},
        )
        return [CleaningFeedItem(**item) for item in cleaning_feed_item_records]

That's all that's needed! It might not look like a big change, but by simply including that WHERE statement, we've implemented a pagination system that doesn't rely on the OFFSET clause. Why are we going through all this trouble just to avoid using OFFSET?

Many pagination libraries use OFFSET by default, as it's the most direct approach to selecting rows N through N + chunk from a given query. However, this approach becomes less performant with each subsequent pagination request. As The Art of PostgreSQL puts it:

The offset clause is going to cause your SQL query plan to read all the results anyway and then discard most of it until reaching the offset count. When paging through additional results, it’s less and less efficient with each additional page you fetch that way.

Huh! Good to know.

We navigate around this pitfall by using the LIMIT and WHERE clauses to implement pagination. By passing our query a starting_date parameter, we're able to select only the cleaning jobs with an event_timestamp that occurs before this date. Then, we simply limit the number of results returned by our query to the proper page_chunk_size. In the next request, we just have to get the event_timestamp of the last result in our previous query, and use it to fetch the next page.
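To make the mechanics concrete, here's a minimal, self-contained sketch of that keyset-pagination loop from the client's perspective. Everything here is illustrative: fetch_page is a stand-in for our SQL query, and the in-memory feed list uses fake integer timestamps.

```python
def fetch_page(items, starting_date, chunk_size):
    """Stand-in for the SQL: items arrive newest-first; keep only those
    strictly older than starting_date, then take one page."""
    page = [item for item in items if item["event_timestamp"] < starting_date]
    return page[:chunk_size]


# 50 feed items, already ordered by event_timestamp DESC (100 down to 51).
feed = [{"id": i, "event_timestamp": 100 - i} for i in range(50)]

starting_date = float("inf")  # first request: no upper bound
pages = []
while True:
    page = fetch_page(feed, starting_date, 20)
    if not page:
        break
    pages.append(page)
    # The last item's timestamp becomes the key for the next page.
    starting_date = page[-1]["event_timestamp"]

print([len(p) for p in pages])  # [20, 20, 10]
```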

It might even be smart to implement a package that abstracts all of this away for us. One approach might be to include a pagination attribute with each response, and attach preformatted urls representing the prev and next pages as is recommended in this StackOverflow answer.

Or we could skip all that noise and just use the fastapi-pagination library.

Run the tests again and watch them pass. Look at us go! We implemented pagination from scratch.

Actually, before we get ahead of ourselves. Let's beef up that test and make sure it's still passing.

tests/test_feed.py
# ...other code
import datetime
# ...other code


class TestCleaningFeed:
    # ...other code

    async def test_cleaning_feed_can_paginate_correctly(
        self,
        *,
        app: FastAPI,
        authorized_client: AsyncClient,
        test_list_of_new_and_updated_cleanings: List[CleaningInDB],
    ) -> None:
        starting_date = datetime.datetime.now() + datetime.timedelta(minutes=10)
        combos = []
        for chunk_size in [25, 15, 10]:
            res = await authorized_client.get(
                app.url_path_for("feed:get-cleaning-feed-for-user"),
                params={"starting_date": starting_date, "page_chunk_size": chunk_size},
            )
            assert res.status_code == status.HTTP_200_OK
            page_json = res.json()
            assert len(page_json) == chunk_size
            id_and_event_combo = set(f"{item['id']}-{item['event_type']}" for item in page_json)
            combos.append(id_and_event_combo)
            starting_date = page_json[-1]["event_timestamp"]
        # Ensure that none of the items in any response exist in any other response
        length_of_all_id_combos = sum(len(combo) for combo in combos)
        assert len(set().union(*combos)) == length_of_all_id_combos

Our upgraded test case now makes three requests. We do so by iterating over different page chunk sizes and making a request to the cleaning feed endpoint with each chunk_size and the current starting_date. Then, we compose the id and event_type of each CleaningFeedItem in the response into a single string and store that set in the combos list. Before moving on to the next iteration, we grab the last item in the response and store its event_timestamp in the starting_date variable.

We check to make sure that each CleaningFeedItem is unique across all requests and call it a day. The reason we use both the id and event_type to create a unique identifier is that our feed should include an event for when a cleaning job is created, and another event for when that cleaning job is updated. But does it?

Fixing the Hidden Bug

Readers who didn't get the 10 imaginary points for identifying the key problem before can still submit their work late and get 9 imaginary points (I was a very lenient grader during my teaching career).

Without spoiling it, let's write a test case to identify our bug.

Add one last test case to our TestCleaningFeed class.

tests/test_feed.py
# ...other code
from collections import Counter
# ...other code


class TestCleaningFeed:
    # ...other code

    async def test_cleaning_feed_has_created_and_updated_items_for_modified_cleaning_jobs(
        self,
        *,
        app: FastAPI,
        authorized_client: AsyncClient,
        test_list_of_new_and_updated_cleanings: List[CleaningInDB],
    ) -> None:
        res_page_1 = await authorized_client.get(
            app.url_path_for("feed:get-cleaning-feed-for-user"),
            params={"page_chunk_size": 30},
        )
        assert res_page_1.status_code == status.HTTP_200_OK
        ids_page_1 = [feed_item["id"] for feed_item in res_page_1.json()]
        new_starting_date = res_page_1.json()[-1]["updated_at"]
        res_page_2 = await authorized_client.get(
            app.url_path_for("feed:get-cleaning-feed-for-user"),
            params={"starting_date": new_starting_date, "page_chunk_size": 33},
        )
        assert res_page_2.status_code == status.HTTP_200_OK
        ids_page_2 = [feed_item["id"] for feed_item in res_page_2.json()]
        # should have duplicate IDs for the 13 updated events - an `is_create` event and an `is_update` event
        id_counts = Counter(ids_page_1 + ids_page_2)
        assert len([id for id, cnt in id_counts.items() if cnt > 1]) == 13

Run the tests and watch them fail.

We grab a total of 63 items across two requests, ensuring that all 50 create events and the 13 update events are included. We then use a Counter to assert that 13 of the events appear twice in our results set - one for is_create and one for is_update.

The tests are failing because we're not actually doing that. Our SQL query is simply fetching all cleaning jobs and putting each one in either the update bucket or the create bucket. We'll need to make sure that separate events are emitted for a cleaning job's creation and for its most recent update.

Warning, this SQL query is about to get huge.

db/repositories/feed.py
from typing import List
import datetime

from databases import Database

from app.db.repositories.base import BaseRepository
from app.models.feed import CleaningFeedItem

FETCH_CLEANING_JOBS_FOR_FEED_QUERY = """
    SELECT id,
           name,
           description,
           price,
           cleaning_type,
           owner,
           created_at,
           updated_at,
           event_type,
           event_timestamp,
           ROW_NUMBER() OVER ( ORDER BY event_timestamp DESC ) AS row_number
    FROM (
        (
            SELECT id,
                   name,
                   description,
                   price,
                   cleaning_type,
                   owner,
                   created_at,
                   updated_at,
                   updated_at AS event_timestamp,
                   'is_update' AS event_type
            FROM cleanings
            WHERE updated_at < :starting_date AND updated_at != created_at
            ORDER BY updated_at DESC
            LIMIT :page_chunk_size
        ) UNION (
            SELECT id,
                   name,
                   description,
                   price,
                   cleaning_type,
                   owner,
                   created_at,
                   updated_at,
                   created_at AS event_timestamp,
                   'is_create' AS event_type
            FROM cleanings
            WHERE created_at < :starting_date
            ORDER BY created_at DESC
            LIMIT :page_chunk_size
        )
    ) AS cleaning_feed
    ORDER BY event_timestamp DESC
    LIMIT :page_chunk_size;
"""


class FeedRepository(BaseRepository):
    async def fetch_cleaning_jobs_feed(
        self, *, starting_date: datetime.datetime, page_chunk_size: int = 20
    ) -> List[CleaningFeedItem]:
        cleaning_feed_item_records = await self.db.fetch_all(
            query=FETCH_CLEANING_JOBS_FOR_FEED_QUERY,
            values={"starting_date": starting_date, "page_chunk_size": page_chunk_size},
        )
        return [CleaningFeedItem(**item) for item in cleaning_feed_item_records]

Wow wow wow. What just happened?

We had a nice simple query that has now ballooned into a massive one. What gives?

The query itself hasn't really gotten bigger. It's just now selecting from a subquery that is particularly large.

If we remove the subquery, we can see that the query itself is actually simpler:

SELECT id,
       name,
       description,
       price,
       cleaning_type,
       owner,
       created_at,
       updated_at,
       event_type,
       event_timestamp,
       ROW_NUMBER() OVER ( ORDER BY event_timestamp DESC ) AS row_number
FROM (
    -- BIG 'OL SUBQUERY
) AS cleaning_feed
ORDER BY event_timestamp DESC
LIMIT :page_chunk_size;

There are no more CASE...WHEN or GREATEST conditional expressions, and we've extracted the WHERE clause out of the main query. We're also simply selecting the columns of interest in accordance with what our CleaningFeedItem model needs.

So then what's going on with the subquery?

(
    SELECT id,
           name,
           description,
           price,
           cleaning_type,
           owner,
           created_at,
           updated_at,
           updated_at AS event_timestamp,
           'is_update' AS event_type
    FROM cleanings
    WHERE updated_at < :starting_date AND updated_at != created_at
    ORDER BY updated_at DESC
    LIMIT :page_chunk_size
) UNION (
    SELECT id,
           name,
           description,
           price,
           cleaning_type,
           owner,
           created_at,
           updated_at,
           created_at AS event_timestamp,
           'is_create' AS event_type
    FROM cleanings
    WHERE created_at < :starting_date
    ORDER BY created_at DESC
    LIMIT :page_chunk_size
)

As we can see here, it's actually two subqueries that we combine with the UNION set operation. What this does is effectively append the result of the second subquery to the result of the first subquery, filtering out any duplicates. We don't have to worry about duplicates here as the first subquery will always have the value is_update for its event_type and the second subquery will always have the value is_create. If we wanted to ensure that duplicates were kept, we could use the UNION ALL set operation.
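The deduplication behavior of UNION versus UNION ALL is easy to demonstrate. Here's a minimal in-memory sketch using Python's sqlite3 module - SQLite stands in for Postgres purely for illustration, and the table is a toy, but the semantics of the two operators are the same:

```python
import sqlite3

# Build a tiny in-memory table with two rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (val INTEGER)")
conn.executemany("INSERT INTO t (val) VALUES (?)", [(1,), (2,)])

# UNION removes duplicate rows from the combined result set...
union_rows = conn.execute(
    "SELECT val FROM t UNION SELECT val FROM t"
).fetchall()

# ...while UNION ALL keeps every row from both sides.
union_all_rows = conn.execute(
    "SELECT val FROM t UNION ALL SELECT val FROM t"
).fetchall()

print(len(union_rows))      # 2 - duplicates collapsed
print(len(union_all_rows))  # 4 - duplicates preserved
```

In our feed query the two sides can never produce identical rows anyway, since their event_type values always differ, so UNION and UNION ALL would return the same results.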

Each of these subqueries looks similar to our original query, and they are. There are a few key differences between the two.

  • The first subquery includes only rows where the cleaning job has been updated at some point, while the second subquery includes every cleaning job. The distinction happens in the WHERE clause, where the updated_at != created_at condition selects only updated rows.
  • For any row returned by the first subquery, we set the event_type to is_update, while the second subquery instead uses is_create.
  • We use the updated_at column as the event_timestamp for all rows returned by the first subquery, while the second subquery uses the created_at column.
  • In turn, whatever timestamp is used for event_timestamp is also used for filtering in the WHERE clause and for ordering in the ORDER BY clause.

We make sure to limit each subquery to the proper page_chunk_size and return the combination of those two queries. Our main query then selects the correct columns, orders them appropriately, and limits the result set to exactly the right page_chunk_size again.
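This filtering by starting_date is what makes pagination work: to fetch the next page, a client passes the oldest event_timestamp it has already seen as the new starting_date. The helper below is a hypothetical sketch - the function name and item shapes are our own invention, not part of the app - but it shows the cursor logic:

```python
import datetime
from typing import Dict, List


def next_starting_date(page: List[Dict]) -> datetime.datetime:
    # The feed is ordered by event_timestamp DESC, so the oldest timestamp
    # in the page becomes the starting_date cursor for the next request.
    return min(item["event_timestamp"] for item in page)


# A hypothetical page of two feed items (illustrative only).
page = [
    {"id": 1, "event_timestamp": datetime.datetime(2021, 5, 3, 12, 0)},
    {"id": 2, "event_timestamp": datetime.datetime(2021, 5, 2, 9, 30)},
]
cursor = next_starting_date(page)
print(cursor)  # 2021-05-02 09:30:00
```

Because the query uses a strict less-than comparison, items already seen are excluded from the next page - though note that items sharing an identical timestamp at the page boundary could be skipped with this simple approach.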

And just like that, we're good to go.

Run the tests again and watch them all pass. We've done it!

One last improvement before we call it quits for today. We'll go ahead and populate each CleaningFeedItem model with the profile of the user who owns it.

db/repositories/feed.py

from typing import List
import datetime

from databases import Database
from asyncpg import Record

from app.db.repositories.base import BaseRepository
from app.db.repositories.users import UsersRepository
from app.models.user import UserInDB
from app.models.feed import CleaningFeedItem


FETCH_CLEANING_JOBS_FOR_FEED_QUERY = """
    SELECT id,
           name,
           description,
           price,
           cleaning_type,
           owner,
           created_at,
           updated_at,
           event_type,
           event_timestamp,
           ROW_NUMBER() OVER ( ORDER BY event_timestamp DESC ) AS row_number
    FROM (
        (
            SELECT id,
                   name,
                   description,
                   price,
                   cleaning_type,
                   owner,
                   created_at,
                   updated_at,
                   updated_at AS event_timestamp,
                   'is_update' AS event_type
            FROM cleanings
            WHERE updated_at < :starting_date AND updated_at != created_at
            ORDER BY updated_at DESC
            LIMIT :page_chunk_size
        ) UNION (
            SELECT id,
                   name,
                   description,
                   price,
                   cleaning_type,
                   owner,
                   created_at,
                   updated_at,
                   created_at AS event_timestamp,
                   'is_create' AS event_type
            FROM cleanings
            WHERE created_at < :starting_date
            ORDER BY created_at DESC
            LIMIT :page_chunk_size
        )
    ) AS cleaning_feed
    ORDER BY event_timestamp DESC
    LIMIT :page_chunk_size;
"""


class FeedRepository(BaseRepository):
    def __init__(self, db: Database) -> None:
        super().__init__(db)
        self.users_repo = UsersRepository(db)

    async def fetch_cleaning_jobs_feed(
        self, *, starting_date: datetime.datetime, page_chunk_size: int = 20
    ) -> List[CleaningFeedItem]:
        cleaning_feed_item_records = await self.db.fetch_all(
            query=FETCH_CLEANING_JOBS_FOR_FEED_QUERY,
            values={"starting_date": starting_date, "page_chunk_size": page_chunk_size},
        )
        return [
            await self.populate_cleaning_feed_item(cleaning_feed_item=cleaning_feed_item)
            for cleaning_feed_item in cleaning_feed_item_records
        ]

    async def populate_cleaning_feed_item(self, *, cleaning_feed_item: Record) -> CleaningFeedItem:
        return CleaningFeedItem(
            **{k: v for k, v in cleaning_feed_item.items() if k != "owner"},
            owner=await self.users_repo.get_user_by_id(user_id=cleaning_feed_item["owner"]),
        )

We've brought in the UsersRepository and attached it in the FeedRepository constructor. Then, we pass each cleaning feed item to our new populate_cleaning_feed_item method and attach the user's profile to the item.
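The dict-comprehension trick inside populate_cleaning_feed_item is worth a closer look: we strip out the raw owner id so it can be replaced with a full profile. Here's a standalone sketch with a plain dict standing in for the asyncpg Record, and an assumed profile shape that is illustrative only:

```python
# Stand-in for an asyncpg Record; in the real code, cleaning_feed_item.items()
# behaves like dict.items() here.
record = {"id": 7, "name": "Deep clean", "owner": 42}

# Hypothetical profile, as might be returned by users_repo.get_user_by_id.
hypothetical_profile = {"user_id": 42, "username": "lebron"}

# Copy every key except "owner", then attach the profile under "owner".
item = {
    **{k: v for k, v in record.items() if k != "owner"},
    "owner": hypothetical_profile,
}
print(item["owner"]["username"])  # lebron
```

Filtering out the original "owner" key first matters - otherwise we'd be passing the same keyword argument twice when constructing the CleaningFeedItem.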

After putting a nice finishing touch on this feature, we run our tests one last time and watch them all pass.

While everything works as expected, this isn't a perfect design.

Because we're simply querying data that already exists in the database, if users update a cleaning job multiple times, we'll only serve a single update event. We also won't have access to which fields in a cleaning job were changed, since we're not storing that in our database. If we wanted to upgrade this system, we could create an activity table in Postgres and store metadata about each event on our site. For now, our current system will do.
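If we ever did build that activity table, one possible shape is sketched below. This is purely illustrative - it uses SQLite in place of Postgres, and the table and column names are our own invention, not part of the series - but it shows how a dedicated events table would preserve repeated updates and changed-field metadata:

```python
import sqlite3

# In-memory SQLite stands in for Postgres; a real migration would use
# SERIAL/UUID ids, TIMESTAMP columns, and a foreign key to cleanings.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE cleaning_activity (
        id INTEGER PRIMARY KEY,
        cleaning_id INTEGER NOT NULL,
        event_type TEXT NOT NULL,      -- 'is_create' or 'is_update'
        changed_fields TEXT,           -- e.g. JSON list of updated columns
        event_timestamp TEXT NOT NULL
    )
    """
)
# Each update inserts its own row, so repeated edits all survive.
conn.execute(
    "INSERT INTO cleaning_activity"
    " (cleaning_id, event_type, changed_fields, event_timestamp)"
    " VALUES (?, ?, ?, ?)",
    (1, "is_update", '["price"]', "2021-05-03T12:00:00"),
)
rows = conn.execute("SELECT event_type FROM cleaning_activity").fetchall()
print(rows)  # [('is_update',)]
```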

Wrapping Up And Resources

We now have a fully functional feed endpoint that accepts query parameters and returns a paginated response of events. Each event represents a user creating a new cleaning job or a user updating an existing one. In the next post we'll create a few additional UI components, add a feed slice to redux, and design a brand new page to host our feed content.

  • FastAPI query params docs
  • FastAPI query params validation docs
  • FastAPI query params metadata docs
  • Postgres window functions docs
  • Postgres conditional expressions docs
  • Postgres limit and offset clauses docs
  • Postgres union set operation docs
  • Markus Winand article on the pitfalls of using OFFSET
  • Fastapi-pagination repo