Skip to content

Commit

Permalink
fix: missing loglines from tailing logs with GCP (#2112)
Browse files Browse the repository at this point in the history
  • Loading branch information
saikonen authored Oct 23, 2024
1 parent e4238b2 commit 5fd5675
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 8 deletions.
2 changes: 1 addition & 1 deletion metaflow/plugins/azure/azure_tail.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def _fill_buf(self):
if data is None:
return None
if data:
buf = BytesIO(data)
buf = BytesIO(self._tail + data)
self._pos += len(data)
self._tail = b""
return buf
Expand Down
16 changes: 10 additions & 6 deletions metaflow/plugins/gcp/gs_tail.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
class GSTail(object):
def __init__(self, blob_full_uri):
    """Set up a tailer for a GCS blob.

    Location should be something like gs://<bucket_name>/blob.

    Bucket and blob names are kept as attributes (rather than locals) so
    that other methods can re-instantiate a blob client per request — the
    GS library otherwise returns stale content for consecutive
    download_as_bytes calls on the same client even after the blob grows.

    Raises
    ------
    MetaflowException
        If blob_full_uri does not parse into gs://<bucket_name>/<blob_name>.
    """
    self.bucket_name, self.blob_name = parse_gs_full_path(blob_full_uri)
    if not self.blob_name:
        raise MetaflowException(
            msg="Failed to parse blob_full_uri into gs://<bucket_name>/<blob_name> (got %s)"
            % blob_full_uri
        )
    client = get_gs_storage_client()
    self.bucket = client.bucket(self.bucket_name)
    self._blob_client = self.bucket.blob(self.blob_name)
    # Byte offset already consumed from the blob.
    self._pos = 0
    # Partial trailing line carried over between reads.
    self._tail = b""

Expand All @@ -46,7 +46,11 @@ def __iter__(self):
def _make_range_request(self):
try:
# Yes we read to the end... memory blow up is possible. We can improve by specifying length param
return self._blob_client.download_as_bytes(start=self._pos)
# NOTE: We must re-instantiate the whole client here due to a behavior with the GS library,
# otherwise download_as_bytes will simply return the same content for consecutive requests with the same attributes,
# even if the blob has grown in size.
blob_client = self.bucket.blob(self.blob_name)
return blob_client.download_as_bytes(start=self._pos)
except NotFound:
return None
except ClientError as e:
Expand All @@ -63,7 +67,7 @@ def _fill_buf(self):
if data is None:
return None
if data:
buf = BytesIO(data)
buf = BytesIO(self._tail + data)
self._pos += len(data)
self._tail = b""
return buf
Expand Down
13 changes: 12 additions & 1 deletion metaflow/plugins/kubernetes/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,12 +710,23 @@ def wait_for_launch(job):
wait_for_launch(self._job)

# 2) Tail logs until the job has finished
self._output_final_logs = False

def _has_updates():
    # Poll predicate for tail_logs: keep tailing while the job runs.
    if self._job.is_running:
        return True
    # Job is done; if we already emitted the final tail, stop for good.
    if self._output_final_logs:
        return False
    # First call after the job finished: allow one last pass so the
    # trailing loglines are flushed instead of being dropped.
    self._output_final_logs = True
    return True

tail_logs(
prefix=prefix(),
stdout_tail=stdout_tail,
stderr_tail=stderr_tail,
echo=echo,
has_log_updates=lambda: self._job.is_running,
has_log_updates=_has_updates,
)
# 3) Fetch remaining logs
#
Expand Down

0 comments on commit 5fd5675

Please sign in to comment.