17243 rate limits part deux #27

Open
wants to merge 16 commits into master

Conversation

thatandromeda
Collaborator

This answers the feedback from #25 (which I'm going to close in favor of this PR) and completes the background processing, thus fully addressing the scope of https://cyber.harvard.edu/projectmanagement/issues/17243.

This demanded some pretty major surgery. Superficially, some methods are
moved from DataSet to TweetFetcher so tests and other things using that
interface need to be updated. But more critically, it now makes sense to
stage ALL the tweet data in Elasticsearch.

Originally, we kept Tweet objects on DataSets as they came in and did
metadata extraction on those as part of the data set pipeline process.
However, we're now fetching tweets one user at a time -- and those
fetches may be widely separated in time, since TweetFetcher will
ultimately be a delayed job subject to Twitter rate limits. This means
that tweets can't be accumulated on DataSet, but they ARE all available
in Elasticsearch.

Therefore it makes sense to reuse our existing extractor features, which
rehydrate tweets from Elasticsearch, and update them so they can also be
used in the case where only one DataSet is fed into an extractor. This
results in a small change to the Extractor API (extractors are now
initialized with one or more DataSets instead of with Tweets).
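
For illustration, a sketch of what that initialization change might look like (the class internals and the tweets_from_elasticsearch helper are invented here, not the actual code):

class Extractor
  # Sketch only: extractors now receive one or more DataSets and rehydrate
  # tweets from Elasticsearch themselves, rather than receiving Tweet objects.
  def initialize(*data_sets)
    @data_sets = data_sets
  end

  def run
    tweets = @data_sets.flat_map { |ds| ds.tweets_from_elasticsearch } # hypothetical helper
    extract(tweets) # extract = the subclass-specific metadata extraction step
  end
end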

On the bright side, when we improve extractor performance on
Elasticsearch, that work should automatically apply to all cases.

Next up, providing for delayed jobs and rate limiting!
This sets the backoff number for a TweetFetcher based on how many jobs are
in the queue right now. This will in turn allow us to schedule jobs in a
way that respects rate limits.

Add an index to TweetFetcher to make the select count fast.
At heart, this:
* Installs DelayedJob
* Adds a TweetFetchingJob
* Delegates tweet-fetching to said job
* Moves logic from TweetFetcher to TweetFetchingJob as needed
* Makes sure the existing tests pass.
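
Roughly, the delegation looks like this (a sketch using delayed_job's plain-object job style; everything except Delayed::Job.enqueue is illustrative naming):

# Sketch: delayed_job will run any enqueued object that responds to #perform.
class TweetFetchingJob < Struct.new(:data_set_id, :user_id)
  def perform
    data_set = DataSet.find(data_set_id)
    TweetFetcher.new(data_set).fetch_tweets_for(user_id) # hypothetical method names
  end
end

# DataSet no longer fetches inline; it hands the work to the queue.
Delayed::Job.enqueue(TweetFetchingJob.new(data_set.id, user_id))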

In addition, during code review, it came up that it would be good to
have TweetFetcher take all user IDs related to a data set and be able to
delegate to multiple potential tweet-fetching back ends, while
presenting a stable interface to DataSet. The current
TweetFetcher#ingest can do that without changing DataSet or
TweetFetchingJob, e.g. with the addition of a parameter that defaults to
TweetFetchingJob, or other logic to determine fetcher type (e.g.
TweetFetcher checks an environment variable to choose a handler, and
the internals of #ingest are rewritten accordingly).
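
One possible shape for that, purely as a sketch (user_ids, data_set_id, and the job constructor arguments are assumed):

class TweetFetcher
  # A back-end parameter defaulting to TweetFetchingJob keeps DataSet's view
  # of #ingest stable while letting us swap in other fetching strategies later
  # (or pick one from an environment variable inside this method).
  def ingest(job_class: TweetFetchingJob)
    user_ids.each do |user_id|
      Delayed::Job.enqueue(job_class.new(data_set_id, user_id))
    end
  end
end
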
This really should be using the native delayed_job retry functions and
the #error hook, but I just can't get those to work; exceptions I throw
in spec don't get caught by #error.
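
For reference, the delayed_job hooks in question are defined on the job object itself and look roughly like this; whether #error actually fires in the failing specs is exactly the open question above:

class TweetFetchingJob
  def perform
    # ... fetch tweets ...
  end

  # delayed_job lifecycle hook: called when #perform raises, before a retry is scheduled.
  def error(job, exception)
    # rate-limit handling / logging would go here
  end

  # delayed_job retries up to this many times before marking the job as failed.
  def max_attempts
    5
  end
end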
@thatandromeda requested a review from jdcc, June 25, 2020 18:06
@thatandromeda
Collaborator Author

I'm testing this out on the dev server with your real cohort data and I've found two bugs, one of which I'll deal with and one of which is in your scope.

Mine: sometimes it double-processes the same user id, leading to errors with fully_processed?

Yours: with large data sets the number of tweets returned by Elasticsearch is too large to process with the current logic (exceeds the scroll window) -- this should be addressed as part of your efficiency work.

@jdcc (Collaborator) left a comment

I added a few thoughts. Much cleaner, but I got confused in a few places I think.

app/jobs/tweet_fetching_job.rb (outdated, resolved)
# In addition, since this looks at all enqueued TweetFetchers and not just
# those belonging to a given DataSet, when we are fetching data for multiple
# DataSets at once, they will cooperate to avoid hitting the window.
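# As the queue grows, the exponential term decays toward zero, so the delay
# ramps from 0 (small queues are clamped by the max) up toward a ceiling of 2*window.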
[2*window - (2.0**(-2*enqueued/limit))*4*window, 0].max
Collaborator

Is this assuming that the queue will only consist of TweetFetchingJobs? It's conceivable we'll use this infrastructure for other things in the future.

Collaborator

I haven't traced through how this works fully, so this might be moot, but depending solely on the number of jobs in the queue frightens me a little. Is it possible jobs can end up waiting longer because there are jobs after them in the queue?

You're using an exponential rather than a linear backoff so it's faster for small cohorts, correct? I think that's beneficial as long as the size of cohorts themselves follows something like an exponential distribution. But most of the cohorts we'll be dealing with are n=1000, and an expo backoff is unboundedly bad relative to maximal throughput as cohorts grow to infinity. This might all be irrelevant given the limits, but how long will we end up waiting between i=997, 998, and 999? I think this expo needs to be capped or made linear, or we store the n for the last RATE_LIMIT_WINDOW somewhere and have all requests call a throttle function.

@thatandromeda (Collaborator, Author), Jul 6, 2020

This is assuming that the queue will consist only of TweetFetchingJobs. We can enforce that if we need to in the future by updating `self.enqueued`. I'll add a comment saying so. I am not inclined to prematurely optimize around job types that don't exist.

The exponential makes it fast for small cohorts, yes, and fairly fast for medium-sized ones, while stretching it out for long ones. In the 997-999 range, we're waiting 1-2 seconds between jobs.

It builds in a cap of about twice the rate-limiting window (here, about 30 minutes). But I actually think capping is a problem, now that I think about it -- if we had, say, a 10K cohort, we'd end up stacking most of that cohort at almost 30 minutes out, and then we'd end up rate-limiting there.

I've just convinced myself we can't have all 3 of [cap on length of time, rate limit avoidance, arbitrary cohort sizes]. Right now we have the first two. I could do the second two instead, e.g. via a linear function (or better yet a relu), but I can't do all 3.

I could also periodically check the rate-limit-status endpoint and use data from there to reschedule everything, but I couldn't figure out a safe way to do that.
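
To put numbers on that, here's the formula evaluated with illustrative values (window = 15 minutes and limit = 900 jobs per window are assumptions for the example, not necessarily the real configuration):

window = 15 * 60   # assumed rate-limit window, in seconds
limit  = 900       # assumed jobs allowed per window

backoff = ->(enqueued) { [2 * window - (2.0**(-2 * enqueued / limit.to_f)) * 4 * window, 0].max }

backoff.call(0)                        # => 0; small queues start immediately
backoff.call(999) - backoff.call(998)  # => ~1.2 seconds of added delay per job around n = 998
backoff.call(100_000)                  # => ~1800.0, approaching the 2 * window cap (about 30 minutes)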

spec/jobs/tweet_fetching_job_spec.rb (resolved)
app/models/data_set.rb (resolved)
# Stop execution of jobs; restart it after the rate limiting window has
# elapsed.
stop_cmd = "RAILS_ENV=#{Rails.env} #{Rails.root}/bin/delayed_job stop"
start_cmd = "RAILS_ENV=#{Rails.env} #{Rails.configuration.delayed_job_command} | at now + #{Rails.configuration.rate_limit_window} minutes"
Collaborator

This is more complex and less portable than modifying the run_ats in the delayed_jobs table. Does it have advantages?
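
Concretely, that alternative would be something along these lines (a sketch; it assumes the rate_limit_window config used elsewhere in this PR):

# Push every not-yet-reserved job past the end of the rate-limit window.
Delayed::Job.where(locked_at: nil).update_all(
  ['run_at = ?', Rails.configuration.rate_limit_window.minutes.from_now]
)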

Collaborator Author

Modifying run_at won't affect workers which have already reserved jobs (they can reserve multiple jobs at a time, and they check the value of run_at at the time of reservation).

That said, if this is a graceful exit, the workers are going to finish their queue before stopping, so I should check that behavior.

(Honestly, the whole rate-limiting handling has been very challenging -- Twitter says "just check the endpoint periodically", but I couldn't think of a strategy that seemed sound for that, and even when I do that I need to stop everything once we hit the rate-limiting window anyway -- so if you have better ideas I am happy to hear them.)

Collaborator

Spitballing:
The fastest way to fulfill an arbitrary number of requests is to immediately use up all allowed requests in this 15 minute window, wait for window reset, and then immediately use up all allowed requests in the next 15 minute window, etc. To me, that suggests a single queue that maintains a running tally and that can be paused and resumed at any moment rather than maintaining a schedule. The architecture that falls out of that is a single, long running coordinator process that handles throttling the requests that get submitted to it. This can be a single worker running off a single queue. That worker doesn't actually run the requests. Instead, it creates request jobs that run out of a different queue on a different set of workers. If there are no requests left in this window's budget, the coordinator process sleeps until the window resets. Requests can still happen in parallel, but no request jobs get created if there is no budget for them.

I dunno. Does that sound reasonable? Things are way easier for me to reason about if there's a single choke point to handle throttling than trying to make decentralized workers coordinate.
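
A rough sketch of that shape, just to make it concrete (every name here is invented for illustration):

# Illustrative coordinator: one long-running worker owns the rate-limit budget
# and only creates fetch jobs (on a separate queue and workers) while budget remains.
class FetchCoordinator
  WINDOW = 15 * 60   # assumed window length in seconds
  BUDGET = 900       # assumed requests allowed per window

  def run
    loop do
      window_started = Time.now
      spent = 0
      while spent < BUDGET && (request = PendingRequest.next) # hypothetical pending-request store
        Delayed::Job.enqueue(TweetFetchingJob.new(request.data_set_id, request.user_id),
                             queue: 'tweet_fetching')
        spent += 1
      end
      # Out of budget (or out of work): sleep until the window resets.
      sleep [window_started + WINDOW - Time.now, 0].max
    end
  end
end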

app/jobs/tweet_fetching_job.rb (outdated, resolved)