Upon instantiation, a connection to BigQuery is established and maintained
for the life of the object, until the `close` method is called.
It is recommended to use this block as a context manager, which will automatically
close the connection and its cursors when the context is exited.
It is also recommended that this block be loaded and consumed within a single task
or flow; if the block is passed across separate tasks and flows,
the state of its connection and cursors may be lost.
The number of rows to fetch at a time when calling `fetch_many`.
Note that this parameter applies on the client side and is not
passed to the database. To limit rows on the server side, add the `LIMIT`
clause, or the dialect's equivalent, such as `TOP`, to the query.
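For instance, a minimal sketch (assuming a saved block named "BLOCK_NAME") that opens the connection via the context manager, caps the result set on the server with `LIMIT`, and pages through it on the client with `fetch_many`:

```python
from prefect_gcp.bigquery import BigQueryWarehouse

# the context manager closes the connection and its cursors on exit
with BigQueryWarehouse.load("BLOCK_NAME") as warehouse:
    operation = '''
        SELECT word, word_count
        FROM `bigquery-public-data.samples.shakespeare`
        WHERE corpus = %(corpus)s
        ORDER BY word_count DESC
        LIMIT 6;
    '''
    parameters = {"corpus": "romeoandjuliet"}
    # `size` batches rows on the client; the LIMIT above caps them on the server
    for _ in range(3):
        print(warehouse.fetch_many(operation, parameters=parameters, size=2))
```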
Source code in `prefect_gcp/bigquery.py`:

````python
class BigQueryWarehouse(DatabaseBlock):
    """
    A block for querying a database with BigQuery.

    Upon instantiation, a connection to BigQuery is established and maintained
    for the life of the object, until the `close` method is called.

    It is recommended to use this block as a context manager, which will
    automatically close the connection and its cursors when the context is exited.

    It is also recommended that this block be loaded and consumed within a
    single task or flow; if the block is passed across separate tasks and
    flows, the state of its connection and cursors may be lost.

    Attributes:
        gcp_credentials: The credentials to use to authenticate.
        fetch_size: The number of rows to fetch at a time when calling fetch_many.
            Note that this parameter applies on the client side and is not
            passed to the database. To limit rows on the server side, add the
            `LIMIT` clause, or the dialect's equivalent, such as `TOP`, to the query.
    """  # noqa

    _block_type_name = "BigQuery Warehouse"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/10424e311932e31c477ac2b9ef3d53cefbaad708-250x250.png"  # noqa
    _documentation_url = "https://prefecthq.github.io/prefect-gcp/bigquery/#prefect_gcp.bigquery.BigQueryWarehouse"  # noqa: E501

    gcp_credentials: GcpCredentials
    fetch_size: int = Field(
        default=1, description="The number of rows to fetch at a time."
    )

    _connection: Optional["Connection"] = None
    _unique_cursors: Dict[str, "Cursor"] = None

    def _start_connection(self):
        """
        Starts a connection.
        """
        with self.gcp_credentials.get_bigquery_client() as client:
            self._connection = Connection(client=client)

    def block_initialization(self) -> None:
        super().block_initialization()
        if self._connection is None:
            self._start_connection()
        if self._unique_cursors is None:
            self._unique_cursors = {}

    def get_connection(self) -> "Connection":
        """
        Get the opened connection to BigQuery.
        """
        return self._connection

    def _get_cursor(self, inputs: Dict[str, Any]) -> Tuple[bool, "Cursor"]:
        """
        Get a BigQuery cursor.

        Args:
            inputs: The inputs to generate a unique hash, used to decide
                whether a new cursor should be used.

        Returns:
            Whether a cursor is new and a BigQuery cursor.
        """
        input_hash = hash_objects(inputs)
        assert input_hash is not None, (
            "We were not able to hash your inputs, "
            "which resulted in an unexpected data return; "
            "please open an issue with a reproducible example."
        )
        if input_hash not in self._unique_cursors.keys():
            new_cursor = self._connection.cursor()
            self._unique_cursors[input_hash] = new_cursor
            return True, new_cursor
        else:
            existing_cursor = self._unique_cursors[input_hash]
            return False, existing_cursor

    def reset_cursors(self) -> None:
        """
        Tries to close all opened cursors.
        """
        input_hashes = tuple(self._unique_cursors.keys())
        for input_hash in input_hashes:
            cursor = self._unique_cursors.pop(input_hash)
            try:
                cursor.close()
            except Exception as exc:
                self.logger.warning(
                    f"Failed to close cursor for input hash {input_hash!r}: {exc}"
                )

    @sync_compatible
    async def fetch_one(
        self,
        operation: str,
        parameters: Optional[Dict[str, Any]] = None,
        **execution_options: Dict[str, Any],
    ) -> "Row":
        """
        Fetch a single result from the database.

        Repeated calls using the same inputs to *any* of the fetch methods of
        this block will skip executing the operation again, and instead,
        return the next set of results from the previous execution,
        until the reset_cursors method is called.

        Args:
            operation: The SQL query or other operation to be executed.
            parameters: The parameters for the operation.
            **execution_options: Additional options to pass to `connection.execute`.

        Returns:
            A tuple containing the data returned by the database,
                where each row is a tuple and each column is a value in the tuple.

        Examples:
            Execute operation with parameters, fetching one new row at a time:
            ```python
            from prefect_gcp.bigquery import BigQueryWarehouse

            with BigQueryWarehouse.load("BLOCK_NAME") as warehouse:
                operation = '''
                    SELECT word, word_count
                    FROM `bigquery-public-data.samples.shakespeare`
                    WHERE corpus = %(corpus)s
                    AND word_count >= %(min_word_count)s
                    ORDER BY word_count DESC
                    LIMIT 3;
                '''
                parameters = {
                    "corpus": "romeoandjuliet",
                    "min_word_count": 250,
                }
                for _ in range(0, 3):
                    result = warehouse.fetch_one(operation, parameters=parameters)
                    print(result)
            ```
        """
        inputs = dict(
            operation=operation,
            parameters=parameters,
            **execution_options,
        )
        new, cursor = self._get_cursor(inputs)
        if new:
            await run_sync_in_worker_thread(cursor.execute, **inputs)
        result = await run_sync_in_worker_thread(cursor.fetchone)
        return result

    @sync_compatible
    async def fetch_many(
        self,
        operation: str,
        parameters: Optional[Dict[str, Any]] = None,
        size: Optional[int] = None,
        **execution_options: Dict[str, Any],
    ) -> List["Row"]:
        """
        Fetch a limited number of results from the database.

        Repeated calls using the same inputs to *any* of the fetch methods of
        this block will skip executing the operation again, and instead,
        return the next set of results from the previous execution,
        until the reset_cursors method is called.

        Args:
            operation: The SQL query or other operation to be executed.
            parameters: The parameters for the operation.
            size: The number of results to return; if None or 0, uses the value of
                `fetch_size` configured on the block.
            **execution_options: Additional options to pass to `connection.execute`.

        Returns:
            A list of tuples containing the data returned by the database,
                where each row is a tuple and each column is a value in the tuple.

        Examples:
            Execute operation with parameters, fetching two new rows at a time:
            ```python
            from prefect_gcp.bigquery import BigQueryWarehouse

            with BigQueryWarehouse.load("BLOCK_NAME") as warehouse:
                operation = '''
                    SELECT word, word_count
                    FROM `bigquery-public-data.samples.shakespeare`
                    WHERE corpus = %(corpus)s
                    AND word_count >= %(min_word_count)s
                    ORDER BY word_count DESC
                    LIMIT 6;
                '''
                parameters = {
                    "corpus": "romeoandjuliet",
                    "min_word_count": 250,
                }
                for _ in range(0, 3):
                    result = warehouse.fetch_many(
                        operation,
                        parameters=parameters,
                        size=2
                    )
                    print(result)
            ```
        """
        inputs = dict(
            operation=operation,
            parameters=parameters,
            **execution_options,
        )
        new, cursor = self._get_cursor(inputs)
        if new:
            await run_sync_in_worker_thread(cursor.execute, **inputs)
        size = size or self.fetch_size
        result = await run_sync_in_worker_thread(cursor.fetchmany, size=size)
        return result

    @sync_compatible
    async def fetch_all(
        self,
        operation: str,
        parameters: Optional[Dict[str, Any]] = None,
        **execution_options: Dict[str, Any],
    ) -> List["Row"]:
        """
        Fetch all results from the database.

        Repeated calls using the same inputs to *any* of the fetch methods of
        this block will skip executing the operation again, and instead,
        return the next set of results from the previous execution,
        until the reset_cursors method is called.

        Args:
            operation: The SQL query or other operation to be executed.
            parameters: The parameters for the operation.
            **execution_options: Additional options to pass to `connection.execute`.

        Returns:
            A list of tuples containing the data returned by the database,
                where each row is a tuple and each column is a value in the tuple.

        Examples:
            Execute operation with parameters, fetching all rows:
            ```python
            from prefect_gcp.bigquery import BigQueryWarehouse

            with BigQueryWarehouse.load("BLOCK_NAME") as warehouse:
                operation = '''
                    SELECT word, word_count
                    FROM `bigquery-public-data.samples.shakespeare`
                    WHERE corpus = %(corpus)s
                    AND word_count >= %(min_word_count)s
                    ORDER BY word_count DESC
                    LIMIT 3;
                '''
                parameters = {
                    "corpus": "romeoandjuliet",
                    "min_word_count": 250,
                }
                result = warehouse.fetch_all(operation, parameters=parameters)
            ```
        """
        inputs = dict(
            operation=operation,
            parameters=parameters,
            **execution_options,
        )
        new, cursor = self._get_cursor(inputs)
        if new:
            await run_sync_in_worker_thread(cursor.execute, **inputs)
        result = await run_sync_in_worker_thread(cursor.fetchall)
        return result

    @sync_compatible
    async def execute(
        self,
        operation: str,
        parameters: Optional[Dict[str, Any]] = None,
        **execution_options: Dict[str, Any],
    ) -> None:
        """
        Executes an operation on the database. This method is intended to be used
        for operations that do not return data, such as INSERT, UPDATE, or DELETE.

        Unlike the fetch methods, this method will always execute the operation
        upon calling.

        Args:
            operation: The SQL query or other operation to be executed.
            parameters: The parameters for the operation.
            **execution_options: Additional options to pass to `connection.execute`.

        Examples:
            Execute operation with parameters:
            ```python
            from prefect_gcp.bigquery import BigQueryWarehouse

            with BigQueryWarehouse.load("BLOCK_NAME") as warehouse:
                operation = '''
                    CREATE TABLE mydataset.trips AS (
                        SELECT bikeid, start_time, duration_minutes
                        FROM bigquery-public-data.austin_bikeshare.bikeshare_trips
                        LIMIT %(limit)s
                    );
                '''
                warehouse.execute(operation, parameters={"limit": 5})
            ```
        """
        inputs = dict(
            operation=operation,
            parameters=parameters,
            **execution_options,
        )
        cursor = self._get_cursor(inputs)[1]
        await run_sync_in_worker_thread(cursor.execute, **inputs)

    @sync_compatible
    async def execute_many(
        self,
        operation: str,
        seq_of_parameters: List[Dict[str, Any]],
    ) -> None:
        """
        Executes many operations on the database. This method is intended to be
        used for operations that do not return data, such as INSERT, UPDATE,
        or DELETE.

        Unlike the fetch methods, this method will always execute the operations
        upon calling.

        Args:
            operation: The SQL query or other operation to be executed.
            seq_of_parameters: The sequence of parameters for the operation.

        Examples:
            Create mytable in mydataset and insert two rows into it:
            ```python
            from prefect_gcp.bigquery import BigQueryWarehouse

            with BigQueryWarehouse.load("bigquery") as warehouse:
                create_operation = '''
                CREATE TABLE IF NOT EXISTS mydataset.mytable (
                    col1 STRING,
                    col2 INTEGER,
                    col3 BOOLEAN
                )
                '''
                warehouse.execute(create_operation)

                insert_operation = '''
                INSERT INTO mydataset.mytable (col1, col2, col3) VALUES (%s, %s, %s)
                '''
                seq_of_parameters = [
                    ("a", 1, True),
                    ("b", 2, False),
                ]
                warehouse.execute_many(
                    insert_operation,
                    seq_of_parameters=seq_of_parameters
                )
            ```
        """
        inputs = dict(
            operation=operation,
            seq_of_parameters=seq_of_parameters,
        )
        cursor = self._get_cursor(inputs)[1]
        await run_sync_in_worker_thread(cursor.executemany, **inputs)

    def close(self):
        """
        Closes connection and its cursors.
        """
        try:
            self.reset_cursors()
        finally:
            if self._connection is not None:
                self._connection.close()
                self._connection = None

    def __enter__(self):
        """
        Start a connection upon entry.
        """
        return self

    def __exit__(self, *args):
        """
        Closes connection and its cursors upon exit.
        """
        self.close()

    def __getstate__(self):
        """
        Drops the connection and cursors from the pickled state,
        since they cannot be pickled.
        """
        data = self.__dict__.copy()
        data.update({k: None for k in {"_connection", "_unique_cursors"}})
        return data

    def __setstate__(self, data: dict):
        """
        Restores the state and re-establishes the connection.
        """
        self.__dict__.update(data)
        self._unique_cursors = {}
        self._start_connection()
````
Executes many operations on the database. This method is intended to be used
for operations that do not return data, such as INSERT, UPDATE, or DELETE.
Unlike the fetch methods, this method will always execute the operations
upon calling.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `operation` | `str` | The SQL query or other operation to be executed. | *required* |
| `seq_of_parameters` | `List[Dict[str, Any]]` | The sequence of parameters for the operation. | *required* |
Examples:
Create mytable in mydataset and insert two rows into it:
```python
from prefect_gcp.bigquery import BigQueryWarehouse

with BigQueryWarehouse.load("bigquery") as warehouse:
    create_operation = '''
    CREATE TABLE IF NOT EXISTS mydataset.mytable (
        col1 STRING,
        col2 INTEGER,
        col3 BOOLEAN
    )
    '''
    warehouse.execute(create_operation)

    insert_operation = '''
    INSERT INTO mydataset.mytable (col1, col2, col3) VALUES (%s, %s, %s)
    '''
    seq_of_parameters = [
        ("a", 1, True),
        ("b", 2, False),
    ]
    warehouse.execute_many(
        insert_operation,
        seq_of_parameters=seq_of_parameters
    )
```
Fetch all results from the database.
Repeated calls using the same inputs to *any* of the fetch methods of this
block will skip executing the operation again and, instead,
return the next set of results from the previous execution,
until the `reset_cursors` method is called.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `operation` | `str` | The SQL query or other operation to be executed. | *required* |
| `parameters` | `Optional[Dict[str, Any]]` | The parameters for the operation. | `None` |
| `**execution_options` | `Dict[str, Any]` | Additional options to pass to `connection.execute`. | `{}` |
Returns:

| Type | Description |
| --- | --- |
| `List[Row]` | A list of tuples containing the data returned by the database, where each row is a tuple and each column is a value in the tuple. |
Examples:
Execute operation with parameters, fetching all rows:
```python
from prefect_gcp.bigquery import BigQueryWarehouse

with BigQueryWarehouse.load("BLOCK_NAME") as warehouse:
    operation = '''
        SELECT word, word_count
        FROM `bigquery-public-data.samples.shakespeare`
        WHERE corpus = %(corpus)s
        AND word_count >= %(min_word_count)s
        ORDER BY word_count DESC
        LIMIT 3;
    '''
    parameters = {
        "corpus": "romeoandjuliet",
        "min_word_count": 250,
    }
    result = warehouse.fetch_all(operation, parameters=parameters)
```
Fetch a limited number of results from the database.
Repeated calls using the same inputs to *any* of the fetch methods of this
block will skip executing the operation again and, instead,
return the next set of results from the previous execution,
until the `reset_cursors` method is called.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `operation` | `str` | The SQL query or other operation to be executed. | *required* |
| `parameters` | `Optional[Dict[str, Any]]` | The parameters for the operation. | `None` |
| `size` | `Optional[int]` | The number of results to return; if None or 0, uses the value of `fetch_size` configured on the block. | `None` |
| `**execution_options` | `Dict[str, Any]` | Additional options to pass to `connection.execute`. | `{}` |
Returns:

| Type | Description |
| --- | --- |
| `List[Row]` | A list of tuples containing the data returned by the database, where each row is a tuple and each column is a value in the tuple. |
Examples:
Execute operation with parameters, fetching two new rows at a time:
```python
from prefect_gcp.bigquery import BigQueryWarehouse

with BigQueryWarehouse.load("BLOCK_NAME") as warehouse:
    operation = '''
        SELECT word, word_count
        FROM `bigquery-public-data.samples.shakespeare`
        WHERE corpus = %(corpus)s
        AND word_count >= %(min_word_count)s
        ORDER BY word_count DESC
        LIMIT 6;
    '''
    parameters = {
        "corpus": "romeoandjuliet",
        "min_word_count": 250,
    }
    for _ in range(0, 3):
        result = warehouse.fetch_many(
            operation,
            parameters=parameters,
            size=2
        )
        print(result)
```
Fetch a single result from the database.
Repeated calls using the same inputs to *any* of the fetch methods of this
block will skip executing the operation again and, instead,
return the next set of results from the previous execution,
until the `reset_cursors` method is called.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `operation` | `str` | The SQL query or other operation to be executed. | *required* |
| `parameters` | `Optional[Dict[str, Any]]` | The parameters for the operation. | `None` |
| `**execution_options` | `Dict[str, Any]` | Additional options to pass to `connection.execute`. | `{}` |
Returns:

| Type | Description |
| --- | --- |
| `Row` | A tuple containing the data returned by the database, where each row is a tuple and each column is a value in the tuple. |
Examples:
Execute operation with parameters, fetching one new row at a time:
```python
from prefect_gcp.bigquery import BigQueryWarehouse

with BigQueryWarehouse.load("BLOCK_NAME") as warehouse:
    operation = '''
        SELECT word, word_count
        FROM `bigquery-public-data.samples.shakespeare`
        WHERE corpus = %(corpus)s
        AND word_count >= %(min_word_count)s
        ORDER BY word_count DESC
        LIMIT 3;
    '''
    parameters = {
        "corpus": "romeoandjuliet",
        "min_word_count": 250,
    }
    for _ in range(0, 3):
        result = warehouse.fetch_one(operation, parameters=parameters)
        print(result)
```
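Since repeated calls with the same inputs resume the same cursor, `reset_cursors` is how to force a query to execute again; a minimal sketch, assuming a saved block named "BLOCK_NAME":

```python
from prefect_gcp.bigquery import BigQueryWarehouse

with BigQueryWarehouse.load("BLOCK_NAME") as warehouse:
    operation = "SELECT word FROM `bigquery-public-data.samples.shakespeare` LIMIT 3;"

    first = warehouse.fetch_one(operation)   # executes the query, returns row 1
    second = warehouse.fetch_one(operation)  # same cursor, returns row 2

    warehouse.reset_cursors()  # closes all cached cursors

    again = warehouse.fetch_one(operation)   # re-executes, returns row 1 again
```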
Creates a table in BigQuery.
Args:
dataset: Name of the dataset in which the table will be created.
table: Name of a table to create.
schema: Schema to use when creating the table.
gcp_credentials: Credentials to use for authentication with GCP.
clustering_fields: List of fields to cluster the table by.
time_partitioning: `bigquery.TimePartitioning` object specifying a partitioning
of the newly created table.
project: Project to initialize the BigQuery Client with; if
not provided, will default to the one inferred from your credentials.
location: The location of the dataset that will be written to.
external_config: The external data source.
Returns:
Table name.
Example:

```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_create_table
from google.cloud.bigquery import SchemaField

@flow
def example_bigquery_create_table_flow():
    gcp_credentials = GcpCredentials(project="project")
    schema = [
        SchemaField("number", field_type="INTEGER", mode="REQUIRED"),
        SchemaField("text", field_type="STRING", mode="REQUIRED"),
        SchemaField("bool", field_type="BOOLEAN")
    ]
    result = bigquery_create_table(
        dataset="dataset",
        table="test_table",
        schema=schema,
        gcp_credentials=gcp_credentials
    )
    return result

example_bigquery_create_table_flow()
```

Source code in `prefect_gcp/bigquery.py`:

````python
@task
async def bigquery_create_table(
    dataset: str,
    table: str,
    gcp_credentials: GcpCredentials,
    schema: Optional[List["SchemaField"]] = None,
    clustering_fields: List[str] = None,
    time_partitioning: "TimePartitioning" = None,
    project: Optional[str] = None,
    location: str = "US",
    external_config: Optional["ExternalConfig"] = None,
) -> str:
    """
    Creates a table in BigQuery.

    Args:
        dataset: Name of the dataset in which the table will be created.
        table: Name of a table to create.
        schema: Schema to use when creating the table.
        gcp_credentials: Credentials to use for authentication with GCP.
        clustering_fields: List of fields to cluster the table by.
        time_partitioning: `bigquery.TimePartitioning` object specifying a
            partitioning of the newly created table.
        project: Project to initialize the BigQuery Client with; if
            not provided, will default to the one inferred from your credentials.
        location: The location of the dataset that will be written to.
        external_config: The [external data source](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/bigquery_table#nested_external_data_configuration).  # noqa

    Returns:
        Table name.

    Example:
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.bigquery import bigquery_create_table
        from google.cloud.bigquery import SchemaField

        @flow
        def example_bigquery_create_table_flow():
            gcp_credentials = GcpCredentials(project="project")
            schema = [
                SchemaField("number", field_type="INTEGER", mode="REQUIRED"),
                SchemaField("text", field_type="STRING", mode="REQUIRED"),
                SchemaField("bool", field_type="BOOLEAN")
            ]
            result = bigquery_create_table(
                dataset="dataset",
                table="test_table",
                schema=schema,
                gcp_credentials=gcp_credentials
            )
            return result

        example_bigquery_create_table_flow()
        ```
    """
    logger = get_run_logger()
    logger.info("Creating %s.%s", dataset, table)

    if not external_config and not schema:
        raise ValueError("Either a schema or an external config must be provided.")

    client = gcp_credentials.get_bigquery_client(project=project, location=location)
    try:
        partial_get_dataset = partial(client.get_dataset, dataset)
        dataset_ref = await to_thread.run_sync(partial_get_dataset)
    except NotFound:
        logger.debug("Dataset %s not found, creating", dataset)
        partial_create_dataset = partial(client.create_dataset, dataset)
        dataset_ref = await to_thread.run_sync(partial_create_dataset)

    table_ref = dataset_ref.table(table)
    try:
        partial_get_table = partial(client.get_table, table_ref)
        await to_thread.run_sync(partial_get_table)
        logger.info("%s.%s already exists", dataset, table)
    except NotFound:
        logger.debug("Table %s not found, creating", table)
        table_obj = Table(table_ref, schema=schema)

        # external data configuration
        if external_config:
            table_obj.external_data_configuration = external_config

        # cluster for optimal data sorting/access
        if clustering_fields:
            table_obj.clustering_fields = clustering_fields

        # partitioning
        if time_partitioning:
            table_obj.time_partitioning = time_partitioning

        partial_create_table = partial(client.create_table, table_obj)
        await to_thread.run_sync(partial_create_table)

    return table
````
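The `clustering_fields` and `time_partitioning` arguments take the corresponding `google.cloud.bigquery` objects; a hedged sketch of creating a day-partitioned, clustered table (the dataset, table, and field names are illustrative):

```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_create_table
from google.cloud.bigquery import SchemaField, TimePartitioning, TimePartitioningType

@flow
def example_partitioned_table_flow():
    gcp_credentials = GcpCredentials(project="project")
    schema = [
        SchemaField("event_time", field_type="TIMESTAMP", mode="REQUIRED"),
        SchemaField("user_id", field_type="STRING", mode="REQUIRED"),
    ]
    return bigquery_create_table(
        dataset="dataset",
        table="events",
        schema=schema,
        gcp_credentials=gcp_credentials,
        # partition rows by day on event_time and cluster them by user_id
        time_partitioning=TimePartitioning(
            type_=TimePartitioningType.DAY, field="event_time"
        ),
        clustering_fields=["user_id"],
    )

example_partitioned_table_flow()
```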
Insert records into a Google BigQuery table via the streaming API.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset` | `str` | Name of a dataset where the records will be written to. | *required* |
| `table` | `str` | Name of a table to write to. | *required* |
| `records` | `List[dict]` | The list of records to insert as rows into the BigQuery table; each item in the list should be a dictionary whose keys correspond to columns in the table. | *required* |
| `gcp_credentials` | `GcpCredentials` | Credentials to use for authentication with GCP. | *required* |
| `project` | `Optional[str]` | The project to initialize the BigQuery Client with; if not provided, will default to the one inferred from your credentials. | `None` |
| `location` | `str` | Location of the dataset that will be written to. | `'US'` |
Source code in `prefect_gcp/bigquery.py`:

````python
@task
async def bigquery_insert_stream(
    dataset: str,
    table: str,
    records: List[dict],
    gcp_credentials: GcpCredentials,
    project: Optional[str] = None,
    location: str = "US",
) -> List:
    """
    Insert records into a Google BigQuery table via the [streaming
    API](https://cloud.google.com/bigquery/streaming-data-into-bigquery).

    Args:
        dataset: Name of a dataset where the records will be written to.
        table: Name of a table to write to.
        records: The list of records to insert as rows into the BigQuery table;
            each item in the list should be a dictionary whose keys correspond to
            columns in the table.
        gcp_credentials: Credentials to use for authentication with GCP.
        project: The project to initialize the BigQuery Client with; if
            not provided, will default to the one inferred from your credentials.
        location: Location of the dataset that will be written to.

    Returns:
        List of inserted rows.

    Example:
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.bigquery import bigquery_insert_stream
        from google.cloud.bigquery import SchemaField

        @flow
        def example_bigquery_insert_stream_flow():
            gcp_credentials = GcpCredentials(project="project")
            records = [
                {"number": 1, "text": "abc", "bool": True},
                {"number": 2, "text": "def", "bool": False},
            ]
            result = bigquery_insert_stream(
                dataset="integrations",
                table="test_table",
                records=records,
                gcp_credentials=gcp_credentials
            )
            return result

        example_bigquery_insert_stream_flow()
        ```
    """
    logger = get_run_logger()
    logger.info("Inserting into %s.%s as a stream", dataset, table)

    client = gcp_credentials.get_bigquery_client(project=project, location=location)
    table_ref = client.dataset(dataset).table(table)
    partial_insert = partial(
        client.insert_rows_json, table=table_ref, json_rows=records
    )
    response = await to_thread.run_sync(partial_insert)

    errors = []
    output = []
    for row in response:
        output.append(row)
        if "errors" in row:
            errors.append(row["errors"])

    if errors:
        raise ValueError(errors)

    return output
````
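Because the task raises a `ValueError` carrying the per-row errors reported by `insert_rows_json`, a caller can catch it to handle partial failures; a minimal sketch reusing the record shape from the example above:

```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_insert_stream

@flow
def example_guarded_insert_flow():
    gcp_credentials = GcpCredentials(project="project")
    records = [{"number": 1, "text": "abc", "bool": True}]
    try:
        return bigquery_insert_stream(
            dataset="integrations",
            table="test_table",
            records=records,
            gcp_credentials=gcp_credentials,
        )
    except ValueError as exc:
        # exc carries the list of per-row errors from the streaming API
        print(f"Some rows were rejected: {exc}")

example_guarded_insert_flow()
```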
Loads data from Google Cloud Storage into a BigQuery table.
Args:
uri: GCS path to load data from.
dataset: The id of a destination dataset to write the records to.
table: The name of a destination table to write the records to.
gcp_credentials: Credentials to use for authentication with GCP.
schema: The schema to use when creating the table.
job_config: Dictionary of job configuration parameters;
note that the parameters provided here must be pickleable
(e.g., dataset references will be rejected).
project: The project to initialize the BigQuery Client with; if
not provided, will default to the one inferred from your credentials.
location: Location of the dataset that will be written to.
Source code in `prefect_gcp/bigquery.py`:

````python
@task
async def bigquery_load_cloud_storage(
    dataset: str,
    table: str,
    uri: str,
    gcp_credentials: GcpCredentials,
    schema: Optional[List["SchemaField"]] = None,
    job_config: Optional[dict] = None,
    project: Optional[str] = None,
    location: str = "US",
) -> "LoadJob":
    """
    Loads data from Google Cloud Storage into a BigQuery table.

    Args:
        uri: GCS path to load data from.
        dataset: The id of a destination dataset to write the records to.
        table: The name of a destination table to write the records to.
        gcp_credentials: Credentials to use for authentication with GCP.
        schema: The schema to use when creating the table.
        job_config: Dictionary of job configuration parameters;
            note that the parameters provided here must be pickleable
            (e.g., dataset references will be rejected).
        project: The project to initialize the BigQuery Client with; if
            not provided, will default to the one inferred from your credentials.
        location: Location of the dataset that will be written to.

    Returns:
        The response from `load_table_from_uri`.

    Example:
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.bigquery import bigquery_load_cloud_storage

        @flow
        def example_bigquery_load_cloud_storage_flow():
            gcp_credentials = GcpCredentials(project="project")
            result = bigquery_load_cloud_storage(
                dataset="dataset",
                table="test_table",
                uri="uri",
                gcp_credentials=gcp_credentials
            )
            return result

        example_bigquery_load_cloud_storage_flow()
        ```
    """
    logger = get_run_logger()
    logger.info("Loading into %s.%s from cloud storage", dataset, table)

    client = gcp_credentials.get_bigquery_client(project=project, location=location)
    table_ref = client.dataset(dataset).table(table)

    job_config = job_config or {}
    if "autodetect" not in job_config:
        job_config["autodetect"] = True
    job_config = LoadJobConfig(**job_config)
    if schema:
        job_config.schema = schema

    result = None
    try:
        partial_load = partial(
            _result_sync,
            client.load_table_from_uri,
            uri,
            table_ref,
            job_config=job_config,
        )
        result = await to_thread.run_sync(partial_load)
    except Exception as exception:
        logger.exception(exception)
        if result is not None and result.errors is not None:
            for error in result.errors:
                logger.exception(error)
        raise

    if result is not None:
        # remove unpickleable attributes
        result._client = None
        result._completion_lock = None

    return result
````
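Since the `job_config` dictionary is expanded into a `LoadJobConfig`, any of its pickleable constructor properties can be supplied. A minimal sketch loading a headered CSV, with the bucket URI and dataset names as placeholders:

```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_load_cloud_storage

@flow
def example_csv_load_flow():
    gcp_credentials = GcpCredentials(project="project")
    # keys must be pickleable LoadJobConfig properties;
    # autodetect defaults to True when not supplied
    return bigquery_load_cloud_storage(
        dataset="dataset",
        table="test_table",
        uri="gs://bucket/path/data.csv",
        gcp_credentials=gcp_credentials,
        job_config={"source_format": "CSV", "skip_leading_rows": 1},
    )

example_csv_load_flow()
```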
Loads file into BigQuery.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset` | `str` | ID of a destination dataset to write the records to; if not provided here, will default to the one provided at initialization. | *required* |
| `table` | `str` | Name of a destination table to write the records to; if not provided here, will default to the one provided at initialization. | *required* |
| `path` | `Union[str, Path]` | A string or path-like object of the file to be loaded. | *required* |
| `gcp_credentials` | `GcpCredentials` | Credentials to use for authentication with GCP. | *required* |
| `schema` | `Optional[List[SchemaField]]` | Schema to use when creating the table. | `None` |
| `job_config` | `Optional[dict]` | An optional dictionary of job configuration parameters; note that the parameters provided here must be pickleable (e.g., dataset references will be rejected). | `None` |
| `rewind` | `bool` | If True, seek to the beginning of the file handle before reading the file. | `False` |
| `size` | `Optional[int]` | Number of bytes to read from the file handle. If size is None or large, resumable upload will be used. Otherwise, multipart upload will be used. | `None` |
| `project` | `Optional[str]` | Project to initialize the BigQuery Client with; if not provided, will default to the one inferred from your credentials. | `None` |
| `location` | `str` | Location of the dataset that will be written to. | `'US'` |
Source code in `prefect_gcp/bigquery.py`:

````python
@task
async def bigquery_load_file(
    dataset: str,
    table: str,
    path: Union[str, Path],
    gcp_credentials: GcpCredentials,
    schema: Optional[List["SchemaField"]] = None,
    job_config: Optional[dict] = None,
    rewind: bool = False,
    size: Optional[int] = None,
    project: Optional[str] = None,
    location: str = "US",
) -> "LoadJob":
    """
    Loads file into BigQuery.

    Args:
        dataset: ID of a destination dataset to write the records to;
            if not provided here, will default to the one provided at initialization.
        table: Name of a destination table to write the records to;
            if not provided here, will default to the one provided at initialization.
        path: A string or path-like object of the file to be loaded.
        gcp_credentials: Credentials to use for authentication with GCP.
        schema: Schema to use when creating the table.
        job_config: An optional dictionary of job configuration parameters;
            note that the parameters provided here must be pickleable
            (e.g., dataset references will be rejected).
        rewind: If True, seek to the beginning of the file handle
            before reading the file.
        size: Number of bytes to read from the file handle. If size is None or large,
            resumable upload will be used. Otherwise, multipart upload will be used.
        project: Project to initialize the BigQuery Client with; if
            not provided, will default to the one inferred from your credentials.
        location: Location of the dataset that will be written to.

    Returns:
        The response from `load_table_from_file`.

    Example:
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.bigquery import bigquery_load_file
        from google.cloud.bigquery import SchemaField

        @flow
        def example_bigquery_load_file_flow():
            gcp_credentials = GcpCredentials(project="project")
            result = bigquery_load_file(
                dataset="dataset",
                table="test_table",
                path="path",
                gcp_credentials=gcp_credentials
            )
            return result

        example_bigquery_load_file_flow()
        ```
    """
    logger = get_run_logger()
    logger.info("Loading into %s.%s from file", dataset, table)

    if not os.path.exists(path):
        raise ValueError(f"{path} does not exist")
    elif not os.path.isfile(path):
        raise ValueError(f"{path} is not a file")

    client = gcp_credentials.get_bigquery_client(project=project)
    table_ref = client.dataset(dataset).table(table)

    job_config = job_config or {}
    if "autodetect" not in job_config:
        job_config["autodetect"] = True
        # TODO: test if autodetect is needed when schema is passed
    job_config = LoadJobConfig(**job_config)
    if schema:
        # TODO: test if schema can be passed directly in job_config
        job_config.schema = schema

    try:
        with open(path, "rb") as file_obj:
            partial_load = partial(
                _result_sync,
                client.load_table_from_file,
                file_obj,
                table_ref,
                rewind=rewind,
                size=size,
                location=location,
                job_config=job_config,
            )
            result = await to_thread.run_sync(partial_load)
    except IOError:
        logger.exception(f"Could not open and read from {path}")
        raise

    if result is not None:
        # remove unpickleable attributes
        result._client = None
        result._completion_lock = None

    return result
````
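The same `job_config` expansion applies here; a sketch pairing a local CSV (the path and names are illustrative) with an explicit schema so autodetection is not relied upon:

```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_load_file
from google.cloud.bigquery import SchemaField

@flow
def example_csv_file_load_flow():
    gcp_credentials = GcpCredentials(project="project")
    schema = [
        SchemaField("number", field_type="INTEGER", mode="REQUIRED"),
        SchemaField("text", field_type="STRING", mode="REQUIRED"),
    ]
    return bigquery_load_file(
        dataset="dataset",
        table="test_table",
        path="/path/to/data.csv",
        gcp_credentials=gcp_credentials,
        schema=schema,  # explicit schema instead of autodetection
        job_config={"source_format": "CSV", "skip_leading_rows": 1},
    )

example_csv_file_load_flow()
```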
Runs a BigQuery query.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `query` | `str` | String of the query to execute. | *required* |
| `gcp_credentials` | `GcpCredentials` | Credentials to use for authentication with GCP. | *required* |
| `query_params` | `Optional[List[tuple]]` | List of 3-tuples specifying BigQuery query parameters; currently only scalar query parameters are supported. See the [Google documentation](https://cloud.google.com/bigquery/docs/parameterized-queries#bigquery-query-params-python) for more details on how both the query and the query parameters should be formatted. | `None` |
| `dry_run_max_bytes` | `Optional[int]` | If provided, the maximum number of bytes the query is allowed to process; this will be determined by executing a dry run and raising a `RuntimeError` if the maximum is exceeded. | `None` |
| `dataset` | `Optional[str]` | Name of a destination dataset to write the query results to, if you don't want them returned; if provided, `table` must also be provided. | `None` |
| `table` | `Optional[str]` | Name of a destination table to write the query results to, if you don't want them returned; if provided, `dataset` must also be provided. | `None` |
| `to_dataframe` | `bool` | If provided, returns the results of the query as a pandas dataframe instead of a list of `bigquery.table.Row` objects. | `False` |
| `job_config` | `Optional[dict]` | Dictionary of job configuration parameters; note that the parameters provided here must be pickleable (e.g., dataset references will be rejected). | `None` |
| `project` | `Optional[str]` | The project to initialize the BigQuery Client with; if not provided, will default to the one inferred from your credentials. | `None` |
| `result_transformer` | `Optional[Callable[[List[Row]], Any]]` | Function that can be passed to transform the result of a query before returning. The function will be passed the list of rows returned by BigQuery for the given query. | `None` |
| `location` | `str` | Location of the dataset that will be queried. | `'US'` |

Returns:

| Type | Description |
| --- | --- |
| `Any` | A list of rows, or pandas DataFrame if to_dataframe, matching the query criteria. |
Example:
Queries the public Shakespeare dataset, returning 10 results.
```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_query

@flow
def example_bigquery_query_flow():
    gcp_credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json",
        project="project"
    )
    query = '''
        SELECT word, word_count
        FROM `bigquery-public-data.samples.shakespeare`
        WHERE corpus = @corpus
        AND word_count >= @min_word_count
        ORDER BY word_count DESC;
    '''
    query_params = [
        ("corpus", "STRING", "romeoandjuliet"),
        ("min_word_count", "INT64", 250)
    ]
    result = bigquery_query(
        query,
        gcp_credentials,
        query_params=query_params
    )
    return result

example_bigquery_query_flow()
```
Source code in `prefect_gcp/bigquery.py`:

````python
@task
async def bigquery_query(
    query: str,
    gcp_credentials: GcpCredentials,
    query_params: Optional[List[tuple]] = None,  # 3-tuples
    dry_run_max_bytes: Optional[int] = None,
    dataset: Optional[str] = None,
    table: Optional[str] = None,
    to_dataframe: bool = False,
    job_config: Optional[dict] = None,
    project: Optional[str] = None,
    result_transformer: Optional[Callable[[List["Row"]], Any]] = None,
    location: str = "US",
) -> Any:
    """
    Runs a BigQuery query.

    Args:
        query: String of the query to execute.
        gcp_credentials: Credentials to use for authentication with GCP.
        query_params: List of 3-tuples specifying BigQuery query parameters;
            currently only scalar query parameters are supported. See the
            [Google documentation](https://cloud.google.com/bigquery/docs/parameterized-queries#bigquery-query-params-python)
            for more details on how both the query and the query parameters
            should be formatted.
        dry_run_max_bytes: If provided, the maximum number of bytes the query
            is allowed to process; this will be determined by executing a dry run
            and raising a `RuntimeError` if the maximum is exceeded.
        dataset: Name of a destination dataset to write the query results to,
            if you don't want them returned; if provided, `table` must also be
            provided.
        table: Name of a destination table to write the query results to,
            if you don't want them returned; if provided, `dataset` must also be
            provided.
        to_dataframe: If provided, returns the results of the query as a pandas
            dataframe instead of a list of `bigquery.table.Row` objects.
        job_config: Dictionary of job configuration parameters;
            note that the parameters provided here must be pickleable
            (e.g., dataset references will be rejected).
        project: The project to initialize the BigQuery Client with; if not
            provided, will default to the one inferred from your credentials.
        result_transformer: Function that can be passed to transform the result of a
            query before returning. The function will be passed the list of rows
            returned by BigQuery for the given query.
        location: Location of the dataset that will be queried.

    Returns:
        A list of rows, or pandas DataFrame if to_dataframe,
        matching the query criteria.

    Example:
        Queries the public Shakespeare dataset, returning 10 results.
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.bigquery import bigquery_query

        @flow
        def example_bigquery_query_flow():
            gcp_credentials = GcpCredentials(
                service_account_file="/path/to/service/account/keyfile.json",
                project="project"
            )
            query = '''
                SELECT word, word_count
                FROM `bigquery-public-data.samples.shakespeare`
                WHERE corpus = @corpus
                AND word_count >= @min_word_count
                ORDER BY word_count DESC;
            '''
            query_params = [
                ("corpus", "STRING", "romeoandjuliet"),
                ("min_word_count", "INT64", 250)
            ]
            result = bigquery_query(
                query,
                gcp_credentials,
                query_params=query_params
            )
            return result

        example_bigquery_query_flow()
        ```
    """  # noqa
    logger = get_run_logger()
    logger.info("Running BigQuery query")

    client = gcp_credentials.get_bigquery_client(project=project, location=location)

    # setup job config
    job_config = QueryJobConfig(**job_config or {})
    if query_params is not None:
        job_config.query_parameters = [
            ScalarQueryParameter(*qp) for qp in query_params
        ]

    # perform dry_run if requested
    if dry_run_max_bytes is not None:
        saved_info = dict(
            dry_run=job_config.dry_run, use_query_cache=job_config.use_query_cache
        )
        job_config.dry_run = True
        job_config.use_query_cache = False
        partial_query = partial(client.query, query, job_config=job_config)
        response = await to_thread.run_sync(partial_query)
        total_bytes_processed = response.total_bytes_processed
        if total_bytes_processed > dry_run_max_bytes:
            raise RuntimeError(
                f"Query will process {total_bytes_processed} bytes which is above "
                f"the set maximum of {dry_run_max_bytes} for this task."
            )
        job_config.dry_run = saved_info["dry_run"]
        job_config.use_query_cache = saved_info["use_query_cache"]

    # if writing to a destination table
    if dataset is not None:
        table_ref = client.dataset(dataset).table(table)
        job_config.destination = table_ref

    partial_query = partial(
        _result_sync,
        client.query,
        query,
        job_config=job_config,
    )
    result = await to_thread.run_sync(partial_query)

    if to_dataframe:
        return result.to_dataframe()
    else:
        if result_transformer:
            return result_transformer(result)
        else:
            return list(result)
````
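A hedged sketch combining `dry_run_max_bytes` with a destination `dataset`/`table`, so an expensive query is aborted before running and, when it does run, its results are also written to a table; all names are illustrative:

```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_query

@flow
def example_guarded_query_flow():
    gcp_credentials = GcpCredentials(project="project")
    query = '''
        SELECT word, word_count
        FROM `bigquery-public-data.samples.shakespeare`
        WHERE corpus = @corpus
        ORDER BY word_count DESC;
    '''
    return bigquery_query(
        query,
        gcp_credentials,
        query_params=[("corpus", "STRING", "romeoandjuliet")],
        dry_run_max_bytes=10_000_000,  # raises RuntimeError if the dry run exceeds ~10 MB
        dataset="dataset",             # also write results to dataset.query_results
        table="query_results",
    )

example_guarded_query_flow()
```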