When I was looking for my current job, I started by replying to tech recruiters offering me the 5th weekly Java, PHP or Node.js position. I replied with what I was actually looking for in a new position, and somehow some of them suddenly had roles up their sleeves. If they had read my “very interesting profile and experience” in the first place, we both could have saved some time.
The task
Anyhow, in mid-January 2020 I had a meet-and-greet phone interview with this company: a LinkedIn recruiter saw me as a fitting match, and the company also liked my CV at first glance. After I talked to the technical director, they wanted to go further in the process by giving me a take-home assignment:
We would like you to show your approach in creating a simple data pipeline.
- Take the data from https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
- Please write a Python program that calculates the average trip length of all Yellow Taxis for a month.
- Extend this to a data pipeline that can ingest new data and calculates the 45 day rolling average trip length.
- Make sure your program could be run in a production setup.
Additionally, please document how you would scale your pipeline to a multiple of the data size that does not fit any more to one machine.
The description is rather short, which surprised me at first. There were 2 roles to fill: Senior Software Engineer and Senior Data Engineer. So I guess they also evaluate how much effort is put into docs, testing, design etc. to probe seniority.
The implementation
First of all, they did not give me a time limit on that assignment. I told them I had a newborn son at home and was on parental leave, so I could only squeeze single hours in here and there to work on it. They were very understanding and just agreed to my conditions.
For my last few months (maybe 3) at ABOUT YOU, after they disbanded my former team, I had joined the BI team to help them build an ETL pipeline to generate reports. So I had minimal experience in data engineering using pandas and Airflow.
Scaffold
So first the usual python module “boilerplate”. Put in quotes, because I never felt it was too much:
- tox.ini
- setup.py|cfg (I really like pbr)
- Makefile (personal preference and convenience)
- Pipfile (There are also requirements.txt and requirements-test.txt files, which might feel redundant, but at the time of writing the module I could not find a smooth way to make tox work with pipenv. Both these files were generated, so the Pipfile always held the truth.)
Data mangling
I remembered from my time in the business intelligence team that before doing anything with data you need to inspect it first. What’s the schema? How is the data quality? What information do we actually have, and in what formats? So I downloaded one dataset, spun up a Jupyter notebook and just checked what I was dealing with. (Here is a small sample)
What I also remembered from working with pandas dataframes is that small methods that take a dataframe and return a dataframe become pretty handy, since these operations are easily chainable with dataframe.pipe().
There was no direct data about the trip duration, so let’s calculate it from tpep_pickup_datetime and tpep_dropoff_datetime:
import pandas as pd


def load_csv(fname: str) -> pd.DataFrame:
    # Only the two timestamp columns are needed; parse them as datetimes
    return pd.read_csv(
        fname,
        usecols=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
        parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    )


def rename_columns(df: pd.DataFrame) -> pd.DataFrame:
    return df.rename(
        columns={"tpep_pickup_datetime": "pickup", "tpep_dropoff_datetime": "dropoff"}
    )


def calculate_durations(df: pd.DataFrame) -> pd.DataFrame:
    """Durations in seconds"""
    df = df.copy()
    # total_seconds() keeps the day component, which .dt.seconds would drop
    df["duration"] = (df["dropoff"] - df["pickup"]).dt.total_seconds()
    return df
Chaining these functions results in a dataframe with 3 columns (pickup, dropoff, duration) and a lot of datapoints.
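To make the chaining concrete, here is a minimal sketch that runs two of the helpers through pipe() on a tiny dataframe. The two rows are made up for illustration and stand in for a downloaded CSV. The duration uses .dt.total_seconds(), so that a trip spanning more than a day would not lose its day component.

```python
import pandas as pd


def rename_columns(df: pd.DataFrame) -> pd.DataFrame:
    return df.rename(
        columns={"tpep_pickup_datetime": "pickup", "tpep_dropoff_datetime": "dropoff"}
    )


def calculate_durations(df: pd.DataFrame) -> pd.DataFrame:
    """Durations in seconds"""
    df = df.copy()
    df["duration"] = (df["dropoff"] - df["pickup"]).dt.total_seconds()
    return df


# Made-up rows (not real TLC data) standing in for a loaded CSV.
raw = pd.DataFrame(
    {
        "tpep_pickup_datetime": pd.to_datetime(
            ["2019-12-01 08:00:00", "2019-12-01 23:50:00"]
        ),
        "tpep_dropoff_datetime": pd.to_datetime(
            ["2019-12-01 08:10:30", "2019-12-02 00:05:00"]
        ),
    }
)

# Each step takes a dataframe and returns a new one, so they chain cleanly.
trips = raw.pipe(rename_columns).pipe(calculate_durations)
print(trips)
```

Because every helper returns a fresh dataframe, each one can be unit-tested in isolation with a few hand-written rows like these.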
Data processing
To calculate average durations, the dataset needs to be grouped by day on either pickup or dropoff. Luckily pandas offers dataframe.resample(), which is a “convenience method for frequency conversion and resampling of time series”. The dataframe just needs a datetime-like index to do so:
def reindex_on_pickup(df: pd.DataFrame) -> pd.DataFrame:
    return df.set_index("pickup")


def daily_average_durations(df: pd.DataFrame) -> pd.DataFrame:
    return df.resample("D").mean()


def monthly_average_durations(df: pd.DataFrame) -> pd.DataFrame:
    return df.resample("M").mean()
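As a small illustration of what the daily resampling does, here is a sketch on a handful of made-up trips (the values are invented, not taken from the dataset). Note that a day without any trips still shows up in the result, with NaN as its average.

```python
import pandas as pd

# Made-up trips: two on Dec 1, one on Dec 2, none on Dec 3, one on Dec 4.
trips = pd.DataFrame(
    {
        "pickup": pd.to_datetime(
            [
                "2019-12-01 08:00:00",
                "2019-12-01 18:00:00",
                "2019-12-02 09:00:00",
                "2019-12-04 10:00:00",
            ]
        ),
        "duration": [600.0, 1200.0, 300.0, 450.0],
    }
)

# resample() needs a datetime-like index; "D" buckets the rows per calendar day.
daily = trips.set_index("pickup").resample("D").mean()
print(daily)
```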
Task part 1: Python program to calculate average trip duration for a given month
At least this is how I interpreted “Please write a Python program that calculates the average trip length of all Yellow Taxis for a month.".
I had noticed that the dataset for a specific month-year also contained data points which were actually not from that month-year, so I added a method that specifically filters the downloaded dataset for the requested time span. After that I had everything in place to finish part one of the assignment. Because I really like click for command line interfaces written in Python, I went with it. I basically only had to assemble the pieces, and this is the result:
import click
import requests

# p (the pandas helpers above), get_url and download live elsewhere in the module


@click.command()
@click.argument("year-month", type=click.DateTime(formats=["%Y-%m"]), required=True)
@click.pass_context
def average_trip_duration(ctx, year_month):
    """Calculate the average NY yellow cab trip duration in YYYY-MM"""
    url = get_url(year_month)
    try:
        fname = download(url)
    except requests.exceptions.RequestException as e:
        click.echo(f"{e}")
        ctx.abort()
    df = (
        p.load_csv(fname)
        .pipe(p.rename_columns)
        .pipe(p.filter_by_month_year, dt=year_month)
        .pipe(p.calculate_durations)
        .pipe(p.reindex_on_pickup)
        .pipe(p.monthly_average_durations)
    )
    # After filtering, a single monthly row remains; select it by position
    duration = int(round(df["duration"].iloc[0]))
    click.echo(
        f"The average trip duration in "
        f"{year_month.month:02d}/{year_month.year} was {duration} seconds."
    )
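The filter step used in the command is not shown in this post. A minimal sketch of what such a function could look like, assuming it filters on the pickup column and receives the parsed date as dt (both are assumptions, the real implementation in the repository may differ):

```python
from datetime import datetime

import pandas as pd


def filter_by_month_year(df: pd.DataFrame, dt: datetime) -> pd.DataFrame:
    """Keep only trips whose pickup falls in the month and year of `dt`.

    Hypothetical sketch: filtering on "pickup" is an assumption.
    """
    in_month = (df["pickup"].dt.year == dt.year) & (df["pickup"].dt.month == dt.month)
    return df[in_month]


# Made-up pickups: one stray row before and one after the requested month.
trips = pd.DataFrame(
    {
        "pickup": pd.to_datetime(
            ["2019-11-30 23:59:00", "2019-12-15 12:00:00", "2020-01-01 00:00:00"]
        )
    }
)

december = trips.pipe(filter_by_month_year, dt=datetime(2019, 12, 1))
print(december)
```

Extra keyword arguments given to pipe() are passed through to the function, which is why the dt=year_month call in the command works.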
Task part 2: Extend this to a data pipeline that calculates the 45 day rolling average trip length
From my short episode as a data engineer I already knew Airflow, but for this assignment it felt a bit heavy. So I looked for a more lightweight alternative and found luigi.
Their principles are very similar: small tasks are defined which can have other tasks as dependencies. Much like targets in a Makefile, a task only starts after all required tasks have succeeded. Because cyclic dependencies are not allowed (and don’t make sense), the dependency graph results in a so-called DAG (directed acyclic graph). An even more lightweight alternative would have been a simple Makefile orchestrating pandas scripts. But the task required a kind of production-ready thing, and luigi offers some handy things like a scheduler, workers, smooth task transitions and a visual dashboard to monitor progress, to name just a few.
In my solution I tried to keep all tasks pretty atomic. They are either fetching data, transforming data or persisting data in the database. This way, on failure and retry, the whole pipeline can be resumed at the last successful step. For simplicity, temp data resulting from intermediate tasks is written to local files (csv or pickled dataframes), and only the final results asked for by the task are written to a database (sqlite in this case). In a distributed system with workers on different machines/containers, the local file system wouldn’t make sense for the intermediate data. It would be written to more central storage like temp tables in a database or a shared mount, so any worker can pick up where the former one finished.
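The resume-at-the-last-successful-step idea can be sketched with the standard library alone. To be clear, this is not luigi's API: the Task class, the build() function and the file names are all made up for illustration. It only mimics the principle that a task counts as done once its output exists and that dependencies run first.

```python
import tempfile
from pathlib import Path


class Task:
    """A unit of work that is complete once its output file exists."""

    name = "task"
    requires = ()  # Task classes this task depends on

    def __init__(self, workdir: Path):
        self.workdir = workdir

    @property
    def output(self) -> Path:
        return self.workdir / f"{self.name}.txt"

    def complete(self) -> bool:
        return self.output.exists()

    def run(self) -> None:
        raise NotImplementedError


class Fetch(Task):
    name = "fetch"

    def run(self) -> None:
        self.output.write_text("raw data")


class Transform(Task):
    name = "transform"
    requires = (Fetch,)

    def run(self) -> None:
        raw = Fetch(self.workdir).output.read_text()
        self.output.write_text(raw.upper())


def build(task: Task, executed: list) -> None:
    """Run dependencies first and skip every task whose output already exists."""
    if task.complete():
        return
    for dependency in task.requires:
        build(dependency(task.workdir), executed)
    task.run()
    executed.append(task.name)


workdir = Path(tempfile.mkdtemp())
first_run, second_run, resumed = [], [], []
build(Transform(workdir), first_run)   # runs fetch, then transform
build(Transform(workdir), second_run)  # nothing left to do
Transform(workdir).output.unlink()     # simulate a lost final step
build(Transform(workdir), resumed)     # only transform runs again
print(first_run, second_run, resumed)
```

Swapping the local files for a shared mount or database tables would keep the same logic while letting any worker pick up the next step.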
The whole luigi part is more or less small task-wrappers around the already existing functions: yellowcabs/luigi.py
It stores the daily averages for every processed set of data points over a one-month time span in a database. This “unlocks” the calculation of a 45 day rolling average once it has been run for at least 2 consecutive months.
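Once the daily averages are stored, a 45 day rolling average is only a few lines of pandas. This is a minimal sketch with made-up daily values and not necessarily how the repository computes it. It also assumes that averaging the daily averages is acceptable, which weights every day equally rather than every trip. A trip-weighted version would additionally need the daily trip counts.

```python
import pandas as pd

# Made-up daily averages for two consecutive months (31 + 30 = 61 days).
days = pd.date_range("2019-12-01", "2020-01-30", freq="D")
daily = pd.DataFrame({"duration": [600.0] * 31 + [900.0] * 30}, index=days)

# A time-based window copes with missing days; min_periods=45 leaves the
# result empty (NaN) until a full 45 days of data are available.
rolling = daily["duration"].rolling("45D", min_periods=45).mean()
print(rolling.dropna())
```

With a single month of data the window never fills up, which is why at least 2 consecutive months are needed before any value appears.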
The final result
If you read this far, you might be interested in the whole thing I handed in as a solution for the take-home assignment. You can find it at github.com/dermorz/yellocabs.
Shortly after handing it in, I was invited for a final interview, which went pretty well. They criticized 2 things about my solution:
- A missing end-to-end integration test.
- Very few in-line comments and doc strings.
On the first one I can kind of agree. It wouldn’t have been very much more effort and would have made sure all the unit-tested functions align and give the desired result. On the second one I respectfully disagreed, because I am not a friend of redundant comments and doc strings if variable- and method-names speak for themselves. I have used a few doc strings where I thought they were necessary but maybe other engineers would have disagreed.
After that final interview they made me a pretty good offer, but I turned it down in favor of my current job. It was a close call between these two offers, but there were a few good personal and professional reasons to justify my choice. Now, 2.5 months in, it still feels like the right decision.