use upsert logic for insertion (#9)
* Use upsert and move hasura helpers

* Update docs to reflect upsert

* Support running actions after insertion

* Fixed bugs
francojreyes authored Mar 26, 2024
1 parent e04e395 commit 3b17593
Showing 3 changed files with 199 additions and 144 deletions.
28 changes: 16 additions & 12 deletions README.md
@@ -161,22 +161,26 @@ The scrape job should produce JSON output and send a HTTP POST request to `http:

Inserts data into the PostgreSQL table specified by `table_name`. If such a table does not yet exist, it is created as specified in `sql_up`.

Insertion into Hasuragres uses upsert logic: when inserting rows, if a row with the same primary key already exists, that row is updated rather than an error being raised. The same approach is used when `write_mode` is set to `"overwrite"`, so that rows still present after the overwrite are updated in place rather than deleted and re-inserted, which would cascade deletions to any dependent tables.
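The upsert itself can be expressed with PostgreSQL's `INSERT ... ON CONFLICT` clause. As a rough illustration (not the exact statement Hasuragres builds), assuming a hypothetical `courses` table with primary key `code` and the `psycopg2` driver:

```python
import psycopg2

# Hypothetical connection string; Hasuragres reads its own from the environment.
conn = psycopg2.connect("dbname=hasuragres user=postgres")

rows = [("COMP1511", "Programming Fundamentals"), ("COMP2521", "Data Structures and Algorithms")]

with conn, conn.cursor() as cur:
    # On a primary key clash, update the existing row instead of raising an error.
    cur.executemany(
        """
        INSERT INTO courses (code, name)
        VALUES (%s, %s)
        ON CONFLICT (code) DO UPDATE
            SET name = EXCLUDED.name
        """,
        rows,
    )
```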

Hasuragres keeps track of the SQL scripts used to create each table. If there is an inconsistency between the stored `sql_up` script and the one provided in the request, then the table is re-created using the new `sql_up`. The stored `sql_down` is used to drop the old table - it's important that `sql_down` is correct, otherwise updates to the table structure may fail.
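The consistency check can be pictured roughly as follows (a sketch only - the real service persists the scripts rather than holding them in memory, and the helper name is hypothetical):

```python
# In-memory stand-ins for the stored scripts (hypothetical; illustration only).
stored_sql_up: dict[str, str] = {}
stored_sql_down: dict[str, str] = {}

def ensure_table(cur, table_name: str, sql_up: str, sql_down: str) -> None:
    """Create the table, re-creating it if the provided sql_up has changed."""
    if table_name not in stored_sql_up:
        cur.execute(sql_up)
    elif stored_sql_up[table_name] != sql_up:
        # Structure changed: drop the old objects using the *stored* sql_down,
        # then build the new ones with the new sql_up.
        cur.execute(stored_sql_down[table_name])
        cur.execute(sql_up)
    stored_sql_up[table_name] = sql_up
    stored_sql_down[table_name] = sql_down
```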

When a table is created, it is automatically tracked in Hasura and added to the GraphQL schema. Corresponding query fields called `<table_name>` and `<table_name>_by_pk` are created (note that `table_name` will be in lowercase), with fields for each column of the table. Furthermore, any foreign key relationships are inferred, and fields containing nested objects are added to each relevant queryable data type. More information can be found [here](https://hasura.io/docs/latest/getting-started/how-it-works/index/).
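For example, once a hypothetical `courses` table (primary key `code`) has been tracked, the generated fields can be queried with any GraphQL client - here via `requests`, assuming the instance exposes Hasura's standard `/v1/graphql` endpoint on localhost port 8080:

```python
import requests

# Hypothetical endpoint; substitute the host/port of your Hasuragres instance.
HGQL_URL = "http://localhost:8080/v1/graphql"

query = """
query {
  courses(limit: 5) {
    code
    name
  }
  courses_by_pk(code: "COMP1511") {
    name
  }
}
"""

res = requests.post(HGQL_URL, json={"query": query})
print(res.json()["data"])
```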

#### Parameters

| name | type | required | description |
|------------------------|--------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `metadata` | object | Yes | Instructions for creating/inserting into PostgreSQL tables. |
| `metadata.table_name` | str | Yes | Name of table to create/insert into.<br/><br/>Must match name of table created in `metadata.sql_up` (case insensitive). |
| `metadata.columns` | list[str] | Yes | List of column names that require insertion.<br/><br/>Must match column names in table created in `metadata.sql_up`, as well as the keys of each object in `payload` (case sensitive). |
| `metadata.write_mode` | str | No | One of `"truncate"`, meaning all rows in the table should be deleted before insertion, or `"append"`, meaning add to the existing data.<br/><br/>Defaults to `truncate`. |
| `metadata.sql_execute` | str | No | SQL commands to run *before* each insertion. |
| `metadata.sql_up` | str | Yes | SQL commands used to set UP (create) a table to store the scraped data, as well as any related data types. |
| `metadata.sql_down` | str | Yes | SQL commands to tear DOWN (drop) all objects created by `metadata.sql_up`.<br/><br/>Should use the CASCADE option when dropping, otherwise the script may fail unexpectedly when other tables rely on this one. |
| `payload` | list[object] | Yes | List of objects to insert into the database.<br/><br/>Ideally, this is simply the JSON output of the scraper. |
| name | type | required | description |
|-----------------------|--------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `metadata` | object | Yes | Instructions for creating/inserting into PostgreSQL tables. |
| `metadata.table_name` | str | Yes | Name of table to create/insert into.<br/><br/>Must match name of table created in `metadata.sql_up` (case insensitive). |
| `metadata.columns` | list[str] | Yes | List of column names that require insertion.<br/><br/>Must match column names in table created in `metadata.sql_up`, as well as the keys of each object in `payload` (case sensitive). |
| `metadata.write_mode` | str | No | One of `"overwrite"` or `"append"`.<br/><br/>Defaults to `"overwrite"`. |
| `metadata.sql_before` | str | No | SQL command to run *before* the insertion. |
| `metadata.sql_after` | str | No | SQL command to run *after* the insertion. |
| `metadata.sql_up` | str | Yes | SQL commands used to set UP (create) a table to store the scraped data, as well as any related data types. |
| `metadata.sql_down` | str | Yes | SQL commands to tear DOWN (drop) all objects created by `metadata.sql_up`.<br/><br/>Should use the CASCADE option when dropping, otherwise the script may fail unexpectedly when other tables rely on this one. |
| `payload` | list[object] | Yes | List of objects to insert into the database.<br/><br/>Ideally, this is simply the JSON output of the scraper. |


#### Example Request

@@ -205,9 +209,9 @@ If you want to connect multiple scrapers to the same table, for example if you h

Both scrapers should maintain an up-to-date copy of the `sql_up` and `sql_down` commands sent to Hasuragres. Furthermore, if you need to update these commands, please be sure to update all scrapers around the same time without much delay between each. If at any point the scrapers have different versions of the SQL, then any inserts will simply drop the table and all data from the other scraper(s).

It is also important that you make use of the `sql_execute` and `write_mode` fields of the insert metadata. By default, inserts are set to truncate the table they insert to, which would only allow data from one scraper at any one time. For multiple scrapers, they should each be in `"append"` mode so that scrapers can add on to the data from other scrapers.
It is also important that you make use of the `sql_before` and `write_mode` fields of the insert metadata. By default, inserts overwrite the table they insert into, which only allows data from one scraper at any one time. For multiple scrapers, each should use `"append"` mode so that scrapers can add to the data from other scrapers.

Also, `sql_execute` should contain command(s) to remove only those rows that were previously inserted by the scraper - it may be useful to add some field to the schema that identifies the source of each row if there is no easy way to distinguish between the data sources.
Also, `sql_before` should contain command(s) to remove only those rows that were previously inserted by the scraper - it may be useful to add some field to the schema that identifies the source of each row if there is no easy way to distinguish between the data sources.
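For instance, two scrapers feeding one hypothetical `courses` table might each send metadata along these lines (illustrative values only):

```python
# Metadata sent by a hypothetical "timetable" scraper; a second scraper would do
# the same with its own source tag.
metadata = {
    "table_name": "courses",
    "columns": ["code", "name", "source"],
    "write_mode": "append",
    # Remove only the rows this scraper previously inserted.
    "sql_before": "DELETE FROM courses WHERE source = 'timetable';",
    "sql_up": "...",    # shared, identical across all scrapers
    "sql_down": "...",  # shared, identical across all scrapers
}
```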

## Testing Scrapers

124 changes: 124 additions & 0 deletions app/helpers/hasura.py
@@ -0,0 +1,124 @@
import os
import requests

from dotenv import load_dotenv

# Ensure HASURA_GRAPHQL_ env vars are set
load_dotenv()
HGQLA_SECRET = os.environ.get("HASURA_GRAPHQL_ADMIN_SECRET")
if not HGQLA_SECRET:
    print("HASURA_GRAPHQL_ADMIN_SECRET not set")
    exit(1)

HGQL_HOST = os.environ.get('HASURA_GRAPHQL_HOST')
if not HGQL_HOST:
    print("HASURA_GRAPHQL_HOST not set")
    exit(1)

HGQL_PORT = os.environ.get('HASURA_GRAPHQL_PORT')
if not HGQL_PORT:
    print("HASURA_GRAPHQL_PORT not set")
    exit(1)


def send_hasura_api_query(query: dict):
    return requests.post(
        f"http://{HGQL_HOST}:{HGQL_PORT}/v1/metadata",
        headers={
            "X-Hasura-Admin-Secret": HGQLA_SECRET
        },
        json=query
    )


# The below functions are used to adhere to Hasura's relationship nomenclature
# https://hasura.io/docs/latest/schema/postgres/using-existing-database/
# Possibly use the `inflect` module if they aren't sufficient
def plural(s: str) -> str:
    return s if s.endswith("s") else s + "s"


def singular(s: str) -> str:
    return s if not s.endswith("s") else s[:-1]


def infer_relationships(table_name: str) -> list[object]:
"""
Use pg_suggest_relationships to infer any relations from foreign keys
in the given table. Returns an array containing queries to track each
relationship.
See https://hasura.io/docs/latest/api-reference/metadata-api/relationship/
"""
res = send_hasura_api_query({
"type": "pg_suggest_relationships",
"version": 1,
"args": {
"omit_tracked": True,
"tables": [table_name]
}
})

queries = []
for rel in res.json()["relationships"]:
if rel["type"] == "object":
queries.append({
"type": "pg_create_object_relationship",
"args": {
"source": "default",
"table": rel["from"]["table"]["name"],
"name": singular(rel["to"]["table"]["name"]),
"using": {
"foreign_key_constraint_on": rel["from"]["columns"]
}
}
})
elif rel["type"] == "array":
queries.append({
"type": "pg_create_array_relationship",
"args": {
"source": "default",
"table": rel["from"]["table"]["name"],
"name": plural(rel["to"]["table"]["name"]),
"using": {
"foreign_key_constraint_on": {
"table": rel["to"]["table"]["name"],
"columns": rel["to"]["columns"]
}
}
}
})

return queries


def track_table(table_name: str):
    send_hasura_api_query({
        "type": "pg_track_table",
        "args": {
            "source": "default",
            "schema": "public",
            "name": table_name
        }
    })

    # Allow anonymous access
    send_hasura_api_query({
        "type": "pg_create_select_permission",
        "args": {
            "source": "default",
            "table": table_name,
            "role": "anonymous",
            "permission": {
                "columns": "*",
                "filter": {},
                "allow_aggregations": True
            }
        }
    })

    # Track relationships
    send_hasura_api_query({
        "type": "bulk",
        "args": infer_relationships(table_name)
    })
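These helpers are intended to be called once a table has been created or re-created, so that it is exposed through GraphQL. A minimal usage sketch (assuming `app` is importable as a package):

```python
from app.helpers.hasura import track_table

# After running sql_up to create the "courses" table, expose it via GraphQL.
track_table("courses")
```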