# Taxi trip duration forecasting

First things first, you need a running Beaver instance. For example, you could download this repository and run `docker compose up`.

Next, we need some data to work with. In this example, we're looking at taxi trips in New York City. Running `simulate.py` will insert data into two topics: the first contains the taxi departures, and the second contains the taxi arrivals. Both topics live in the default Redpanda broker provided by Beaver.
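
To make the two topics concrete, here is a minimal sketch of what one departure message and its matching arrival message might look like. The field names are the ones read by the SQL queries later in this notebook. The `make_messages` helper, the sample values, and the exact payload layout are illustrative assumptions, not a copy of what `simulate.py` produces.

```python
import json


def make_messages(trip_no, pickup, dropoff, pickup_datetime, duration):
    """Build (key, value) byte pairs for one trip: a departure and an arrival.

    Hypothetical helper for illustration only.
    """
    departure = {
        "pickup_latitude": pickup[0],
        "pickup_longitude": pickup[1],
        "dropoff_latitude": dropoff[0],
        "dropoff_longitude": dropoff[1],
        "pickup_datetime": pickup_datetime,
    }
    arrival = {"duration": duration}  # trip duration in seconds
    key = str(trip_no).encode("utf-8")  # both messages share the trip number as key
    return (
        (key, json.dumps(departure).encode("utf-8")),
        (key, json.dumps(arrival).encode("utf-8")),
    )


# Made-up trip, not taken from the real dataset
departure_msg, arrival_msg = make_messages(
    trip_no=0,
    pickup=(40.7679, -73.9822),
    dropoff=(40.7656, -73.9646),
    pickup_datetime="2016-01-03 17:24:55",
    duration=455,
)
```

The shared key is what lets a departure be matched with its arrival once both have been received.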

## Creating a project

The notebook proper starts here. The first thing to do is connect to the Beaver instance.

In [1]:
import beaver_sdk

sdk = beaver_sdk.Instance(host='http://127.0.0.1:8000')

You can connect your own infrastructure to Beaver. In that sense, Beaver is an orchestrator of sorts. Beaver also ships with out-of-the-box infrastructure. In any case, we have to tell Beaver what infrastructure to use.

In [37]:
mb = sdk.message_bus.create(name='rp-mb', protocol='REDPANDA', url='redpanda:29092')
sp = sdk.stream_processor.create(name='mz-sp', protocol='MATERIALIZE', url='postgres://materialize@materialize:6875/materialize')
jr = sdk.job_runner.create(name='rq-jr', protocol='RQ', url='redis://redis:6379/0')

Now we can create a project. We specify the name of each infrastructure piece.

In [38]:
project = sdk.project.create(
    name='taxi_trips',
    task='REGRESSION',
    message_bus_name='rp-mb',
    stream_processor_name='mz-sp',
    job_runner_name='rq-jr',
)

Note that the infrastructure we defined could be reused in different projects, so you only have to set it up once.

## Connecting to some raw data

We're using Redpanda to store the raw data. We wish to use Materialize to process this data, and turn it into features and targets to feed into a machine learning model.

There's a little bit of prior work necessary. Materialize has to be connected to Redpanda by creating a source for each topic, plus a view that decodes the raw message bytes into JSON. As of now, this isn't something Beaver does automatically.
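
For intuition, the `CAST(CONVERT_FROM(data, 'utf8') AS JSONB)` expression used in the views that follow does roughly what this Python snippet does: interpret the raw bytes as UTF-8 text, then parse that text as JSON. The `decode_value` helper and the sample payload are made up for illustration.

```python
import json


def decode_value(data: bytes) -> dict:
    """Decode raw message bytes as UTF-8, then parse the text as JSON."""
    return json.loads(data.decode("utf-8"))


raw = b'{"duration": 455}'  # illustrative payload, not real data
arrival = decode_value(raw)
```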

In [39]:
# Drop existing objects first (dependents before what they read from) so this cell can be re-run
project.stream_processor.execute('DROP VIEW IF EXISTS basic_features')
project.stream_processor.execute('DROP VIEW IF EXISTS taxi_departures')
project.stream_processor.execute('DROP SOURCE IF EXISTS taxi_departures_src')

project.stream_processor.execute("""
CREATE MATERIALIZED SOURCE taxi_departures_src
FROM KAFKA BROKER 'redpanda:29092' TOPIC 'taxi-departures'
    KEY FORMAT TEXT
    VALUE FORMAT BYTES
    INCLUDE KEY AS trip_no, TIMESTAMP AS received_at;
""")

project.stream_processor.execute("""
CREATE VIEW taxi_departures AS (
    SELECT
        trip_no,
        received_at,
        CAST(CONVERT_FROM(data, 'utf8') AS JSONB) AS trip
    FROM taxi_departures_src
)
""")

Let's do the same for taxi arrivals.

In [40]:
# Drop existing objects first (dependents before what they read from) so this cell can be re-run
project.stream_processor.execute('DROP VIEW IF EXISTS taxi_trips_target')
project.stream_processor.execute('DROP VIEW IF EXISTS taxi_arrivals')
project.stream_processor.execute('DROP SOURCE IF EXISTS taxi_arrivals_src')

project.stream_processor.execute("""
CREATE MATERIALIZED SOURCE taxi_arrivals_src
FROM KAFKA BROKER 'redpanda:29092' TOPIC 'taxi-arrivals'
    KEY FORMAT TEXT
    VALUE FORMAT BYTES
    INCLUDE KEY AS trip_no, TIMESTAMP AS received_at;
""")

project.stream_processor.execute("""
CREATE VIEW taxi_arrivals AS (
    SELECT
        trip_no,
        received_at,
        CAST(CONVERT_FROM(data, 'utf8') AS JSONB) AS arrival
    FROM taxi_arrivals_src
)
""")

## Streaming targets

To train a model, we need a target. Beaver also encourages you to define this target with SQL. For this example, we'll predict the duration in seconds of each trip, which is a regression task.

In [41]:
query = """
SELECT
    trip_no,
    received_at,
    CAST(arrival ->> 'duration' AS INTEGER) AS duration
FROM taxi_arrivals
"""

project.target.set(
    query=query,
    key_field='trip_no',
    ts_field='received_at',
    value_field='duration'
)

## Streaming features

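We also need some features. Let's start simple with two distance features between the pick-up and drop-off locations (Manhattan and Euclidean, both computed on raw latitude and longitude differences), plus some basic temporal features: the pickup hour and one flag per day of the week.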

In [None]:
query = """
SELECT 
    trip_no,
    received_at,
    jsonb_build_object(
        'manhattan_distance', ABS(dropoff_lat - pickup_lat) + ABS(dropoff_lon - pickup_lon),
        'euclidean_distance', SQRT(POWER(dropoff_lat - pickup_lat, 2) + POWER(dropoff_lon - pickup_lon, 2)),
        'pickup_hour', EXTRACT(HOUR FROM pickup_datetime),
        'is_monday', EXTRACT(DOW FROM pickup_datetime) = 1,
        'is_tuesday', EXTRACT(DOW FROM pickup_datetime) = 2,
        'is_wednesday', EXTRACT(DOW FROM pickup_datetime) = 3,
        'is_thursday', EXTRACT(DOW FROM pickup_datetime) = 4,
        'is_friday', EXTRACT(DOW FROM pickup_datetime) = 5,
        'is_saturday', EXTRACT(DOW FROM pickup_datetime) = 6,
        'is_sunday', EXTRACT(DOW FROM pickup_datetime) = 0
    ) AS features
FROM (
    SELECT
        trip_no,
        received_at,
        CAST(trip ->> 'dropoff_latitude' AS FLOAT) AS dropoff_lat,
        CAST(trip ->> 'pickup_latitude' AS FLOAT) AS pickup_lat,
        CAST(trip ->> 'dropoff_longitude' AS FLOAT) AS dropoff_lon,
        CAST(trip ->> 'pickup_longitude' AS FLOAT) AS pickup_lon,
        CAST(trip ->> 'pickup_datetime' AS TIMESTAMP) AS pickup_datetime
    FROM taxi_departures
) AS departures
"""

project.feature_set.create(
    name='basic_features',
    query=query,
    key_field='trip_no',
    ts_field='received_at',
    value_field='features'
)
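
To sanity-check the SQL, it can help to compute the same features for a single trip in plain Python. The sketch below mirrors the query under two stated assumptions: `DOW` follows the PostgreSQL convention (0 is Sunday, 6 is Saturday), and the trip is a dictionary with the same keys as the JSON payload. The `basic_features` function and the sample trip are hypothetical. Both distances are expressed in degrees, so they are only a rough proxy for distance on the ground.

```python
import math
from datetime import datetime

# Index in this list matches the SQL DOW value: 0 = Sunday ... 6 = Saturday
DAYS = ["sunday", "monday", "tuesday", "wednesday", "thursday", "friday", "saturday"]


def basic_features(trip: dict) -> dict:
    """Compute the distance and temporal features for one trip."""
    d_lat = float(trip["dropoff_latitude"]) - float(trip["pickup_latitude"])
    d_lon = float(trip["dropoff_longitude"]) - float(trip["pickup_longitude"])
    pickup = datetime.fromisoformat(trip["pickup_datetime"])
    dow = pickup.isoweekday() % 7  # isoweekday() gives Monday=1 ... Sunday=7

    features = {
        "manhattan_distance": abs(d_lat) + abs(d_lon),
        "euclidean_distance": math.hypot(d_lat, d_lon),
        "pickup_hour": pickup.hour,
    }
    for i, day in enumerate(DAYS):
        features[f"is_{day}"] = dow == i
    return features


# Made-up trip; 2016-01-03 was a Sunday
features = basic_features({
    "pickup_latitude": 40.0,
    "pickup_longitude": -74.0,
    "dropoff_latitude": 40.3,
    "dropoff_longitude": -73.6,
    "pickup_datetime": "2016-01-03 17:24:55",
})
```

Exactly one of the seven day flags is true for any trip, which is a quick property to check against the streamed features.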

## Create a first experiment

An experiment is essentially a model and a feature set. We'll start with a plain and simple linear regression, applied to the basic features defined previously.

In [50]:
from river import linear_model, preprocessing

model = preprocessing.StandardScaler() | linear_model.LinearRegression()

# Delete any previous experiment with the same name so this cell can be re-run
project.experiment('linear_regression').delete()
project.experiment.create(
    name='linear_regression',
    feature_set_name='basic_features',
    model=model
)

<beaver_sdk.Experiment at 0x16bec5660>

In [53]:
project.state(with_experiments=True)

{'name': 'taxi_trips',
 'task': 'REGRESSION',
 'stream_processor_name': 'mz-sp',
 'created_at': '2023-05-15T16:23:49.872418',
 'message_bus_name': 'rp-mb',
 'job_runner_name': 'rq-jr',
 'experiments': {'linear_regression': {'n_learnings': 5,
   'n_predictions': 26,
   'mse': 244220.94789379952,
   'mae': 446.1701976434857}}}
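
The `mse` and `mae` values are the mean squared error and mean absolute error of the experiment's predictions. Since the target is a duration in seconds, an `mae` of about 446 means predictions are off by roughly seven minutes on average. How Beaver maintains these numbers internally isn't shown in this notebook. The sketch below is one common way to keep such metrics up to date as labelled trips arrive, using a hypothetical `RunningRegressionMetrics` class and made-up values.

```python
class RunningRegressionMetrics:
    """Incrementally updated MAE and MSE (hypothetical helper)."""

    def __init__(self):
        self.n = 0
        self.abs_sum = 0.0
        self.sq_sum = 0.0

    def update(self, y_true, y_pred):
        """Fold one (ground truth, prediction) pair into the running sums."""
        error = y_true - y_pred
        self.n += 1
        self.abs_sum += abs(error)
        self.sq_sum += error * error

    @property
    def mae(self):
        return self.abs_sum / self.n if self.n else 0.0

    @property
    def mse(self):
        return self.sq_sum / self.n if self.n else 0.0


metrics = RunningRegressionMetrics()
# Made-up (actual, predicted) durations in seconds
for y_true, y_pred in [(455, 400), (663, 700), (2124, 2000)]:
    metrics.update(y_true, y_pred)
```

Only running sums are stored, so memory use stays constant no matter how many trips have been scored.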

## Delete everything

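Order matters because of dependencies: each resource has to be deleted before the resources it relies on, so the experiment goes first and the infrastructure goes last.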

In [36]:
project = sdk.project('taxi_trips')
project.experiment('linear_regression').delete()
project.feature_set('basic_features').delete()
project.target.delete()
project.delete()
sdk.message_bus('rp-mb').delete()
sdk.stream_processor('mz-sp').delete()
sdk.job_runner('rq-jr').delete()

<Response [204]>
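
The deletion order used above can be derived mechanically from a dependency graph. The graph in this sketch is an assumption inferred from the order of the calls, not something Beaver exposes. It relies on `graphlib` from the standard library (Python 3.9+).

```python
from graphlib import TopologicalSorter

# Assumed dependency graph: each resource maps to the resources it depends on
depends_on = {
    "experiment": {"feature_set", "target"},
    "feature_set": {"project"},
    "target": {"project"},
    "project": {"message_bus", "stream_processor", "job_runner"},
}

# static_order() lists dependencies before their dependents (a valid creation order)
creation_order = list(TopologicalSorter(depends_on).static_order())

# Deleting in the reverse order removes dependents before what they depend on
deletion_order = creation_order[::-1]
```

Several valid orders exist, for instance the three infrastructure pieces can be deleted in any order once the project is gone.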

In [31]:
import psycopg2

# Connect directly to Materialize to drop the views left behind
dsn = "user=materialize password=materialize host=localhost port=6875 dbname=materialize sslmode=disable"
conn = psycopg2.connect(dsn)
conn.autocommit = True

# IF EXISTS makes each statement safe to run even if the view is already gone
with conn.cursor() as cur:
    cur.execute("DROP VIEW IF EXISTS linear_regression_dataset")
    cur.execute("DROP VIEW IF EXISTS taxi_trips_target")
    cur.execute("DROP VIEW IF EXISTS taxi_trips_predictions")
    cur.execute("DROP VIEW IF EXISTS basic_features")
    cur.execute("DROP VIEW IF EXISTS taxi_arrivals")
    cur.execute("DROP VIEW IF EXISTS taxi_departures")

conn.close()