2 changes: 1 addition & 1 deletion Airflow@1.10/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "viewflow"
version = "0.1.0"
version = "0.2.0"
description = "Viewflow is an Airflow-based framework that allows data scientists to create data models without writing Airflow code."
authors = ["Vincent Vankrunkelsven <vincent@datacamp.com>", "Ramnath Vaidyanathan <ramnath@datacamp.com>", "Gaëtan Podevijn <gaetan@datacamp.com>"]
readme = "README.md"
32 changes: 23 additions & 9 deletions README.md
@@ -12,7 +12,7 @@ Do you want more context on why we built and released Viewflow? Check out our an

## Viewflow demo

-We created a demo that shows how Viewflow works. The demo creates multiple DAGs: `viewflow-demo-1` through `viewflow-demo-4`. These DAGs create a total of four views in a local Postgres database. Check out the view files in [demo/dags/](./demo/dags/). Some of the following commands are different based on which Airflow version you're using. For new users, Airflow 2 is the best option. However, you can also run the demo using the older Airflow 1.10 version by using the indicated commands.
+We created a demo that shows how Viewflow works. The demo creates multiple DAGs: `viewflow-demo-1` through `viewflow-demo-3`. These DAGs create a total of four views in a local Postgres database. Check out the view files in [demo/dags/](./demo/dags/). Some of the commands below differ depending on which Airflow version you're using. New users are best served by Airflow 2, but the demo also runs on the older Airflow 1.10 via the indicated commands.

### Run the demo
We use `docker-compose` to instantiate an Apache Airflow instance and a Postgres database. The Airflow container and the Postgres container are defined in the `docker-compose-airflow<version>.yml` files. The first time you run the demo, you have to build the Apache Airflow Docker image that embeds Viewflow:
@@ -28,13 +28,12 @@ docker-compose -f docker-compose-airflow2.yml up # Airflow 2
docker-compose -f docker-compose-airflow1.10.yml up # Airflow 1.10
```

-Go to your local Apache Airflow instance on [http://localhost:8080](http://localhost:8080). There are four DAGs called `viewflow-demo-1` through `viewflow-demo-4`. Notice how Viewflow automatically generated these DAGs based on the example queries in the subfolders of [demo/dags/](./demo/dags/)!
+Go to your local Apache Airflow instance on [http://localhost:8080](http://localhost:8080). There are three DAGs called `viewflow-demo-1` through `viewflow-demo-3`. Notice how Viewflow automatically generated these DAGs based on the example queries in the subfolders of [demo/dags/](./demo/dags/)!

<img src="./img/viewflow-demo-1.png" width="800">

<img src="./img/viewflow-demo-2.png" width="800">
By default, the DAGs are disabled. You will first have to turn them on. This will trigger the DAGs.

By default, the DAGs are disabled. Turn the DAGs on by clicking on the button `Off`. This will trigger the DAGs.
<img src="./img/airflow_web_homepage.png" width="800">
<img src="./img/viewflow-demo-1.png" width="800">

### Query the views

@@ -193,7 +192,7 @@ Viewflow expects some metadata that must be included in the SQL and Python files
* **schema**: The name of the schema in which Viewflow creates the view. It's also used by Viewflow to create the dependencies.
* **connection_id**: The Airflow connection name used to connect to the database (see the section [*Create an Airflow connection to your destination*](https://github.com/datacamp/viewflow#create-an-airflow-connection-to-your-destination)).

-The newly created view has the same name as the filename of the SQL query, Python script or R(md) script.
+The newly created view has the same name as the filename (more precisely, the file stem, without extension) of the SQL query, Python script, or R(md) script. Viewflow materializes the view in the database under this name, so it must be unique across all DAGs!
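
For illustration, here is a minimal SQL view file in the style of the demo (the query and field names are illustrative; the metadata follows the demo's [emails_blog.sql](./demo/dags/viewflow-demo-2/emails_blog.sql)). Saved as `user_emails.sql`, it would materialize the view `viewflow_demo.user_emails`:

```sql
/*
---
owner: data@datacamp.com
description: Email address of every user.
fields:
    user_id: The user ID
    email: Email address of the user
schema: viewflow_demo
connection_id: postgres_demo
---
*/

SELECT u.id AS user_id, u.email
FROM viewflow_raw.users u
```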

### SQL views

@@ -244,14 +243,14 @@ Please note that Viewflow expects the Python function that creates the view to h

### R views

-Viewflow handles R scripts similar to the existing SQL and Python files. Additionally, there's an element of automatisation. You simply define the view in R code, Viewflow will automatically read the necessary tables and write the new view to the database. Note that you need to define the new view in the R script with the same name as the R script (which is also the name of the table where the view is materialized in the database).
+Viewflow handles R scripts similarly to the existing SQL and Python files, with an added element of automation: you simply define the view in R code, and Viewflow automatically reads the necessary tables and writes the new view to the database. Note that the view you define in the R script must have the same name as the R script (which is also the name of the table of the materialized view in the database).

By default, other tables are expected to be referenced as `<schema_name>.<table_name>`.
This default behaviour can be changed by adding a new function in [dependencies_r_patterns.py](./viewflow/parsers/dependencies_r_patterns.py) and adding a line `dependency_function: <your_custom_function>` to the metadata of the R script. The script [user_xp_duplicate.R](./demo/dags/viewflow-demo-3/user_xp_duplicate.R) illustrates this.

### Rmd views

-Rmd scripts can be used mostly like R scripts. For Rmd scripts, you do have to explicitly configure the automated reading and writing of tables by adding `automate_read_write: True` to the metadata. By default, the script is executed as is. The task [top_3_user_xp_duplicate.Rmd](./demo/dags/viewflow-demo-4/top_3_user_xp_duplicate.Rmd) contains an explanation of the usage of Rmd scripts.
+Rmd scripts can be used mostly like R scripts. For Rmd scripts, however, you have to explicitly enable the automated reading and writing of tables by adding `automate_read_write: True` to the metadata; by default, the script is executed as is. The task [top_3_user_xp_duplicate.Rmd](./demo/dags/viewflow-demo-3/top_3_user_xp_duplicate.Rmd) contains an explanation of the usage of Rmd scripts.


## Configuring callbacks
@@ -276,6 +275,21 @@ on_retry_callback: on_retry_callback_custom

Of course, options 1, 2 and 3 can be combined to configure the callbacks of many tasks efficiently.

## Incremental updates

SQL views offer an extra feature for advanced users: incremental updates. In some cases, the materialized view can be updated very efficiently instead of being recreated from scratch. We will illustrate the advantages and disadvantages of incremental updates with an example: [emails_blog.sql](./demo/dags/viewflow-demo-2/emails_blog.sql).

In the query, the `users` table is joined with the `notifications` table. Keep in mind that this query runs on a regular basis, e.g. every day. The key to understanding the incremental update is the filter in the query: the `notifications.updated_at` field must be at least as large as the maximum `updated_at` value in the "old" materialized view. This filter effectively selects only the rows that correspond to recently created or changed rows in the `notifications` table. Viewflow then makes sure the selected rows are updated or inserted in the materialized view. Under the hood, this follows the approach described in [the Redshift documentation on merging by replacing existing rows](https://docs.aws.amazon.com/redshift/latest/dg/merge-replacing-existing-rows.html); see the sketch below. For this to work, you have to specify the fields of the primary key of the materialized view in the metadata. In summary, there are three additional mandatory fields in the metadata: `type`, `primary_key` and `time_parameters`.
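
Roughly, the linked recipe stages the rows selected by the incremental query, deletes the rows of the materialized view whose primary key appears in the staging table, and re-inserts all staged rows. Here is a sketch in plain SQL for the example view (a paraphrase of the Redshift recipe, not necessarily Viewflow's literal implementation; Viewflow also maintains extra columns such as `__view_generated_at`):

```sql
BEGIN;

-- Stage the result of the incremental query (only recently changed rows).
CREATE TEMP TABLE emails_blog_staging AS
SELECT user_id, notification_mode, email, updated_at
FROM viewflow_raw.notifications n
INNER JOIN viewflow_raw.users u ON n.user_id = u.id
WHERE category = 'blog'
  AND updated_at >= (SELECT max(updated_at) FROM viewflow_demo.emails_blog);

-- Replace: drop the target rows whose primary key (user_id) was re-selected...
DELETE FROM viewflow_demo.emails_blog e
USING emails_blog_staging s
WHERE e.user_id = s.user_id;

-- ...then insert every staged row, both updated and brand-new ones.
INSERT INTO viewflow_demo.emails_blog (user_id, notification_mode, email, updated_at)
SELECT user_id, notification_mode, email, updated_at FROM emails_blog_staging;

DROP TABLE emails_blog_staging;
COMMIT;
```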

The main advantage is now clear: the incremental update is incredibly efficient, especially if you run the query frequently over a long period. The example also exposes a disadvantage: you have to be careful about stale data. Because the example query only returns results corresponding to recently changed rows of the `notifications` table, changes to the `users.email` field can go unnoticed. If a user's email changes while the `notifications` table stays the same, the materialized view will still contain the old email address after the incremental update! This issue could easily be solved by adding an `updated_at` field to the `users` table and also selecting recently changed rows from that table, as in the following query:

```sql
SELECT user_id, notification_mode, email, n.updated_at
FROM viewflow_raw.users u
INNER JOIN viewflow_raw.notifications n ON n.user_id = u.id
WHERE
    category = 'blog' AND
    (u.updated_at >= {{min_time}} OR n.updated_at >= {{min_time}})
```
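
For reference, the metadata that drives this mechanism in the demo's [emails_blog.sql](./demo/dags/viewflow-demo-2/emails_blog.sql) looks as follows (abridged here). The `initial` parameters are used on the first run, when the materialized view does not exist yet; the `update` parameters derive `min_time` from the existing view:

```sql
/*
---
type: IncrementalPostgresOperator
primary_key: [user_id]
time_parameters:
    initial:
        min_time: '''2020-01-01 12:00:00'''
    update:
        min_time: (SELECT max(updated_at) FROM viewflow_demo.emails_blog)
---
*/
```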

# Contributing to Viewflow

28 changes: 28 additions & 0 deletions demo/dags/viewflow-demo-2/emails_blog.sql
@@ -0,0 +1,28 @@
/*
---
owner: data@datacamp.com
type: IncrementalPostgresOperator
description: For all users, list the email address and notification mode of the category blog.
fields:
    user_id: The user ID
    notification_mode: The detailed mode for which notifications to receive
    email: Email address of the user
schema: viewflow_demo
connection_id: postgres_demo
primary_key: [user_id]
time_parameters:
    initial:
        min_time: '''2020-01-01 12:00:00'''
    update:
        min_time: (SELECT max(updated_at) FROM viewflow_demo.emails_blog)
---
*/

SELECT user_id, notification_mode, email, updated_at
FROM
    viewflow_raw.notifications n
    INNER JOIN viewflow_raw.users u
        ON n.user_id = u.id
WHERE
    category = 'blog' AND
    updated_at >= {{min_time}}
8 changes: 0 additions & 8 deletions demo/dags/viewflow-demo-4/config.yml

This file was deleted.

24 changes: 23 additions & 1 deletion demo/scripts/load_postgres.sql
@@ -76,4 +76,26 @@ VALUES
    (8, 5, '2021-01-21 10:00:00', '2021-01-21 14:00:00'),
    (9, 1, '2021-02-15 10:00:00', '2021-02-16 14:00:00'),
    (10, 1, '2021-03-15 10:00:00', '2021-03-15 14:00:00'),
    (10, 3, '2021-03-16 10:00:00', '2021-03-16 14:00:00');


DROP TABLE IF EXISTS viewflow_raw.notifications;
CREATE TABLE viewflow_raw.notifications (
    user_id INTEGER,
    category VARCHAR,
    notification_mode VARCHAR,
    updated_at TIMESTAMP,
    PRIMARY KEY (user_id, category)
);
INSERT INTO
    viewflow_raw.notifications (user_id, category, notification_mode, updated_at)
VALUES
    (1, 'daily', 'off', '2021-12-01 12:00:00'),
    (1, 'recommended', 'off', '2021-12-01 12:00:00'),
    (1, 'blog', 'selection', '2021-12-01 12:00:00'),
    (2, 'daily', 'all', '2022-11-01 12:00:00'),
    (2, 'recommended', 'off', '2022-11-01 12:00:00'),
    (2, 'blog', 'all', '2022-11-01 12:00:00'),
    (3, 'daily', 'selection', '2023-10-01 12:00:00'),
    (3, 'recommended', 'selection', '2023-10-01 12:00:00'),
    (3, 'blog', 'all', '2023-10-01 12:00:00');
Binary file added img/airflow_web_homepage.png
Binary file removed img/viewflow-demo-2.png
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "viewflow"
version = "0.1.0"
version = "0.2.0"
license = "MIT"
description = "Viewflow is an Airflow-based framework that allows data scientists to create data models without writing Airflow code."
authors = ["Vincent Vankrunkelsven <vincent@datacamp.com>", "Ramnath Vaidyanathan <ramnath@datacamp.com>", "Gaëtan Podevijn <gaetan@datacamp.com>"]
99 changes: 99 additions & 0 deletions tests/adapters/test_postgres_incremental.py
@@ -0,0 +1,99 @@
import viewflow
from datetime import datetime, date

from airflow.models import TaskInstance, Connection
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow import settings
from airflow.utils import db


def _create_postgres_session():
    session = settings.Session()
    conn = Connection(
        conn_id="postgres_viewflow",
        conn_type="postgres",
        login="user",
        password="passw0rd",
        schema="viewflow",
        host="localhost",
        port=5432,
    )
    db.merge_conn(conn, session)
    return session


def test_incremental_updates():
    session = _create_postgres_session()

    dag = viewflow.create_dag("./tests/projects/postgresql/incremental_operator")
    task = dag.get_task("emails_blog")

    with PostgresHook(postgres_conn_id="postgres_viewflow").get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("DROP TABLE IF EXISTS viewflow.emails_blog")

    # Table 'emails_blog' does not yet exist --> query must be run with initial time parameters
    ti = TaskInstance(task, datetime(2020, 1, 1))
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True, session=session)
    with PostgresHook(postgres_conn_id="postgres_viewflow").get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM viewflow.emails_blog")
            (count,) = cur.fetchone()
            assert count == 1

            cur.execute("SELECT * FROM viewflow.emails_blog")
            (user_id, notification_mode, email, updated_at, __view_generated_at) = cur.fetchone()
            assert user_id == 1
            assert notification_mode == "selection"
            assert email == "test1@datacamp.com"
            assert updated_at == datetime.strptime("2021-12-01 12:00:00", "%Y-%m-%d %H:%M:%S")
            assert __view_generated_at == date.today()

    # First incremental update --> additional rows are added (only 1 in this case)
    ti = TaskInstance(task, datetime(2020, 1, 1))
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True, session=session)
    with PostgresHook(postgres_conn_id="postgres_viewflow").get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM viewflow.emails_blog")
            (count,) = cur.fetchone()
            assert count == 2

    # Second incremental update --> additional row is added
    ti = TaskInstance(task, datetime(2020, 1, 1))
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True, session=session)
    with PostgresHook(postgres_conn_id="postgres_viewflow").get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM viewflow.emails_blog")
            (count,) = cur.fetchone()
            assert count == 3

    # User 1 disables the blog notifications
    with PostgresHook(postgres_conn_id="postgres_viewflow").get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                UPDATE viewflow.notifications
                SET notification_mode='off', updated_at=timestamp '2024-9-01 12:00:00'
                WHERE user_id=1 AND category='blog'
            """)

    # Third incremental update --> changed row must be updated
    ti = TaskInstance(task, datetime(2020, 1, 1))
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True, session=session)
    with PostgresHook(postgres_conn_id="postgres_viewflow").get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM viewflow.emails_blog")
            (count,) = cur.fetchone()
            assert count == 3

            cur.execute("SELECT notification_mode, updated_at FROM viewflow.emails_blog WHERE user_id = 1")
            (notification_mode, updated_at) = cur.fetchone()

            assert notification_mode == "off"
            assert updated_at == datetime.strptime("2024-9-01 12:00:00", "%Y-%m-%d %H:%M:%S")

            # Restore change to user 1's notification mode
            cur.execute("""
                UPDATE viewflow.notifications
                SET notification_mode='selection', updated_at=timestamp '2021-12-01 12:00:00'
                WHERE user_id=1 AND category='blog';
            """)
32 changes: 17 additions & 15 deletions tests/fixtures/load_postgres.sql
@@ -20,22 +20,24 @@ VALUES
('test7@datacamp.com', 'testtest7'),
('test8@datacamp.com', 'testtest8');

-DROP TABLE IF EXISTS viewflow.incremental_users;
-
-CREATE TABLE viewflow.incremental_users (
-    user_id SERIAL,
-    email VARCHAR,
-    password VARCHAR
+DROP TABLE IF EXISTS viewflow.notifications;
+CREATE TABLE viewflow.notifications (
+    user_id INTEGER,
+    category VARCHAR,
+    notification_mode VARCHAR,
+    updated_at TIMESTAMP,
+    PRIMARY KEY (user_id, category)
);

INSERT INTO
-    viewflow.incremental_users (email, password)
+    viewflow.notifications (user_id, category, notification_mode, updated_at)
VALUES
-    ('test_incremental1@datacamp.com', 'testtest1'),
-    ('test_incremental2@datacamp.com', 'testtest2'),
-    ('test_incremental3@datacamp.com', 'testtest3'),
-    ('test_incremental4@datacamp.com', 'testtest4'),
-    ('test_incremental5@datacamp.com', 'testtest5'),
-    ('test_incremental6@datacamp.com', 'testtest6'),
-    ('test_incremental7@datacamp.com', 'testtest7'),
-    ('test_incremental8@datacamp.com', 'testtest8');
+    (1, 'daily', 'off', '2021-12-01 12:00:00'),
+    (1, 'recommended', 'off', '2021-12-01 12:00:00'),
+    (1, 'blog', 'selection', '2021-12-01 12:00:00'),
+    (2, 'daily', 'all', '2022-11-01 12:00:00'),
+    (2, 'recommended', 'off', '2022-11-01 12:00:00'),
+    (2, 'blog', 'all', '2022-11-01 12:00:00'),
+    (3, 'daily', 'selection', '2023-10-01 12:00:00'),
+    (3, 'recommended', 'selection', '2023-10-01 12:00:00'),
+    (3, 'blog', 'all', '2023-10-01 12:00:00');
4 changes: 4 additions & 0 deletions tests/projects/postgresql/incremental_operator/config.yml
@@ -0,0 +1,4 @@
default_args:
    owner: data@datacamp.com
schedule_interval: 0 5 * * *
start_date: "2020-04-29"
32 changes: 32 additions & 0 deletions tests/projects/postgresql/incremental_operator/emails_blog.sql
@@ -0,0 +1,32 @@
/*
---
owner: engineering@datacamp.com
type: IncrementalPostgresOperator
description: For all users, list the email address and notification mode of the category blog.
fields:
    user_id: The user ID
    notification_mode: The detailed mode for which notifications to receive
    email: Email address of the user
schema: viewflow
connection_id: postgres_viewflow
time_parameters:
    initial:
        min_time: '''2020-01-01 12:00:00'''
        max_time: '''2021-12-31 12:00:00'''
    update:
        min_time: (SELECT max(updated_at) FROM viewflow.emails_blog)
        max_time: (SELECT (max(updated_at) + interval '1 day' * 365) FROM viewflow.emails_blog)
primary_key: [user_id]
---
*/

SELECT u.user_id, notification_mode, email, updated_at
FROM
    viewflow.users u
    INNER JOIN viewflow.notifications n
        ON n.user_id = u.user_id
WHERE
    category = 'blog' AND
    updated_at >= {{min_time}} AND
    updated_at < {{max_time}}
