-
-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sourcery Starbot ⭐ refactored evgenytsydenov/python_course #1
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -247,9 +247,8 @@ def _extract_email(self, msg: Dict[str, Any]) -> str: | |
""" | ||
headers = msg['payload']['headers'] | ||
sender = next(x for x in headers if x['name'] == 'From')['value'] | ||
match = re.search('.*<(?P<email>.*)>.*', sender) | ||
if match: | ||
sender = match.group('email') | ||
if match := re.search('.*<(?P<email>.*)>.*', sender): | ||
sender = match['email'] | ||
logger.debug(f'Sender email "{sender}" was extracted ' | ||
f'from the message with id "{msg["id"]}".') | ||
return sender | ||
|
@@ -265,10 +264,10 @@ def _extract_lesson_name(self, msg: Dict[str, Any]) -> str: | |
""" | ||
headers = msg['payload']['headers'] | ||
subject = next(x for x in headers if x['name'] == 'Subject')['value'] | ||
match = re.search('^(?P<label>.*)/(?P<lesson>.*)$', subject) | ||
les_name = '' | ||
if match: | ||
les_name = match.group('lesson') | ||
if match := re.search('^(?P<label>.*)/(?P<lesson>.*)$', subject): | ||
les_name = match['lesson'] | ||
else: | ||
les_name = '' | ||
Comment on lines
-268
to
+270
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
logger.debug(f'Lesson name "{les_name}" extracted ' | ||
f'from the message with id "{msg["id"]}".') | ||
return les_name | ||
|
@@ -381,18 +380,21 @@ def _get_label_id(self, label_name: str) -> str: | |
:return: label id. | ||
""" | ||
all_labels = self._gmail.users().labels().list(userId='me').execute() | ||
label_info = {} | ||
for label in all_labels['labels']: | ||
if label['name'] == label_name: | ||
label_info = label | ||
break | ||
label_info = next( | ||
( | ||
label | ||
for label in all_labels['labels'] | ||
if label['name'] == label_name | ||
), | ||
{}, | ||
) | ||
if label_info: | ||
logger.debug(f'Gmail label "{label_info}" already exists.') | ||
else: | ||
body = {'name': label_name, 'messageListVisibility': 'show', | ||
'labelListVisibility': 'labelShow'} | ||
label_info = self._gmail.users().labels() \ | ||
.create(userId='me', body=body).execute() | ||
.create(userId='me', body=body).execute() | ||
Comment on lines
-384
to
+397
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
logger.debug(f'New label "{label_info}" was created.') | ||
return label_info['id'] | ||
|
||
|
@@ -408,23 +410,24 @@ def _create_filter(self, label_id: str, fetch_keyword: str, | |
""" | ||
# List all filters | ||
filters = self._gmail.users().settings().filters() \ | ||
.list(userId='me').execute() | ||
.list(userId='me').execute() | ||
|
||
# Find if already exist | ||
criteria = {'to': fetch_alias, 'subject': fetch_keyword} | ||
action = {'addLabelIds': [label_id], | ||
'removeLabelIds': ['INBOX', 'SPAM']} | ||
filter_info = {} | ||
for gmail_filter in filters['filter']: | ||
if (gmail_filter['criteria'] == criteria) \ | ||
and (gmail_filter['action'] == action): | ||
filter_info = gmail_filter | ||
break | ||
|
||
if filter_info: | ||
if filter_info := next( | ||
( | ||
gmail_filter | ||
for gmail_filter in filters['filter'] | ||
if (gmail_filter['criteria'] == criteria) | ||
and (gmail_filter['action'] == action) | ||
), | ||
{}, | ||
): | ||
logger.debug(f'Filter {filter_info} already exists.') | ||
else: | ||
body = {'criteria': criteria, 'action': action} | ||
self._gmail.users().settings().filters() \ | ||
.create(userId='me', body=body).execute() | ||
.create(userId='me', body=body).execute() | ||
Comment on lines
-411
to
+432
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
logger.debug(f'Filter {filter_info} has been created.') |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -75,9 +75,9 @@ def _get_success_feedback(self, grade_result: GradeResult) -> (str, str): | |
""" | ||
timestamp = grade_result.timestamp.strftime(DATE_FORMAT) | ||
subject = f'{self._course_name} / {grade_result.lesson_name} ' \ | ||
f'/ {timestamp}' | ||
score = sum([task.score for task in grade_result.task_grades]) | ||
max_score = sum([task.max_score for task in grade_result.task_grades]) | ||
f'/ {timestamp}' | ||
score = sum(task.score for task in grade_result.task_grades) | ||
max_score = sum(task.max_score for task in grade_result.task_grades) | ||
Comment on lines
-78
to
+80
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
body = self._grades_body.format( | ||
first_name=grade_result.first_name, | ||
lesson_name=grade_result.lesson_name, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -68,10 +68,10 @@ def get_lesson_info(self, lesson_name: str) -> Dict[str, Any]: | |
les_tab = self._meta.tables['assignment'] | ||
q_filter = func.lower(les_tab.c.name) == lesson_name.lower() | ||
lesson_sql = select(['*']).where(q_filter) | ||
result = connection.execute(lesson_sql).first() | ||
les_info = {} | ||
if result: | ||
if result := connection.execute(lesson_sql).first(): | ||
les_info = dict(result) | ||
else: | ||
les_info = {} | ||
Comment on lines
-71
to
+74
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
logger.debug(f'The following information about lesson with ' | ||
f'name "{lesson_name}" was loaded: {les_info}.') | ||
return les_info | ||
|
@@ -87,10 +87,10 @@ def get_user_info(self, email: str) -> Dict[str, Any]: | |
users_table = self._meta.tables['student'] | ||
q_filter = func.lower(users_table.c.email) == email.lower() | ||
user_sql = select(['*']).where(q_filter) | ||
result = connection.execute(user_sql).first() | ||
user_info = {} | ||
if result: | ||
if result := connection.execute(user_sql).first(): | ||
user_info = dict(result) | ||
else: | ||
user_info = {} | ||
Comment on lines
-90
to
+93
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
logger.debug(f'The following information about user with ' | ||
f'email "{email}" was loaded: {user_info}.') | ||
return user_info | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -225,18 +225,19 @@ def _is_notebook_valid(self, path: str, lesson_name: str) -> bool: | |
all_cells = json.load(file).get('cells', []) | ||
|
||
# Get only nbgrader cells | ||
for cell in all_cells: | ||
if 'nbgrader' in cell['metadata'].keys(): | ||
nb_cells.append( | ||
cell['metadata']['nbgrader'].get('grade_id')) | ||
nb_cells.extend( | ||
cell['metadata']['nbgrader'].get('grade_id') | ||
for cell in all_cells | ||
if 'nbgrader' in cell['metadata'].keys() | ||
) | ||
Comment on lines
-228
to
+232
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
except (JSONDecodeError, UnicodeDecodeError): | ||
logger.debug(f'File "{path}" does not have a json structure.') | ||
return False | ||
|
||
# Get valid cells | ||
with self._nb_grader.gradebook as gb: | ||
origin_cells = gb.find_notebook(lesson_name, lesson_name) \ | ||
.source_cells | ||
.source_cells | ||
true_cells = [cell.name for cell in origin_cells] | ||
return Counter(nb_cells) == Counter(true_cells) | ||
|
||
|
@@ -306,20 +307,17 @@ def _get_lesson_tasks(self, lesson_name: str) -> List[Task]: | |
tasks = [] | ||
task = None | ||
for cell in notebook['cells']: | ||
nb_data = cell['metadata'].get('nbgrader') | ||
if nb_data: | ||
if nb_data := cell['metadata'].get('nbgrader'): | ||
# If it is a task name cell | ||
if cell['cell_type'] == 'markdown': | ||
is_todo = re.match(pat, cell['source'][0]) | ||
if is_todo: | ||
task = Task(name=is_todo.group('name')) | ||
if is_todo := re.match(pat, cell['source'][0]): | ||
task = Task(name=is_todo['name']) | ||
continue | ||
|
||
# If it is a test cell | ||
if cell['cell_type'] == 'code': | ||
if nb_data['grade']: | ||
task.test_cell = nb_data['grade_id'] | ||
tasks.append(task) | ||
if cell['cell_type'] == 'code' and nb_data['grade']: | ||
task.test_cell = nb_data['grade_id'] | ||
tasks.append(task) | ||
Comment on lines
-309
to
+320
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
logger.debug(f'Task names were extracted for lesson "{lesson_name}".') | ||
return tasks | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,13 +40,13 @@ def connect(self) -> None: | |
|
||
# Get id of cloud folder | ||
query = f"mimeType = 'application/vnd.google-apps.folder' " \ | ||
f"and name = '{self.cloud_root_name}' " \ | ||
f"and trashed != True" | ||
cloud_file_id = self._find_cloud_files(query, attributes=['id']) | ||
if not cloud_file_id: | ||
f"and name = '{self.cloud_root_name}' " \ | ||
f"and trashed != True" | ||
if cloud_file_id := self._find_cloud_files(query, attributes=['id']): | ||
self._cloud_root_id = cloud_file_id[0]['id'] | ||
else: | ||
raise ValueError('The root cloud folder for publishing must be ' | ||
'pre created.') | ||
self._cloud_root_id = cloud_file_id[0]['id'] | ||
Comment on lines
-43
to
-49
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
|
||
def _create_cloud_path(self, cloud_path: str) -> str: | ||
"""Create all intermediate folders on the way to the cloud path. | ||
|
@@ -60,12 +60,13 @@ def _create_cloud_path(self, cloud_path: str) -> str: | |
return parent_id | ||
for name in cloud_path.split(os.sep): | ||
query = f"name = '{name}' and '{parent_id}' in parents " \ | ||
f"and trashed != True" | ||
f"and trashed != True" | ||
folder = self._find_cloud_files(query, ['id']) | ||
if not folder: | ||
parent_id = self._create_cloud_folder(name, parent_id) | ||
else: | ||
parent_id = folder[0]['id'] | ||
parent_id = ( | ||
folder[0]['id'] | ||
if folder | ||
else self._create_cloud_folder(name, parent_id) | ||
) | ||
Comment on lines
-63
to
+69
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
return parent_id | ||
|
||
def sync(self, local_file: str, cloud_folder_path: str = '.', | ||
|
@@ -96,17 +97,16 @@ def sync(self, local_file: str, cloud_folder_path: str = '.', | |
|
||
# Find the file and sync it | ||
query = f"name = '{local_name}' and '{parent_id}' in parents " \ | ||
f"and trashed != True" | ||
cloud_file = self._find_cloud_files(query, ['id']) | ||
if not cloud_file: | ||
logger.debug(f'File or folder "{local_name}" does not exist ' | ||
f'in the cloud path "{cloud_folder_path}".') | ||
cloud_file_id = self._upload_file(local_file, parent_id) | ||
else: | ||
f"and trashed != True" | ||
if cloud_file := self._find_cloud_files(query, ['id']): | ||
cloud_file_id = cloud_file[0]['id'] | ||
logger.debug(f'File or folder "{local_name}" exists ' | ||
f'in the cloud path "{cloud_folder_path}".') | ||
self._update_cloud_file(cloud_file_id, local_file) | ||
else: | ||
logger.debug(f'File or folder "{local_name}" does not exist ' | ||
f'in the cloud path "{cloud_folder_path}".') | ||
cloud_file_id = self._upload_file(local_file, parent_id) | ||
Comment on lines
-99
to
+109
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
logger.debug(f'Content of the local object "{local_file}" was ' | ||
f'synchronized with the cloud file "{cloud_name}".') | ||
|
||
|
@@ -115,15 +115,15 @@ def sync(self, local_file: str, cloud_folder_path: str = '.', | |
file_params = self._get_cloud_file( | ||
cloud_file_id, ['permissions', 'name']) | ||
file_params[link_type] = f'http://drive.google.com/' \ | ||
f'thumbnail?id={cloud_file_id}' | ||
f'thumbnail?id={cloud_file_id}' | ||
else: | ||
file_params = self._get_cloud_file( | ||
cloud_file_id, ['permissions', link_type, 'name']) | ||
if to_share: | ||
is_shared = False | ||
for p in file_params['permissions']: | ||
if (p['id'] == 'anyoneWithLink') and (p['role'] == 'reader'): | ||
is_shared = True | ||
is_shared = any( | ||
(p['id'] == 'anyoneWithLink') and (p['role'] == 'reader') | ||
for p in file_params['permissions'] | ||
) | ||
if is_shared: | ||
logger.debug(f'File or folder "{file_params["name"]}" with ' | ||
f'id "{cloud_file_id}" is already shared.') | ||
|
@@ -297,18 +297,15 @@ def _find_cloud_files(self, query: str, attributes: Iterable[str]) \ | |
results['files'] += next_page['files'] | ||
return results['files'] | ||
|
||
def _get_cloud_file(self, file_id: str, attributes: Iterable[str]) \ | ||
-> Dict[str, Any]: | ||
def _get_cloud_file(self, file_id: str, attributes: Iterable[str]) -> Dict[str, Any]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
"""Get attributes of cloud file. | ||
|
||
:param file_id: if of file. | ||
:param attributes: attributes to get. | ||
:return: dict with attributes. | ||
""" | ||
fields = f'{", ".join(attributes)}' | ||
results = self._gdrive.files().get( | ||
fileId=file_id, fields=fields).execute() | ||
return results | ||
return self._gdrive.files().get(fileId=file_id, fields=fields).execute() | ||
|
||
def _get_ignore_files(self) -> List[str]: | ||
"""Get file patterns to ignore when publishing. | ||
|
@@ -318,8 +315,8 @@ def _get_ignore_files(self) -> List[str]: | |
patterns = [] | ||
path = os.path.join(os.path.dirname(__file__), '.publishignore') | ||
with open(path, 'r') as file: | ||
for line in file.readlines(): | ||
if line.strip() and not line.startswith('#'): | ||
patterns.append(line) | ||
patterns.extend( | ||
line for line in file if line.strip() and not line.startswith('#') | ||
) | ||
Comment on lines
-321
to
+320
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
logger.debug('Publish ignore files were loaded.') | ||
return patterns |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,9 +33,7 @@ def formatTime(self, record: logging.LogRecord, | |
:return: time in string format. | ||
""" | ||
dt = self.converter(record.created) | ||
if datefmt: | ||
return dt.strftime(datefmt) | ||
return dt.isoformat() | ||
return dt.strftime(datefmt) if datefmt else dt.isoformat() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
|
||
|
||
def _get_file_handler(path: str) -> logging.FileHandler: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,8 +58,9 @@ def send(self, destination: str, subject: str, | |
file_attachment.set_payload(file_obj.read()) | ||
encoders.encode_base64(file_attachment) | ||
file_attachment.add_header( | ||
f'Content-Disposition', | ||
f'attachment; filename="{file_name}"') | ||
'Content-Disposition', | ||
f'attachment; filename="{file_name}"', | ||
) | ||
Comment on lines
-61
to
+63
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
message.attach(file_attachment) | ||
text = message.as_string() | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Function
GmailExchanger._extract_email
refactored with the following changes:use-named-expression
)use-getitem-for-re-match-groups
)