Skip to content

Commit

Permalink
Version 0.7.0.
Browse files Browse the repository at this point in the history
  • Loading branch information
merretbuurman committed Feb 16, 2017
1 parent fca8ae7 commit 2a8ff28
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 89 deletions.
4 changes: 2 additions & 2 deletions esgfpid/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
RABBIT_SYN_TIMEOUT_MILLISEC=10

# Asynchronous rabbit module:
RABBIT_ASYN_RECONNECTION_SECONDS=2 # after how much time try to reconnect if connection was closed?
RABBIT_ASYN_RECONNECTION_MAX_TRIES=60 # how many times should the module try reconnecting? server reboot can take quite a while!
RABBIT_ASYN_RECONNECTION_SECONDS=0.5 # after how much time try to retry connecting to same hosts if connection was closed?
RABBIT_ASYN_RECONNECTION_MAX_TRIES=2 # how many times should the module try reconnecting? Not so many times, rather throw exception to publisher.
RABBIT_ASYN_SOCKET_TIMEOUT=2 # defaults to 0.25 sec
RABBIT_ASYN_CONNECTION_ATTEMPTS=1 # defaults to 1
RABBIT_ASYN_CONNECTION_RETRY_DELAY_SECONDS=1 # no default
Expand Down
5 changes: 5 additions & 0 deletions esgfpid/rabbit/asynchronous/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ def __init__(self, custom_message=None):

super(self.__class__, self).__init__(self.msg)

class PIDServerException(Exception):
    '''
    Error in the communication with the PID (RabbitMQ) server, e.g.
    raised when connecting to all hosts has permanently failed.

    :param custom_message: Text describing the problem. It is passed
        on unchanged as the exception's message.
    '''

    def __init__(self, custom_message):
        # Name the class explicitly: super(self.__class__, self) recurses
        # infinitely as soon as this exception is subclassed, because
        # self.__class__ is then the subclass and the lookup lands on
        # this very __init__ again.
        super(PIDServerException, self).__init__(custom_message)

class OperationNotAllowed(Exception):

def __init__(self, custom_message=None, op=None):
Expand Down
2 changes: 1 addition & 1 deletion esgfpid/rabbit/asynchronous/rabbitthread.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ def tell_publisher_to_stop_waiting_for_thread_to_accept_events(self):
the main thread, but that causes problems, as it messes
up the proper and punctual receival of RabbitMQ confirms.
'''
def tell_publisher_to_stop_waiting(self):
def tell_publisher_to_stop_waiting_for_gentle_finish(self):
self.__gently_finish_ready.set()

'''
Expand Down
180 changes: 102 additions & 78 deletions esgfpid/rabbit/asynchronous/thread_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import esgfpid.defaults as defaults
import esgfpid.rabbit.connparams
from esgfpid.utils import loginfo, logdebug, logtrace, logerror, logwarn, log_every_x_times
from .exceptions import PIDServerException

LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.NullHandler())
Expand Down Expand Up @@ -72,7 +73,7 @@ def first_connection(self):
self.__start_waiting_for_events()
logtrace(LOGGER, 'Had started waiting for events, but stopped.')

def __start_waiting_for_events(self, max_retries=10, retry_seconds=0.5): # TODO Put these values into config!
def __start_waiting_for_events(self):
'''
This waits until the whole chain of callback methods triggered by
"trigger_connection_to_rabbit_etc()" has finished, and then starts
Expand All @@ -86,58 +87,75 @@ def __start_waiting_for_events(self, max_retries=10, retry_seconds=0.5): # TODO
might get called before the __init__ has finished? I'd rather stay on the
safe side, as my experience of threading in Python is limited.
'''
counter_of_tries = 0
while True:
counter_of_tries += 1

# Start ioloop if connection object ready:
if self.thread._connection is not None:
try:
logdebug(LOGGER, 'Starting ioloop...')
logtrace(LOGGER, 'ioloop is owned by connection %s...', self.thread._connection)
logdebug(LOGGER, 'Starting ioloop, can now fire events...')

# Tell the main thread that we're now open for events.
# As soon as the thread._connection object is not None anymore, it
# can receive events.
# TODO Or do we need to wait for the ioloop to be started? In that case,
# the "...stop_waiting..." would have to be called after starting the
# ioloop, which does not work, as the ioloop.start() blocks.
self.thread.tell_publisher_to_stop_waiting_for_thread_to_accept_events()
self.thread.continue_gently_closing_if_applicable()

# Start ioloop if connection object ready:
if self.thread._connection is not None:
try:
logdebug(LOGGER, 'Starting ioloop...')
logtrace(LOGGER, 'ioloop is owned by connection %s...', self.thread._connection)

# Tell the main thread that we're now open for events.
# As soon as the thread._connection object is not None anymore, it
# can receive events.
self.thread.tell_publisher_to_stop_waiting_for_thread_to_accept_events()
self.thread.continue_gently_closing_if_applicable()
self.thread._connection.ioloop.start()

except pika.exceptions.ProbableAuthenticationError as e:

# If the library was stopped by the user in the mean time,
# we do not try to reconnect.
if self.statemachine.is_PERMANENTLY_UNAVAILABLE():
logerror(LOGGER, 'Caught Authentication Exception during connection ("%s"). Will not reconnect.', e.__class__.__name__)
# We do not do anything anymore, as the shutdown has been
# handled already, otherwise, the state would not be
# permanently unavailable.

# In normal cases, we do try to reconnect. As we will try
# to reconnect, we set state to waiting to connect.
else:
logerror(LOGGER, 'Caught Authentication Exception during connection ("%s"). Will try to reconnect to next host.', e.__class__.__name__)
self.statemachine.set_to_waiting_to_be_available()
self.statemachine.detail_authentication_exception = True # TODO WHAT FOR?

# It seems that ProbableAuthenticationErrors do not cause
# RabbitMQ to call any callback, either on_connection_closed
# or on_connection_error - it just silently swallows the
# problem.
# So we need to manually trigger reconnection to the next
# host here, which we do by manually calling the callback.
errorname = 'ProbableAuthenticationError issued by pika'
self.on_connection_error(self.thread._connection, errorname)

# We start the ioloop, so it can handle the reconnection events,
# or also receive events from the publisher in the meantime.
self.thread._connection.ioloop.start()
break

except pika.exceptions.ProbableAuthenticationError as e:
logerror(LOGGER, 'Cannot properly start the thread. Caught Authentication Exception during startup ("%s")', e.__class__.__name__)
if self.thread.get_num_unpublished() > 0:
logerror(LOGGER, 'The %i messages that are waiting to be published will not be published.', self.thread.get_num_unpublished())
self.statemachine.set_to_permanently_unavailable() # to make sure no more messages are accepted, and gentle-finish won't wait...
self.statemachine.detail_authentication_exception = True
self.thread._connection.ioloop.start() # to be able to listen to finish events from main thread!
break

except Exception as e:
# This catches any error during connection startup and during the entire
# time the ioloop runs, blocks and waits for events.
logerror(LOGGER, 'Unexpected error during event listener\'s lifetime: %s: %s', e.__class__.__name__, e.message)
self.statemachine.set_to_permanently_unavailable() # to make sure no more messages are accepted, and gentle-finish won't wait...
self.thread._connection.ioloop.start() # to be able to listen to finish events from main thread!
break

# Otherwise, wait and retry
elif counter_of_tries < max_retries:
logdebug(LOGGER, 'Very unexpected: Connection object is not ready in try %s/%s. Trying again after %s seconds.', counter_of_tries, max_retries, retry_seconds)
time.sleep(retry_seconds)

# If we have reached the max number of retries:
# TODO I don't think that this can happen, as the connection object
# always exists, no matter if the actual connection to RabbitMQ
# succeeds or not.
else:
logdebug(LOGGER, 'Very unexpected: Connection object is not ready in try %s/%s. Giving up.', counter_of_tries, max_retries)
logerror(LOGGER, 'Cannot properly start the thread. Connection object is not ready.')
break

except Exception as e:
# This catches any error during connection startup and during the entire
# time the ioloop runs, blocks and waits for events.
logerror(LOGGER, 'Unexpected error during event listener\'s lifetime: %s: %s', e.__class__.__name__, e.message)

# As we will try to reconnect, set state to waiting to connect.
# If reconnection fails, it will be set to permanently unavailable.
self.statemachine.set_to_waiting_to_be_available()

# In case this error is reached, it seems that no callback
# was called that handles the problem. Let's try to reconnect
# somewhere else.
errorname = 'Unexpected error ('+str(e.__class__.__name__)+': '+str(e.message)+')'
self.on_connection_error(self.thread._connection, errorname)

# We start the ioloop, so it can handle the reconnection events,
# or also receive events from the publisher in the meantime.
self.thread._connection.ioloop.start()

else:
# I'm quite sure that this cannot happen, as the connection object
# is created in "trigger_connection_...()" and thus exists, no matter
# if the actual connection to RabbitMQ succeeded (yet) or not.
logdebug(LOGGER, 'This cannot happen: Connection object is not ready.')
logerror(LOGGER, 'Cannot happen. Cannot properly start the thread. Connection object is not ready.')

########################################
### Chain of callback functions that ###
Expand All @@ -158,8 +176,8 @@ def __please_open_connection(self):
parameters=params,
on_open_callback=self.on_connection_open,
on_open_error_callback=self.on_connection_error,
on_close_callback=None,
stop_ioloop_on_close=False
on_close_callback=self.on_connection_closed,
stop_ioloop_on_close=False # TODO Why not?
)

''' Callback, called by RabbitMQ.'''
Expand All @@ -172,10 +190,11 @@ def on_connection_open(self, unused_connection):
# Tell the main thread we're open for events now:
# When the connection is open, the thread is ready to accept events.
# Note: It was already ready when the connection object was created,
# not just now that it's actually open. So this second call to
# "...stop_waiting..." should be redundant!
# not just now that it's actually open. There was already a call to
# "...stop_waiting..." in start_waiting_for_events(), which quite
# certainly was carried out before this callback. So this call to
# "...stop_waiting..." is likely redundant!
self.thread.tell_publisher_to_stop_waiting_for_thread_to_accept_events()
self.__add_on_connection_close_callback()
self.__please_open_rabbit_channel()

''' Asynchronous, waits for answer from RabbitMQ.'''
Expand Down Expand Up @@ -265,38 +284,38 @@ def on_connection_error(self, connection, msg):
oldhost = self.__node_manager.get_connection_parameters().host
loginfo(LOGGER, 'Failed connection to RabbitMQ at %s. Reason: %s.', oldhost, msg)

reopen_seconds = None

# If there are alternative URLs, try one of them:
if self.__node_manager.has_more_urls():
logdebug(LOGGER, 'Connection failure: %s fallback URLs left to try.', self.__node_manager.get_num_left_urls())
self.__node_manager.set_next_host()
newhost = self.__node_manager.get_connection_parameters().host
loginfo(LOGGER, 'Connection failure: Trying to connect (now) to %s.', newhost)
reopen_seconds = 0
self.__wait_and_trigger_reconnection(connection, reopen_seconds)

# If there is no alternative URLs, reset the node manager to

# If there are no URLs left, reset the node manager to
# start at the first nodes again...
else:
self.__reconnect_counter += 1;
if self.__reconnect_counter <= defaults.RABBIT_ASYN_RECONNECTION_MAX_TRIES:
reopen_seconds = defaults.RABBIT_ASYN_RECONNECTION_SECONDS
logdebug(LOGGER, 'Connection failure: Failed connecting to all hosts. Waiting %s seconds and starting over.', reopen_seconds)
self.__node_manager.reset_nodes()
newhost = self.__node_manager.get_connection_parameters().host
loginfo(LOGGER, 'Connection failure: Trying to connect (in %s seconds) to %s.', reopen_seconds, newhost)
self.__wait_and_trigger_reconnection(connection, reopen_seconds)

# Give up after so many tries...
else:
self.statemachine.set_to_permanently_unavailable()
self.statemachine.detail_could_not_connect = True
logdebug(LOGGER, 'Connection failure: Tried all hosts %s times. Giving up.', defaults.RABBIT_ASYN_RECONNECTION_MAX_TRIES)
logwarn(LOGGER, 'Permanently failed to connect to RabbitMQ. No PID requests will be sent.')
return None # to avoid reconnection
max_tries = defaults.RABBIT_ASYN_RECONNECTION_MAX_TRIES
errormsg = ('Permanently failed to connect to RabbitMQ. Tried all hosts %s times. Giving up. No PID requests will be sent.' % max_tries)
logwarn(LOGGER, errormsg)
raise PIDServerException(errormsg)

# The actual reconnection is triggered here:
newhost = self.__node_manager.get_connection_parameters().host
if reopen_seconds == 0:
loginfo(LOGGER, 'Connection failure: Trying to connect (now) to %s.', newhost)
else:
loginfo(LOGGER, 'Connection failure: Trying to connect (in %s seconds) to %s.', reopen_seconds, newhost)
self.__wait_and_trigger_reconnection(connection, reopen_seconds)

#############################
### React to channel and ###
Expand All @@ -309,11 +328,6 @@ def on_connection_error(self, connection, msg):
def __add_on_return_callback(self):
self.thread._channel.add_on_return_callback(self.returnhandler.on_message_not_accepted)

''' This tells RabbitMQ what to do if the connection
was closed. '''
def __add_on_connection_close_callback(self):
self.thread._connection.add_on_close_callback(self.on_connection_closed)

'''
This tells RabbitMQ what to do if the channel
was closed.
Expand Down Expand Up @@ -485,10 +499,20 @@ def make_permanently_closed_by_error(self, connection, reply_text):
in waiting.
'''
def __wait_and_trigger_reconnection(self, connection, wait_seconds):
self.statemachine.set_to_waiting_to_be_available()
loginfo(LOGGER, 'Trying to reconnect to RabbitMQ in %s seconds.', wait_seconds)
connection.add_timeout(wait_seconds, self.reconnect)
logtrace(LOGGER, 'Reconnect event added to connection %s (not to %s)', connection, self.thread._connection)

# Do not reconnect if the library was permanently closed.
if self.statemachine.is_PERMANENTLY_UNAVAILABLE():
logdebug(LOGGER, 'No more reconnection, as the library was permanently closed.')
# No need to do anything else. Whoever set the state
# to permanently unavailable also handled the necessary
# close down steps.

# Otherwise, do reconnect.
else:
self.statemachine.set_to_waiting_to_be_available()
loginfo(LOGGER, 'Trying to reconnect to RabbitMQ in %s seconds.', wait_seconds)
connection.add_timeout(wait_seconds, self.reconnect)
logtrace(LOGGER, 'Reconnect event added to connection %s (not to %s)', connection, self.thread._connection)

###########################
### Reconnect after ###
Expand Down
20 changes: 14 additions & 6 deletions esgfpid/rabbit/asynchronous/thread_shutter.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,23 @@ def __wait_some_more_and_redecide(self, iteration):
logerror(LOGGER, 'Connection was None when trying to wait for pending messages. Synchronization error between threads!')


def __tell_publisher_to_stop_waiting(self):
def __tell_publisher_to_stop_waiting_for_gentle_finish(self):
logdebug(LOGGER, 'Main thread does not need to wait anymore. (%s).', get_now_utc_as_formatted_string())
self.thread.tell_publisher_to_stop_waiting()

# This prevents the last iteration from being redone
# over and over upon reconnection: after reconnection,
# the algorithm is entered again if this is True.
self.__is_in_process_of_gently_closing = False

# This releases the event that blocks the main thread
# until the gentle finish is done.
self.thread.tell_publisher_to_stop_waiting_for_gentle_finish()

def __close_because_all_done(self, iteration):
logdebug(LOGGER, 'Gentle finish (iteration %i): All messages sent and confirmed in %ith try (waited and rechecked %i times).', self.__close_decision_iterations, iteration, iteration-1)
loginfo(LOGGER, 'All messages sent and confirmed. Closing.')
self.__normal_finish()
self.__tell_publisher_to_stop_waiting()
self.__tell_publisher_to_stop_waiting_for_gentle_finish()

def __close_because_no_point_in_waiting(self):

Expand All @@ -201,12 +209,12 @@ def __close_because_no_point_in_waiting(self):

# Actual close
self.__force_finish('Force finish as we are not sending the messages anyway.')
self.__tell_publisher_to_stop_waiting()
self.__tell_publisher_to_stop_waiting_for_gentle_finish()

def __close_because_waited_long_enough(self):
logdebug(LOGGER, 'We have waited long enough. Now closing by force.')
self.__force_finish('Force finish as normal waiting period in normal finish is over.')
self.__tell_publisher_to_stop_waiting()
self.__tell_publisher_to_stop_waiting_for_gentle_finish()

def __inform_about_pending_messages(self):
msg = self.__get_string_about_pending_messages()
Expand Down Expand Up @@ -251,7 +259,7 @@ def __close_down(self, reply_code, reply_text):

# Make sure the main thread does not continue blocking
# as it believes that we're still looking for pending messages:
self.__tell_publisher_to_stop_waiting()
self.__tell_publisher_to_stop_waiting_for_gentle_finish()

# Change the state of the state machine:
self.statemachine.set_to_permanently_unavailable()
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@

setup(
name='esgfpid',
version='0.6.3',
version='0.7.0',
author='Merret Buurman, German Climate Computing Centre (DKRZ)',
author_email='buurman@dkrz.de',
url='https://github.com/IS-ENES-Data/esgf-pid',
download_url='https://github.com/IS-ENES-Data/esgf-pid/archive/0.6.3.tar.gz',
download_url='https://github.com/IS-ENES-Data/esgf-pid/archive/0.7.0.tar.gz',
description='Library for sending PID requests to a rabbit messaging queue during ESGF publication.',
long_description=long_description,
packages=packages + test_packages,
Expand Down

0 comments on commit 2a8ff28

Please sign in to comment.