always send feedback for postgres replication messages #107

Merged · 1 commit · May 23, 2024

Changes from all commits
91 changes: 48 additions & 43 deletions meilisync/source/postgres.py
@@ -1,7 +1,7 @@
 import asyncio
 import json
 from asyncio import Queue
-from typing import List
+from typing import List, Any

 import psycopg2
 import psycopg2.errors
@@ -88,51 +88,56 @@ def _():

     def _consumer(self, msg: ReplicationMessage):
         payload = json.loads(msg.payload)
-        changes = payload.get("change")
-        if not changes:
-            return
+        next_lsn = payload["nextlsn"]
+
+        changes = payload.get("change", [])
         for change in changes:
Comment on lines 89 to 94 (Contributor Author):
I reworked quite a bit of this for neatness, hope that's okay!

The primary change was to payload.get("change", []) so that it doesn't early return.
I also changed payload.get("nextlsn") to payload["nextlsn"], AFAIK there will always be a nextlsn.
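
A minimal sketch (not meilisync's code) of why the early return mattered, assuming wal2json runs with include-lsn enabled, which the unguarded payload["nextlsn"] access implies: a payload with no relevant changes still carries an LSN that should be acknowledged.

```python
import json

# A wal2json-style payload for a transaction with nothing to sync:
# no "change" entries, but the server still expects feedback for it.
payload = json.loads('{"nextlsn": "0/16B6C50", "change": []}')

# Old behaviour: bail out before ever reaching send_feedback().
if not payload.get("change"):
    pass  # _consumer returned here, so the LSN was never acknowledged

# New behaviour: iterate over whatever changes exist (possibly none)
# and always fall through to the feedback call afterwards.
for change in payload.get("change", []):
    ...  # per-change handling now lives in __handle_change
# msg.cursor.send_feedback(flush_lsn=msg.data_start) runs here unconditionally
```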

-            kind = change.get("kind")
-            table = change.get("table")
-            if table not in self.tables:
-                continue
-            columnnames = change.get("columnnames", [])
-            columnvalues = change.get("columnvalues", [])
-            columntypes = change.get("columntypes", [])
-
-            for i in range(len(columntypes)):
-                if columntypes[i] == "json":
-                    columnvalues[i] = json.loads(columnvalues[i])
-
-            if kind == "update":
-                values = dict(zip(columnnames, columnvalues))
-                event_type = EventType.update
-            elif kind == "delete":
-                values = (
-                    dict(zip(columnnames, columnvalues))
-                    if columnvalues
-                    else {change["oldkeys"]["keynames"][0]: change["oldkeys"]["keyvalues"][0]}
-                )
-                event_type = EventType.delete
-            elif kind == "insert":
-                values = dict(zip(columnnames, columnvalues))
-                event_type = EventType.create
-            else:
-                return
-            asyncio.new_event_loop().run_until_complete(
-                self.queue.put(  # type: ignore
-                    Event(
-                        type=event_type,
-                        table=table,
-                        data=values,
-                        progress={"start_lsn": payload.get("nextlsn")},
-                    )
-                )
-            )
+            self.__handle_change(change, next_lsn)
+
+        # Always report success to the server to avoid a “disk full” condition.
+        # https://www.psycopg.org/docs/extras.html#psycopg2.extras.ReplicationCursor.consume_stream
+        msg.cursor.send_feedback(flush_lsn=msg.data_start)
+
+    def __handle_change(self, change: dict[str, Any], next_lsn: str):
+        table = change.get("table")
+        if table not in self.tables:
+            return
+
+        columnnames = change.get("columnnames", [])
+        columnvalues = change.get("columnvalues", [])
+        columntypes = change.get("columntypes", [])
+
+        for i in range(len(columntypes)):
+            if columntypes[i] == "json":
+                columnvalues[i] = json.loads(columnvalues[i])
+
+        kind = change.get("kind")
+        if kind == "update":
+            values = dict(zip(columnnames, columnvalues))
+            event_type = EventType.update
+        elif kind == "delete":
+            values = (
+                dict(zip(columnnames, columnvalues))
+                if columnvalues
+                else {change["oldkeys"]["keynames"][0]: change["oldkeys"]["keyvalues"][0]}
+            )
+            event_type = EventType.delete
+        elif kind == "insert":
+            values = dict(zip(columnnames, columnvalues))
+            event_type = EventType.create
+        else:
+            return

-            # Report success to the server to avoid a “disk full” condition.
-            # https://www.psycopg.org/docs/extras.html#psycopg2.extras.ReplicationCursor.consume_stream
-            msg.cursor.send_feedback(flush_lsn=msg.data_start)
+        asyncio.new_event_loop().run_until_complete(
+            self.queue.put(  # type: ignore
+                Event(
+                    type=event_type,
+                    table=table,
+                    data=values,
+                    progress={"start_lsn": next_lsn},
+                )
+            )
+        )

     async def get_count(self, sync: Sync):
         with self.conn_dict.cursor() as cur:
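
For context, a minimal, self-contained sketch of how a consumer like this is typically wired up with psycopg2's replication support. The DSN, slot name, and wal2json options below are illustrative placeholders, not meilisync's actual configuration, and the replication slot is assumed to already exist with the wal2json output plugin.

```python
import json

import psycopg2
import psycopg2.extras


def consumer(msg: psycopg2.extras.ReplicationMessage) -> None:
    payload = json.loads(msg.payload)
    for change in payload.get("change", []):
        print(change.get("kind"), change.get("table"))
    # Acknowledge every message, even ones with nothing to sync; otherwise the
    # replication slot pins old WAL segments and the server's disk fills up.
    msg.cursor.send_feedback(flush_lsn=msg.data_start)


conn = psycopg2.connect(
    "dbname=app user=replicator",  # placeholder DSN
    connection_factory=psycopg2.extras.LogicalReplicationConnection,
)
cur = conn.cursor()
cur.start_replication(
    slot_name="demo_slot",  # assumed to exist, created with the wal2json plugin
    decode=True,
    options={"include-lsn": "true"},  # wal2json option that adds "nextlsn" to payloads
)
cur.consume_stream(consumer)  # blocks, calling consumer() for every message
```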