Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix two possible issues with the environment escape communication #1555

Merged
merged 1 commit into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions metaflow/plugins/env_escape/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import fcntl
import gc
import os
import importlib
import itertools
Expand Down Expand Up @@ -60,6 +61,7 @@ def inner_init(self, python_executable, pythonpath, max_pickle_version, config_d
# have an exception
self._poller = None
self._poller_lock = threading.Lock()
self._active_pid = os.getpid()
self._server_process = None
self._socket_path = None

Expand All @@ -70,7 +72,7 @@ def inner_init(self, python_executable, pythonpath, max_pickle_version, config_d
# The client launches the server when created; we use
# Unix sockets for now
server_module = ".".join([__package__, "server"])
self._socket_path = "/tmp/%s_%d" % (server_config, os.getpid())
self._socket_path = "/tmp/%s_%d" % (server_config, self._active_pid)
if os.path.exists(self._socket_path):
raise RuntimeError("Existing socket: %s" % self._socket_path)
env = os.environ.copy()
Expand Down Expand Up @@ -390,8 +392,18 @@ def _get_canonical_name(self, name):
return name

def _communicate(self, msg):
with self._poller_lock:
return self._locked_communicate(msg)
if os.getpid() != self._active_pid:
raise RuntimeError(
"You cannot use the environment escape across process boundaries."
)
# We also disable the GC because, in some rare cases, it may try to delete
# a remote object while we are communicating, which would cause a deadlock.
try:
gc.disable()
with self._poller_lock:
return self._locked_communicate(msg)
finally:
gc.enable()

def _locked_communicate(self, msg):
self._channel.send(msg)
Expand Down
10 changes: 2 additions & 8 deletions metaflow/plugins/env_escape/stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
"___remote_class_name___",
"___identifier___",
"___connection___",
"___refcount___",
"___local_overrides___" "__class__",
"__init__",
"__del__",
Expand Down Expand Up @@ -78,6 +77,7 @@ def __repr__(self):

def with_metaclass(meta, *bases):
"""Create a base class with a metaclass."""

# Python 2/3 compatibility shim. Remove once only Python 3 is supported.
class metaclass(type):
def __new__(cls, name, this_bases, d):
Expand All @@ -99,7 +99,6 @@ class Stub(with_metaclass(StubMetaClass, object)):
"___identifier___",
"___connection___",
"__weakref__",
"___refcount___",
]

# def __iter__(self): # FIXME: Keep debugger QUIET!!
Expand All @@ -109,14 +108,10 @@ def __init__(self, connection, remote_class_name, identifier):
self.___remote_class_name___ = remote_class_name
self.___identifier___ = identifier
self.___connection___ = connection
self.___refcount___ = 1

def __del__(self):
try:
pass
self.___refcount___ -= 1
if self.___refcount___ == 0:
fwd_request(self, OP_DEL)
fwd_request(self, OP_DEL)
except Exception:
# raised in a destructor, most likely on program termination,
# when the connection might have already been closed.
Expand Down Expand Up @@ -262,7 +257,6 @@ def create_class(
setattr_overrides,
class_methods,
):

class_dict = {"__slots__": ()}
for name, doc in class_methods.items():
method_type = NORMAL_METHOD
Expand Down
Loading