Skip to content

Commit

Permalink
Clean up retries to use a tenacity retry decorator
Browse files Browse the repository at this point in the history
  • Loading branch information
BenGalewsky committed Oct 14, 2024
1 parent 2b6ee39 commit 5d69990
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 49 deletions.
20 changes: 10 additions & 10 deletions servicex_app/servicex_app/code_gen_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,23 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import requests
from tenacity import Retrying, stop_after_attempt, wait_exponential_jitter
from requests_toolbelt.multipart import decoder

from servicex_app.models import TransformRequest
from servicex_app.reliable_requests import request_timeout, servicex_retry


class CodeGenAdapter:
def __init__(self, code_gen_service_urls, transformer_manager):
self.code_gen_service_urls = code_gen_service_urls
self.transformer_manager = transformer_manager

@servicex_retry()
def post_request(self, post_url, post_obj):
result = requests.post(post_url + "/servicex/generated-code", json=post_obj,
timeout=request_timeout)
return result

def generate_code_for_selection(
self, request_record: TransformRequest,
namespace: str,
Expand All @@ -60,16 +67,9 @@ def generate_code_for_selection(
if not post_url:
raise ValueError(f'{user_codegen_name}, code generator unavailable for use')

postObj = {
result = self.post_request(post_url + "/servicex/generated-code", post_obj={
"code": request_record.selection,
}

for attempt in Retrying(stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=0.1, max=30),
reraise=True):
with attempt:
result = requests.post(post_url + "/servicex/generated-code", json=postObj,
timeout=(0.5, None))
})

if result.status_code != 200:
msg = result.json()['Message']
Expand Down
16 changes: 9 additions & 7 deletions servicex_app/servicex_app/docker_repo_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,21 @@
import re

import requests
from tenacity import Retrying, stop_after_attempt, wait_exponential_jitter
from flask import current_app

from servicex_app.reliable_requests import servicex_retry, request_timeout


class DockerRepoAdapter:
def __init__(self, registry_endpoint="https://hub.docker.com"):
self.registry_endpoint = registry_endpoint

@servicex_retry()
def get_image_by_tag(self, repo: str, image: str, tag: str) -> requests.Response:
query = f'{self.registry_endpoint}/v2/repositories/{repo}/{image}/tags/{tag}'
r = requests.get(query, timeout=request_timeout)
return r

def check_image_exists(self, tagged_image: str) -> bool:
"""
Checks that the given Docker image
Expand All @@ -47,13 +54,8 @@ def check_image_exists(self, tagged_image: str) -> bool:
return False

(repo, image, tag) = search_result.groups()
r = self.get_image_by_tag(repo, image, tag)

query = f'{self.registry_endpoint}/v2/repositories/{repo}/{image}/tags/{tag}'
for attempt in Retrying(stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=0.1, max=30),
reraise=True):
with attempt:
r = requests.get(query, timeout=(0.5, None))
if r.status_code == 404:
return False

Expand Down
20 changes: 12 additions & 8 deletions servicex_app/servicex_app/mailgun_adaptor.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
import requests
from tenacity import Retrying, stop_after_attempt, wait_exponential_jitter
from flask import current_app, render_template

from servicex_app.reliable_requests import servicex_retry, request_timeout


class MailgunAdaptor:
def __init__(self):
self.api_key = current_app.config.get('MAILGUN_API_KEY')
self.domain = current_app.config.get('MAILGUN_DOMAIN')
self.endpoint = f"https://api.mailgun.net/v3/{self.domain}/messages"

@servicex_retry()
def post_mailgun(self, data) -> requests.Response:
res = requests.post(self.endpoint, data,
auth=("api", self.api_key),
timeout=request_timeout)
return res

def send(self, email: str, template_name: str):
"""
Sends an email to the given address using the given template.
Expand All @@ -23,10 +31,6 @@ def send(self, email: str, template_name: str):
"subject": "Welcome to ServiceX!",
"html": render_template(f"emails/{template_name}")
}
for attempt in Retrying(stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=0.1, max=30),
reraise=True):
with attempt:
res = requests.post(self.endpoint, data, auth=("api", self.api_key),
timeout=(0.5, None))
res.raise_for_status()

res = self.post_mailgun(data)
res.raise_for_status()
54 changes: 54 additions & 0 deletions servicex_app/servicex_app/reliable_requests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright (c) 2024, IRIS-HEP
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from functools import wraps

from tenacity import retry, stop_after_attempt, wait_exponential_jitter

# This is the default timeout settings for requests. It represents the time to make a
# connection and then the time to wait for a response.
request_timeout = (0.5, None)


# Use this decorator on all functions that wrap requests to
# servicex microservices.
def servicex_retry(
max_attempts=3,
wait_min=0.1,
wait_max=30
):
def decorator(func):
@retry(
reraise=True,
stop=stop_after_attempt(max_attempts),
wait=wait_exponential_jitter(initial=wait_min, max=wait_max)
)
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
return decorator
28 changes: 15 additions & 13 deletions servicex_app/servicex_app/resources/users/slack_interaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,29 @@
import time

import requests
from tenacity import retry, Retrying, stop_after_attempt, wait_exponential_jitter
from flask import request, current_app, Response
from sqlalchemy.orm.exc import NoResultFound

from servicex_app.resources.servicex_resource import ServiceXResource
from servicex_app.models import UserModel
from servicex_app.web.slack_msg_builder import signup_ia, missing_slack_app, request_expired, \
from servicex_app.reliable_requests import servicex_retry, request_timeout
from servicex_app.resources.servicex_resource import ServiceXResource
from servicex_app.web.slack_msg_builder import signup_ia, missing_slack_app, \
request_expired, \
verification_failed, user_not_found


@retry(stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=0.1, max=30),
reraise=True)
@servicex_retry()
def respond(url, message):
slack_response = requests.post(url, message, timeout=(0.5, None))
slack_response = requests.post(url, message, timeout=request_timeout)
slack_response.raise_for_status()


@servicex_retry()
def slack_submit(response_url=None, response_msg=None):
slack_response = requests.post(response_url, response_msg, timeout=request_timeout)
return slack_response


class SlackInteraction(ServiceXResource):
def post(self) -> Response:
body = request.get_data().decode('utf-8')
Expand Down Expand Up @@ -61,12 +66,9 @@ def post(self) -> Response:
respond(response_url, user_not_found(str(err)))
return Response(status=404)
response_msg = signup_ia(original_msg, initiating_user, action_id)
for attempt in Retrying(stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=0.1, max=30),
reraise=True):
with attempt:
slack_response = requests.post(response_url, response_msg, timeout=(0.5, None))
slack_response.raise_for_status()

slack_response = slack_submit(response_url, response_msg)
slack_response.raise_for_status()
elif action_id == "reject_user":
# todo blocked by PR for delete-user endpoint
raise NotImplementedError
Expand Down
24 changes: 13 additions & 11 deletions servicex_app/servicex_app/web/create_profile.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import requests
from tenacity import Retrying, stop_after_attempt, wait_exponential_jitter
from flask import request, render_template, redirect, url_for, session, \
current_app, flash
from flask_jwt_extended import create_refresh_token
Expand All @@ -8,6 +7,15 @@
from servicex_app.decorators import oauth_required
from .forms import ProfileForm
from .slack_msg_builder import signup
from ..reliable_requests import servicex_retry, request_timeout


@servicex_retry()
def post_signup(webhook_url=None, signup_data=None):
res = requests.post(webhook_url,
signup_data,
timeout=request_timeout)
return res


@oauth_required
Expand Down Expand Up @@ -44,16 +52,10 @@ def create_profile():
webhook_url = current_app.config.get("SIGNUP_WEBHOOK_URL")
msg_segments = ["Profile created!"]
if webhook_url and new_user.pending:
for attempt in Retrying(stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=0.1, max=30),
reraise=True):
with attempt:
res = requests.post(webhook_url, signup(new_user.email),
timeout=(0.5, None))
# Raise exception on error (e.g. bad request or forbidden url)
res.raise_for_status()
msg_segments += ["Your account is pending approval.",
"We'll email you when it's ready."]
res = post_signup(webhook_url, signup(new_user.email))
res.raise_for_status()
msg_segments += ["Your account is pending approval.",
"We'll email you when it's ready."]
current_app.logger.info(f"Created profile for {new_user.id}")
flash(' '.join(msg_segments), 'success')
return redirect(url_for('profile'))
Expand Down

0 comments on commit 5d69990

Please sign in to comment.