import requests
import polars as pl
import os
import itertools as it
import us
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor, as_completed
load_dotenv()
# states and years to pull ACS 5-year county-level estimates for
states = [state.fips for state in us.states.STATES]
years = [2020, 2021]

# list of tuples representing cartesian product of individual state and year lists
state_year_pairs = list(it.product(states, years))
def get_acs(
    state_fips: str,
    year: int,
    api_key: str = None,
    acs_vars: dict = {
        "B01001_001E": "pop_total",
        "B01002_001E": "median_age",
        "B19013_001E": "med_hh_income",
        "B23025_005E": "unemployed",
        "B25077_001E": "med_home_value",
        "B25064_001E": "med_rent",
    },
):
    url = (
        f"https://api.census.gov/data/{year}/acs/"
        f"acs5?get={','.join(acs_vars.keys())}"
        f"&for=county:*&in=state:{state_fips}"
        f"&key={api_key}"
    )
    # issue request and parse resulting json
    try:
        response = requests.get(url)
        response.raise_for_status()
        data = response.json()
        if len(data) > 1:
            acs_df = pl.DataFrame(
                data[1:],
                schema=data[0],
                orient="row",
            ).rename(acs_vars).with_columns(year=pl.lit(year))
            return acs_df
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
        return None
Background
As part of a machine learning class I’ve been building for a client, I’ve provided the option to join county-level ACS estimates onto the training and prediction data. County-level data require state-by-state calls to the Census API, and with all states and a few years included, sequential pulls were proving to be a significant bottleneck. Swapping the sequential process out in favor of a pooled one yielded dramatic time savings. This post demonstrates that change via an abstracted, simplified example.
Data Ingestion
The get_acs function pulls county-level 5-year ACS estimates from the Census API for a single state and year. We also do all the importing and environment-variable loading we’ll need for both versions of the subsequent steps (we want those to be as close to apples-to-apples as possible, since we’ll be comparing timing).
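As a quick, hypothetical sanity check, a single call might look like the sketch below (North Carolina’s FIPS code, "37", is used purely for illustration; the variable name nc_acs is mine, not from the post):

# illustrative one-off pull: 2021 county-level estimates for North Carolina
nc_acs = get_acs("37", 2021, api_key=os.getenv("CENSUS_API_KEY"))
print(nc_acs.head())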
Sequential Implementation via List Comprehension
Here we grab the data for all states and both data years via a list comprehension, then stack the results.
%%time
all_acs = [
    get_acs(state_fips, year, api_key=os.getenv("CENSUS_API_KEY"))
    for state_fips, year in state_year_pairs
]

# concatenate all results
if all_acs:
    stacked_acs = pl.concat(all_acs)
    print(f"Data stacked and {stacked_acs.height} records returned")
else:
    print("No data was fetched.")
Data stacked and 6284 records returned
CPU times: total: 46.9 ms
Wall time: 1min 12s
A Threaded Alternative
Here we leverage the concurrent.futures module from the Python standard library to issue all API calls concurrently, gathering results asynchronously as they become available.
%%time
all_acs = []
with ThreadPoolExecutor(max_workers=len(state_year_pairs)) as executor:
    futures = {
        executor.submit(
            get_acs,
            state_fips,
            year,
            api_key=os.getenv("CENSUS_API_KEY"),
        ): (state_fips, year)
        for state_fips, year in state_year_pairs
    }
    for future in as_completed(futures.keys()):
        pair = futures[future]
        try:
            result = future.result()
            if result is not None:
                all_acs.append(result)
        except Exception as e:
            print(f"ERROR: Something went wrong for state={pair[0]}, year={pair[1]}: {e}")

# concatenate all results
if all_acs:
    stacked_acs = pl.concat(all_acs)
    print(f"Data stacked and {stacked_acs.height} records returned")
else:
    print("No data was fetched.")
Data stacked and 6284 records returned
CPU times: total: 46.9 ms
Wall time: 2.6 s
In this alternative, ThreadPoolExecutor is used as a context manager to issue get_acs calls, and the resulting futures are gathered via a dictionary comprehension. Note, however, that this dictionary flips the script a bit, with the future objects (which will eventually hold the returned data frames) in the key position and the state, year tuples they correspond to in the value position. This is possible because futures are unique and hashable, and it allows us to report the state and year for which a failure occurs.
As results are returned, they are assessed and appended to the all_acs list for subsequent concatenation. As shown by the timing stats under each code chunk, we see a significant speed-up, from over a minute to just a handful of seconds. Note also that we specify a max_workers count equal to the number of times we will be hitting the API. For thread pooling, we’re not limited to our machine’s core count: we’re not running a truly parallelized process that utilizes multiple cores simultaneously, and the threads spend most of their time waiting on network I/O rather than competing for CPU.
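One worker per request is fine at this scale, but if the Census API ever throttles bursty clients, a smaller fixed cap is an easy middle ground. A minimal sketch, assuming an arbitrary, untuned cap of 20 (per-future error handling from above omitted for brevity):

# same pattern as above, but with a fixed worker cap (20 is arbitrary, not tuned)
all_acs = []
with ThreadPoolExecutor(max_workers=20) as executor:
    futures = {
        executor.submit(
            get_acs, state_fips, year, api_key=os.getenv("CENSUS_API_KEY")
        ): (state_fips, year)
        for state_fips, year in state_year_pairs
    }
    for future in as_completed(futures):
        result = future.result()  # re-raises here if the worker raised
        if result is not None:
            all_acs.append(result)

Requests beyond the cap simply queue behind the 20 workers, so the only cost is a modest increase in wall time in exchange for gentler request pacing.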
Citation
@online{couzens2025,
author = {Couzens, Lance},
title = {Thread {Pooling} a {Python} {Process} to {Build} a {Stacked}
{ACS} {DataFrame}},
date = {2025-08-05},
url = {https://mostlyunoriginal.github.io/posts/2025-08-05-Python-Thread-Pooling/},
langid = {en}
}