Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patch BigQueryClient to retry on network error #3088

Merged
merged 4 commits into from
Oct 13, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions luigi/contrib/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,22 @@ class BigQueryClient:
https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
"""

def __init__(self, oauth_credentials=None, descriptor='', http_=None):
authenticate_kwargs = gcp.get_authenticate_kwargs(oauth_credentials, http_)

if descriptor:
self.client = discovery.build_from_document(descriptor, **authenticate_kwargs)
def __init__(self, oauth_credentials=None, descriptor='', http_=None, retry_limit=2):
    """Remember the connection settings and build the API client.

    The constructor arguments are kept on the instance so the client can be
    rebuilt from scratch later, e.g. after a connection timeout.

    :param oauth_credentials: credentials forwarded to ``gcp.get_authenticate_kwargs``.
    :param descriptor: discovery document; when empty the client is built
        via ``discovery.build`` instead of ``discovery.build_from_document``.
    :param http_: HTTP object forwarded to ``gcp.get_authenticate_kwargs``.
    :param retry_limit: threshold that ``retry_count`` is compared against
        when a connection error is handled.
    """
    # Retry bookkeeping: counter starts at zero, limit comes from the caller.
    self.retry_count = 0
    self.retry_limit = retry_limit

    # Everything required to re-create the client on demand.
    self.http_ = http_
    self.descriptor = descriptor
    self.oauth_credentials = oauth_credentials

    self.__initialise_client()

def __initialise_client(self):
    """(Re)create ``self.client`` from the settings stored on the instance.

    A non-empty ``self.descriptor`` is used as the discovery document;
    otherwise the 'bigquery' v2 API is built with discovery caching disabled.
    """
    auth_options = gcp.get_authenticate_kwargs(self.oauth_credentials, self.http_)

    self.client = (
        discovery.build_from_document(self.descriptor, **auth_options)
        if self.descriptor
        else discovery.build('bigquery', 'v2', cache_discovery=False, **auth_options)
    )

Expand All @@ -147,6 +158,12 @@ def dataset_exists(self, dataset):
fetched_location if fetched_location is not None else 'unspecified',
dataset.location))

except (TimeoutError, BrokenPipeError, IOError) as bq_connection_error:
if self.retry_count > self.retry_limit:
raise Exception(f"Exceeded max retries for BigQueryClient connection error: {bq_connection_error}") from bq_connection_error
self.retry_count += 1
self.__initialise_client()
self.dataset_exists(dataset)
Copy link
Contributor

@hirosassa hirosassa Jun 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using a retry decorator to simplify the code?
The implementation below will be helpful.
https://github.com/spotify/luigi/blob/master/luigi/contrib/gcs.py#L72-L81

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I'll make that change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jamesmcm Thank you! Looks good to me, but flake8 reports some formatting problems.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I fixed that too.

except http.HttpError as ex:
if ex.resp.status == 404:
return False
Expand All @@ -167,6 +184,12 @@ def table_exists(self, table):
self.client.tables().get(projectId=table.project_id,
datasetId=table.dataset_id,
tableId=table.table_id).execute()
except (TimeoutError, BrokenPipeError, IOError) as bq_connection_error:
if self.retry_count > self.retry_limit:
raise Exception(f"Exceeded max retries for BigQueryClient connection error: {bq_connection_error}") from bq_connection_error
self.retry_count += 1
self.__initialise_client()
self.table_exists(table)
except http.HttpError as ex:
if ex.resp.status == 404:
return False
Expand Down