Skip to content

Commit

Permalink
597 make sidecar and transformer containers communicate over socket (#…
Browse files Browse the repository at this point in the history
…601)

* Remove commented out code

---------

Co-authored-by: Ben Galewsky <ben@peartreestudio.net>
  • Loading branch information
ivukotic and BenGalewsky authored Jul 11, 2023
1 parent dedb7e7 commit 6b4235d
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 331 deletions.
7 changes: 2 additions & 5 deletions transformer_sidecar/.gitignore
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
<<<<<<< HEAD
.python-version
=======
# Byte-compiled / optimized / DLL files
__pycache__/
*.pyc

*.py[cod]
*$py.class

Expand Down Expand Up @@ -161,5 +160,3 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/

>>>>>>> 0bcd4c13 (Migrate classes we need from the transformer library)
1 change: 0 additions & 1 deletion transformer_sidecar/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
rabbitmq
pyarrow
psutil
watchdog
pika>=1.1.0
minio>=7.1.0
retry
Expand Down
100 changes: 63 additions & 37 deletions transformer_sidecar/scripts/watch.sh
Original file line number Diff line number Diff line change
@@ -1,52 +1,78 @@
#!/usr/bin/env bash
# This script is used in the science container to watch for new transform
# requests in the form of JSON files. From this file, it extracts the source
# input file URI, a scratch output filename and a final file path to write
# requests in the form of JSON documents coming over the socket on port 8081.
# From this doc, it extracts the source
# input file URI, an output filename and a file path to write
# the transformed file upon completion.
# Signal to the sidecar that we are done by writing a file with the extension
# .done to the directory. Also write the log files to the shared volume.
# Finally, if there is an error then write a .failure file to signal to the
# sidecar.
# To signal to the sidecar that the transformation is done, we send the result
# over the socket. We also write the log files to the shared volume.
#
# Communication steps for each input file
# 1. watch sends "GeT" to ask for a JSON doc
# 2. watch runs the transformation
# 3. watch writes "success." or "failure." to the socket
# 4. watch listens for the confirmation

# Arguments:
# 1. A "language" used to execute the transform script. Usually bash or python
# 2. The script to execute
# 3. Path to the shared volume
#
lang=$1
cmd=$2
path=$3

while true; do
if [ -f $path/*.json ]; then

start=$(date +%s%N | cut -b1-13)

for file in `ls $path/*.json`; do
download_path=`grep -o '"downloadPath": "[^\"]*"' $file |tr -d '"' |tr -d ',' | awk '{print $2}' `
output_file=`grep -o '"safeOutputFileName": "[^\"]*"' $file |tr -d '"' |tr -d ',' | awk '{print $2}' `
completed_file=`grep -o 'completedFileName": "[^\"]*"' $file |tr -d '"' |tr -d ',' | awk '{print $2}' `
output_format=`grep -o '"result-format": "[^\"]*"' $file |tr -d '"' |tr -d ',' | awk '{print $2}' `

echo "Attempting $download_path -> $output_file -> $completed_file with $output_format format"
$lang "$cmd" "$download_path" "$output_file" "$output_format" 2>&1 | tee $file.log

if [ "${PIPESTATUS[0]}" == 0 ]; then
echo "Success. skipping rest of input_files"
mv "$output_file" "$completed_file"
touch "$file".done
rm "$file"
else
echo "Operation failed for $download_path"
touch "$file".failed
rm "$file"
fi
done;

end=$(date +%s%N | cut -b1-13)
echo "Elapsed Time: $(($end-$start)) ms"
echo "connecting..."

coproc nc { nc localhost 8081; }

while [[ $nc_PID ]] ; do

start=$(date +%s%N | cut -b1-13)
echo "start: $start"
# ask for a file to be processed
printf >&${nc[1]} '%s\n' "GeT"
read -r -u${nc[0]} line
printf >&2 '%s\n' "Received request:" "$line"
if [[ -z "$line" ]]; then
echo "received an empty line"
break
fi
download_path=$(echo $line | jq -r '.downloadPath')
output_file=$(echo $line | jq -r '.safeOutputFileName')
output_format=$(echo $line | jq -r '."result-format"')

echo "Attempting $download_path -> $output_file with $output_format format"
$lang "$cmd" "$download_path" "$output_file" "$output_format" 2>&1 | tee $path/abc.log

# sending status back
if [ "${PIPESTATUS[0]}" == 0 ]; then
echo "Success. skipping rest of input_files"
printf >&${nc[1]} '%s\n' "success."
else
echo 'SLEEPING in watch'
sleep 0.1
echo "Operation failed for $download_path"
printf >&${nc[1]} '%s\n' "failure."
fi

# get confirmation.
read -r -u${nc[0]} line
printf >&2 '%s\n' "Reply:" "$line"

end=$(date +%s%N | cut -b1-13)
echo "Elapsed Time: $(($end-$start)) ms"
done


# {
# "request-id": "decfc25d-4b19-47c9-a742-6259450142c7",
# "file-id": null,
# "columns": null,
# "paths": "root://lcg-lrz-rootd.grid.lrz.de:1094/pnfs/lrz-muenchen.de/data/atlas/dq2/atlaslocalgroupdisk/rucio/user/ivukotic/53/26/00284890-DCDB-E511-9F9D-02163E012FCA.root,root://eosatlas.cern.ch:1094//eos/atlas/atlasscratchdisk/rucio/user/ivukotic/53/26/00284890-DCDB-E511-9F9D-02163E012FCA.root",
# "tree-name": null,
# "service-endpoint": "http://servicex-release-testing-2-servicex-app:8000/servicex/internal/transformation/decfc25d-4b19-47c9-a742-6259450142c7",
# "chunk-size": "1000",
# "result-destination": "object-store",
# "result-format": "parquet",
# "downloadPath": "root://xcache.af.uchicago.edu:1094//root://lcg-lrz-rootd.grid.lrz.de:1094/pnfs/lrz-muenchen.de/data/atlas/dq2/atlaslocalgroupdisk/rucio/user/ivukotic/53/26/00284890-DCDB-E511-9F9D-02163E012FCA.root",
# "safeOutputFileName": "/servicex/output/decfc25d-4b19-47c9-a742-6259450142c7/scratch/root:::xcache.af.uchicago.edu:1094::root:::lcg-lrz-rootd.grid.lrz.de:1094:pnfs:lrz-muenchen.de:data:atlas:dq2:atlaslocalgroupdisk:rucio:user:ivukotic:53:26:00284890-DCDB-E511-9F9D-02163E012FCA.root.parquet",
# "completedFileName": "/servicex/output/decfc25d-4b19-47c9-a742-6259450142c7/root:::xcache.af.uchicago.edu:1094::root:::lcg-lrz-rootd.grid.lrz.de:1094:pnfs:lrz-muenchen.de:data:atlas:dq2:atlaslocalgroupdisk:rucio:user:ivukotic:53:26:00284890-DCDB-E511-9F9D-02163E012FCA.root.parquet"
# }
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,14 @@ def upload_file(self, bucket, object_name, path):
result = self.minio_client.fput_object(bucket_name=bucket,
object_name=object_name,
file_path=path)
self.logger.debug(
"created object", extra={'name': result.object_name})
except MinioException:
self.logger.error("Minio error", exc_info=True)
self.logger.info(
f"OSM > created object {result.object_name} object, etag: {result.etag}")
except S3Error:
self.logger.error("S3Error", exc_info=True)
except MinioException:
self.logger.error("Minio error", exc_info=True)

try:
os.remove(path)
except FileNotFoundError:
pass
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ def __init__(self, request_id: str,
def service_work_queue(self):
while True:
item = self.input_queue.get()
self.logger.debug("Got an item", extra={'requestId': self.request_id})
if item.is_complete():
self.logger.debug("We are done", extra={'requestId': self.request_id})
self.logger.info("OSU We are done!",
extra={'requestId': self.request_id})
break
else:
self.object_store.upload_file(self.request_id,
Expand Down
112 changes: 56 additions & 56 deletions transformer_sidecar/src/transformer_sidecar/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import glob

import time

import json
Expand All @@ -39,6 +39,8 @@
from queue import Queue
from typing import NamedTuple

import socket

from object_store_manager import ObjectStoreManager
from object_store_uploader import ObjectStoreUploader
from rabbit_mq_manager import RabbitMQManager
Expand All @@ -48,12 +50,15 @@
from transformer_sidecar.transformer_stats import TransformerStats
from transformer_sidecar.transformer_stats.aod_stats import AODStats # NOQA: 401
from transformer_sidecar.transformer_stats.uproot_stats import UprootStats # NOQA: 401
from watched_directory import WatchedDirectory

object_store = None
posix_path = None
startup_time = None

serv = None
conn = None
upload_queue = None

# Use this to make sure we don't generate output file names that are crazy long
MAX_PATH_LEN = 255

Expand Down Expand Up @@ -114,14 +119,6 @@ def fill_stats_parser(stats_parser_name: str, logfile_path: Path) -> Transformer
return globals()[stats_parser_name](logfile_path)


def clear_files(request_path: Path, file_id: str) -> None:
for f in glob.glob(f"{file_id}.json*", root_dir=request_path):
try:
os.remove(f)
except FileNotFoundError:
pass


# noinspection PyUnusedLocal
def callback(channel, method, properties, body):
"""
Expand All @@ -130,14 +127,14 @@ def callback(channel, method, properties, body):
transformed.
Each request may include a list of replicas to try. This service will loop through
the replicas and produce a json file for the science package to actually work from.
This control file will have the replica for it to try to transform along with a
the replicas and produce a json document for the science package to actually work from.
This control document will have the replica for it to try to transform along with a
path in the output directory for the generated parquet or root file to be written.
This json file is written to the shared volume and then a thread is kicked off to
wait for a .done or .failure file to be created. This is the science package's way of
showing it is done with the request. There will either be a parquet/root file in the
output directory or at least a log file.
This control document is sent via a socket to the science image.
Once science image has done its job, it sends back a message over the socket.
The sidecar will then add the parquet/root file in the output directory to the
S3 upload queue.
We will examine this log file to see if the transform succeeded or failed
"""
Expand Down Expand Up @@ -175,8 +172,7 @@ def callback(channel, method, properties, body):
request_path = os.path.join(posix_path, _request_id)
os.makedirs(request_path, exist_ok=True)

# scratch dir where the transformer temporarily writes the results. This
# directory isn't monitored, so we can't pick up partial results
# scratch dir where the transformer temporarily writes the results.
scratch_path = os.path.join(posix_path, _request_id, 'scratch')
os.makedirs(scratch_path, exist_ok=True)

Expand Down Expand Up @@ -208,44 +204,39 @@ def callback(channel, method, properties, body):
hashed_file_name
)

# Final results are written here and picked up by the watched directory thread
transform_request['completedFileName'] = os.path.join(
request_path,
hashed_file_name
)

# creating json file for use by science transformer
jsonfile = str(_file_id) + '.json'
with open(os.path.join(request_path, jsonfile), 'w') as outfile:
json.dump(transform_request, outfile)

# Queue to communicate between WatchedDirectory and object file uploader
upload_queue = Queue()

# Watch for new files appearing in the shared directory
watcher = WatchedDirectory(Path(request_path), upload_queue,
logger=logger, servicex=servicex)

# And upload them to the object store
uploader = ObjectStoreUploader(request_id=_request_id, input_queue=upload_queue,
object_store=object_store, logger=logger)

watcher.start()
uploader.start()

# Wait for both threads to complete
watcher.observer.join()
print(f"Watched Directory Thread is done. Status:{watcher.status}")
uploader.join()
print("Uploader is done")
while True:
print('waiting for the GeT')
req = conn.recv(4096)
if not req:
print('problem in getting GeT')
break
req1 = req.decode('utf8')
print("REQ >>>>>>>>>>>>>>>", req1)
if req1.startswith('GeT'):
break

res = json.dumps(transform_request)+"\n"
print("sending:", res)
conn.send(res.encode())

print('WAITING FOR STATUS...')
req = conn.recv(4096)
if not req:
break
req2 = req.decode('utf8').strip()
print('STATUS RECEIVED :', req2)
if req2 == 'success.':
upload_queue.put(ObjectStoreUploader.WorkQueueItem(
Path(transform_request['safeOutputFileName'])))
conn.send("confirmed.\n".encode())

# Grab the logs
transformer_stats = fill_stats_parser(
transformer_capabilities['stats-parser'],
Path(os.path.join(request_path, jsonfile + '.log'))
Path(os.path.join(request_path, 'abc.log'))
)

if watcher.status == WatchedDirectory.TransformStatus.SUCCESS:
if req2 == 'success.':
transform_success = True
ts = {
"requestId": _request_id,
Expand All @@ -256,8 +247,6 @@ def callback(channel, method, properties, body):
logger.info("Transformer stats.", extra=ts)
break

clear_files(Path(request_path), _file_id)

# If none of the replicas resulted in a successful transform then we have
# a hard failure with this file.
if not transform_success:
Expand All @@ -269,16 +258,14 @@ def callback(channel, method, properties, body):
}
logger.error("Hard Failure", extra=hf)

shutil.rmtree(request_path)

if transform_success:
servicex.put_file_complete(_request_id, _file_path, _file_id, "success",
num_messages=0,
total_time=total_time,
total_events=transformer_stats.total_events,
total_bytes=transformer_stats.file_size)
total_bytes=transformer_stats.file_size
)
else:

servicex.put_file_complete(_request_id, file_path=_file_path, file_id=_file_id,
status='failure', num_messages=0,
total_time=0, total_events=0,
Expand Down Expand Up @@ -357,7 +344,20 @@ def callback(channel, method, properties, body):
'iowait': startup_time.iowait
})

serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serv.bind(('localhost', 8081))
serv.listen()
conn, addr = serv.accept()

upload_queue = Queue()
uploader = ObjectStoreUploader(request_id=args.request_id, input_queue=upload_queue,
object_store=object_store, logger=logger)
uploader.start()

if args.request_id:
rabbitmq = RabbitMQManager(args.rabbit_uri,
args.request_id,
callback)

uploader.join()
logger.info("Uploader is done", extra={'requestId': args.request_id})
Loading

0 comments on commit 6b4235d

Please sign in to comment.