Skip to content

Commit

Permalink
Use upsert and move hasura helpers
Browse files Browse the repository at this point in the history
  • Loading branch information
francojreyes committed Feb 20, 2024
1 parent 07a5fa6 commit ef93415
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 128 deletions.
124 changes: 124 additions & 0 deletions app/helpers/hasura.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import os
import requests

from dotenv import load_dotenv

# Ensure HASURA_GRAPHQL_ env vars are set
load_dotenv()
HGQLA_SECRET = os.environ.get("HASURA_GRAPHQL_ADMIN_SECRET")
if not HGQLA_SECRET:
print("HASURA_GRAPHQL_ADMIN_SECRET not set")
exit(1)

HGQL_HOST = os.environ.get('HASURA_GRAPHQL_HOST')
if not HGQL_HOST:
print("HASURA_GRAPHQL_HOST not set")
exit(1)

HGQL_PORT = os.environ.get('HASURA_GRAPHQL_PORT')
if not HGQL_PORT:
print("HASURA_GRAPHQL_PORT not set")
exit(1)


def send_hasura_api_query(query: dict):
return requests.post(
f"http://{HGQL_HOST}:{HGQL_PORT}/v1/metadata",
headers={
"X-Hasura-Admin-Secret": HGQLA_SECRET
},
json=query
)


# The below functions are used to adhere to Hasura's relationship nomenclature
# https://hasura.io/docs/latest/schema/postgres/using-existing-database/
# Possibly use the `inflect` module if they aren't sufficient
def plural(s: str) -> str:
return s if s.endswith("s") else s + "s"


def singular(s: str) -> str:
return s if not s.endswith("s") else s[:-1]


def infer_relationships(table_name: str) -> list[object]:
"""
Use pg_suggest_relationships to infer any relations from foreign keys
in the given table. Returns an array containing queries to track each
relationship.
See https://hasura.io/docs/latest/api-reference/metadata-api/relationship/
"""
res = send_hasura_api_query({
"type": "pg_suggest_relationships",
"version": 1,
"args": {
"omit_tracked": True,
"tables": [table_name]
}
})

queries = []
for rel in res.json()["relationships"]:
if rel["type"] == "object":
queries.append({
"type": "pg_create_object_relationship",
"args": {
"source": "default",
"table": rel["from"]["table"]["name"],
"name": singular(rel["to"]["table"]["name"]),
"using": {
"foreign_key_constraint_on": rel["from"]["columns"]
}
}
})
elif rel["type"] == "array":
queries.append({
"type": "pg_create_array_relationship",
"args": {
"source": "default",
"table": rel["from"]["table"]["name"],
"name": plural(rel["to"]["table"]["name"]),
"using": {
"foreign_key_constraint_on": {
"table": rel["to"]["table"]["name"],
"columns": rel["to"]["columns"]
}
}
}
})

return queries


def track_table(table_name: str):
send_hasura_api_query({
"type": "pg_track_table",
"args": {
"source": "default",
"schema": "public",
"name": table_name
}
})

# Allow anonymous access
send_hasura_api_query({
"type": "pg_create_select_permission",
"args": {
"source": "default",
"table": table_name,
"role": "anonymous",
"permission": {
"columns": "*",
"filter": {},
"allow_aggregations": True
}
}
})

# Track relationships
send_hasura_api_query({
"type": "bulk",
"args": infer_relationships(table_name)
})
182 changes: 54 additions & 128 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,15 @@
from typing import Any, Literal, Optional

import psycopg2
import requests
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from psycopg2 import Error
from psycopg2.extras import execute_values
from psycopg2.extensions import connection, cursor
from pydantic import BaseModel, Field

# Ensure HASURA_GRAPHQL_ env vars are set
load_dotenv()
HGQLA_SECRET = os.environ.get("HASURA_GRAPHQL_ADMIN_SECRET")
if not HGQLA_SECRET:
print("HASURA_GRAPHQL_ADMIN_SECRET not set")
exit(1)

HGQL_HOST = os.environ.get('HASURA_GRAPHQL_HOST')
if not HGQL_HOST:
print("HASURA_GRAPHQL_HOST not set")
exit(1)

HGQL_PORT = os.environ.get('HASURA_GRAPHQL_PORT')
if not HGQL_PORT:
print("HASURA_GRAPHQL_PORT not set")
exit(1)
from app.helpers.hasura import track_table


class Metadata(BaseModel):
Expand All @@ -36,18 +19,18 @@ class Metadata(BaseModel):
sql_up: str # SQL to set UP table and related data types/indexes
sql_down: str # SQL to tear DOWN a table (should be the opp. of up)
columns: list[str] # list of column names that require insertion
write_mode: Optional[Literal['append', 'truncate']] = Field('truncate', description='mode in which to write to the database')
write_mode: Literal['append', 'overwrite'] = Field('overwrite', description='mode in which to write to the database')


conn = None
cur = None
conn: connection = None
cur: cursor = None

try:
conn = psycopg2.connect(user=os.environ.get('POSTGRES_USER'),
password=os.environ.get('POSTGRES_PASSWORD'),
host=os.environ.get('POSTGRES_HOST'),
port=os.environ.get('POSTGRES_PORT'),
database=os.environ.get('POSTGRES_DB'))
password=os.environ.get('POSTGRES_PASSWORD'),
host=os.environ.get('POSTGRES_HOST'),
port=os.environ.get('POSTGRES_PORT'),
database=os.environ.get('POSTGRES_DB'))
cur = conn.cursor()

app = FastAPI()
Expand Down Expand Up @@ -109,75 +92,50 @@ def create_table(metadata: Metadata) -> bool:
return False


def send_hasura_api_query(query: dict):
return requests.post(
f"http://{HGQL_HOST}:{HGQL_PORT}/v1/metadata",
headers={
"X-Hasura-Admin-Secret": HGQLA_SECRET
},
json=query
)
def get_primary_key_columns(table_name: str) -> list[str]:
cmd = f"""
SELECT c.column_name
FROM information_schema.columns c
JOIN information_schema.key_column_usage kcu
ON c.table_name = kcu.table_name
AND c.column_name = kcu.column_name
JOIN information_schema.table_constraints tc
ON kcu.table_name = tc.table_name
AND kcu.constraint_name = tc.constraint_name
WHERE c.table_name = '{table_name}'
AND tc.constraint_type = 'PRIMARY KEY';
"""
cur.execute(cmd)

return [row[0] for row in cur.fetchall()]

# The below functions are used to adhere to Hasura's relationship nomenclature
# https://hasura.io/docs/latest/schema/postgres/using-existing-database/
# Possibly use the `inflect` module if they aren't sufficient
def plural(s: str) -> str:
return s if s.endswith("s") else s + "s"

def execute_upsert(metadata: Metadata, payload: list[Any]):
columns = [f'"{col}"' for col in metadata.columns]
key_columns = [f'"{col}"' for col in get_primary_key_columns(metadata.table_name)]
non_key_columns = [col for col in columns if col not in key_columns]

def singular(s: str) -> str:
return s if not s.endswith("s") else s[:-1]
cmd = f"""
INSERT INTO {metadata.table_name}({", ".join(columns)}) VALUES %s
ON CONFLICT ({", ".join(key_columns)})
DO UPDATE SET {", ".join(f"{col} = EXCLUDED.{col}" for col in non_key_columns)};
"""
values = [tuple(row[col] for col in metadata.columns) for row in payload]

execute_values(cur, cmd, values)

def infer_relationships(table_name: str) -> list[object]:
"""
Use pg_suggest_relationships to infer any relations from foreign keys
in the given table. Returns an array containing queries to track each
relationship.

See https://hasura.io/docs/latest/api-reference/metadata-api/relationship/
def execute_delete(metadata: Metadata, payload: list[Any]):
key_columns = get_primary_key_columns(metadata.table_name)
quoted_key_columns = [f'"{col}"' for col in key_columns]

cmd = f"""
DELETE FROM {metadata.table_name}
WHERE ({", ".join(quoted_key_columns)}) NOT IN %s;
"""
res = send_hasura_api_query({
"type": "pg_suggest_relationships",
"version": 1,
"args": {
"omit_tracked": True,
"tables": [table_name]
}
})

queries = []
for rel in res.json()["relationships"]:
if rel["type"] == "object":
queries.append({
"type": "pg_create_object_relationship",
"args": {
"source": "default",
"table": rel["from"]["table"]["name"],
"name": singular(rel["to"]["table"]["name"]),
"using": {
"foreign_key_constraint_on": rel["from"]["columns"]
}
}
})
elif rel["type"] == "array":
queries.append({
"type": "pg_create_array_relationship",
"args": {
"source": "default",
"table": rel["from"]["table"]["name"],
"name": plural(rel["to"]["table"]["name"]),
"using": {
"foreign_key_constraint_on": {
"table": rel["to"]["table"]["name"],
"columns": rel["to"]["columns"]
}
}
}
})

return queries
values = [tuple(row[col] for col in key_columns) for row in payload]

cur.execute(cmd, values)


@app.post("/insert")
Expand All @@ -191,19 +149,16 @@ def insert(metadata: Metadata, payload: list[Any]):
raise HTTPException(status_code=400, detail=err_msg)

try:
# execute whatever SQL is required
# Execute whatever SQL is required
if metadata.sql_execute:
cur.execute(metadata.sql_execute)
if metadata.write_mode == 'truncate':
# Remove old data
cmd = f'TRUNCATE {metadata.table_name} CASCADE'
cur.execute(cmd)

# Insert new data
values = [tuple(row[col] for col in metadata.columns) for row in payload]
metadata.columns = [f'"{col}"' for col in metadata.columns]
cmd = f'INSERT INTO {metadata.table_name}({", ".join(metadata.columns)}) VALUES %s'
execute_values(cur, cmd, values)

execute_upsert(metadata, payload)

if metadata.write_mode == 'overwrite':
# Delete rows not in payload
execute_delete(metadata, payload)

except (Exception, Error) as error:
err_msg = "Error while inserting into PostgreSQL table: " + str(error)
print(err_msg)
Expand All @@ -212,38 +167,9 @@ def insert(metadata: Metadata, payload: list[Any]):

conn.commit()

# Run Hasura actions - must be done after transaction committed
# Run Hasura actions - must be done after transaction committed otherwise Hasura won't see the table
if created:
# Track table
send_hasura_api_query({
"type": "pg_track_table",
"args": {
"source": "default",
"schema": "public",
"name": metadata.table_name.lower()
}
})

# Allow anonymous access
send_hasura_api_query({
"type": "pg_create_select_permission",
"args": {
"source": "default",
"table": metadata.table_name.lower(),
"role": "anonymous",
"permission": {
"columns": "*",
"filter": {},
"allow_aggregations": True
}
}
})

# Track relationships
send_hasura_api_query({
"type": "bulk",
"args": infer_relationships(metadata.table_name.lower())
})
track_table(metadata.table_name.lower())

return {}

Expand Down

0 comments on commit ef93415

Please sign in to comment.