diff --git a/luigi/contrib/bigquery.py b/luigi/contrib/bigquery.py index 4661c68245..9da6d883ed 100644 --- a/luigi/contrib/bigquery.py +++ b/luigi/contrib/bigquery.py @@ -21,14 +21,38 @@ import time from luigi.contrib import gcp +from tenacity import retry +from tenacity import retry_if_exception +from tenacity import retry_if_exception_type +from tenacity import wait_exponential +from tenacity import stop_after_attempt + logger = logging.getLogger('luigi-interface') +RETRYABLE_ERRORS = None try: + import httplib2 from googleapiclient import discovery + from googleapiclient import errors from googleapiclient import http except ImportError: logger.warning('BigQuery module imported, but google-api-python-client is ' 'not installed. Any BigQuery task will fail') +else: + RETRYABLE_ERRORS = (httplib2.HttpLib2Error, IOError, TimeoutError, BrokenPipeError) + + +# Retry configurations. For more details, see https://tenacity.readthedocs.io/en/latest/ +def is_error_5xx(err): + return isinstance(err, errors.HttpError) and err.resp.status >= 500 + + +bq_retry = retry(retry=(retry_if_exception(is_error_5xx) | retry_if_exception_type(RETRYABLE_ERRORS)), + wait=wait_exponential(multiplier=1, min=1, max=10), + stop=stop_after_attempt(3), + reraise=True, + after=lambda x: x.args[0].__initialise_client() + ) class CreateDisposition: @@ -122,13 +146,23 @@ class BigQueryClient: """ def __init__(self, oauth_credentials=None, descriptor='', http_=None): - authenticate_kwargs = gcp.get_authenticate_kwargs(oauth_credentials, http_) + # Save initialisation arguments in case we need to re-create client + # due to connection timeout + self.oauth_credentials = oauth_credentials + self.descriptor = descriptor + self.http_ = http_ + + self.__initialise_client() - if descriptor: - self.client = discovery.build_from_document(descriptor, **authenticate_kwargs) + def __initialise_client(self): + authenticate_kwargs = gcp.get_authenticate_kwargs(self.oauth_credentials, self.http_) + + if self.descriptor: + self.client = discovery.build_from_document(self.descriptor, **authenticate_kwargs) else: self.client = discovery.build('bigquery', 'v2', cache_discovery=False, **authenticate_kwargs) + @bq_retry def dataset_exists(self, dataset): """Returns whether the given dataset exists. If regional location is specified for the dataset, that is also checked @@ -147,7 +181,6 @@ def dataset_exists(self, dataset): raise Exception('''Dataset already exists with regional location {}. Can't use {}.'''.format( fetched_location if fetched_location is not None else 'unspecified', dataset.location)) - except http.HttpError as ex: if ex.resp.status == 404: return False @@ -155,6 +188,7 @@ def dataset_exists(self, dataset): return True + @bq_retry def table_exists(self, table): """Returns whether the given table exists.