The BigQuery sink connector can be configured using a variety of configuration properties.
defaultDataset
The default dataset to be used.
- Type: string
- Importance: high
Note
defaultDataset replaced the datasets parameter of older versions of this connector.
project
The BigQuery project to write to.
- Type: string
- Importance: high
topics
A list of Kafka topics to read from.
- Type: list
- Importance: high
autoCreateTables
Automatically create BigQuery tables if they don’t already exist.
- Type: boolean
- Default: false
- Importance: high
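Taken together, the properties above are the minimum needed to point the connector at a project, a dataset, and a set of topics. Below is a minimal sketch of a connector properties file; the connector name, project ID, dataset, and topic names are placeholders, and name and connector.class are standard Kafka Connect settings rather than properties documented in this section.

```properties
# Minimal sketch of a BigQuery sink connector configuration.
# Project, dataset, and topic names are placeholders.
name=bigquery-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
project=my-gcp-project
defaultDataset=my_dataset
topics=orders,customers
autoCreateTables=true
```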
gcsBucketName
The name of the bucket where Google Cloud Storage (GCS) blobs are located. These blobs are used to batch-load to BigQuery. This is applicable only if enableBatchLoad is configured.
- Type: string
- Default: “”
- Importance: high
queueSize
The maximum size (or -1 for no maximum size) of the worker queue for BigQuery write requests before all topics are paused. This is a soft limit; the size of the queue can go over this before topics are paused. All topics resume once a flush is triggered or the size of the queue drops under half of the maximum size.
- Type: long
- Default: -1
- Valid Values: [-1,…]
- Importance: high
bigQueryRetry
The number of retry attempts made for a BigQuery request that fails with a backend error or a quota exceeded error.
- Type: int
- Default: 0
- Valid Values: [0,…]
- Importance: medium
bigQueryRetryWait
The minimum amount of time, in milliseconds, to wait between retry attempts for a BigQuery backend or quota exceeded error.
- Type: long
- Default: 1000
- Valid Values: [0,…]
- Importance: medium
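As an illustration of how the buffering and retry properties above might be combined to ride out transient backend or quota errors, here is a hedged sketch; the values are illustrative, not recommendations.

```properties
# Illustrative tuning values only; adjust for your workload.
# Pause topics once roughly 10000 write requests are queued
# (a soft limit, per the queueSize description above).
queueSize=10000
# Retry failed BigQuery requests up to 3 times, waiting at
# least 2 seconds between attempts.
bigQueryRetry=3
bigQueryRetryWait=2000
```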
bigQueryMessageTimePartitioning
Whether or not to use the Kafka message time when inserting records. By default, the connector processing time is used.
- Type: boolean
- Default: false
- Importance: high
bigQueryPartitionDecorator
Whether or not to append a partition decorator to the BigQuery table name when inserting records. Setting this to true appends a partition decorator (for example, table$yyyyMMdd) to the table name. Setting this to false bypasses the decorator logic and uses the raw table name for inserts.
- Type: boolean
- Default: true
- Importance: high
timestampPartitionFieldName
The name of the field in the record value that contains the timestamp to partition by in BigQuery; setting it enables timestamp partitioning for each table. Leave this configuration blank to enable ingestion-time partitioning for each table.
- Type: string
- Default: null
- Importance: low
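The three partitioning properties above interact. Below is a sketch of field-based timestamp partitioning; the field name event_time is a placeholder, and disabling the decorator alongside field-based partitioning is an assumption to verify against your connector version.

```properties
# Partition each table on a timestamp field from the record value.
# The field name "event_time" is a placeholder.
timestampPartitionFieldName=event_time
# Assumption: field-based partitioning does not use the $yyyyMMdd
# decorator, so bypass it here.
bigQueryPartitionDecorator=false

# Alternatively, leave timestampPartitionFieldName unset and use the
# Kafka message time with the partition decorator:
# bigQueryMessageTimePartitioning=true
# bigQueryPartitionDecorator=true
```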
keySource
Determines whether the keyfile configuration is the path to the credentials JSON file or the JSON itself. Available values are FILE and JSON. This property is available in BigQuery sink connector version 1.3 (and later).
- Type: string
- Default: FILE
- Importance: medium
keyfile
keyfile can be either a string representation of the Google credentials file or the path to the Google credentials file itself. The string representation is supported in BigQuery sink connector version 1.3 (and later).
- Type: string
- Default: null
- Importance: medium
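The two credential properties work together; a sketch of both modes follows. The file path is a placeholder and the inline credentials JSON is elided.

```properties
# Option 1 (default): keyfile points at a credentials file on disk.
# The path below is a placeholder.
keySource=FILE
keyfile=/etc/secrets/gcp-credentials.json

# Option 2 (connector 1.3 and later): keyfile holds the
# credentials JSON itself.
# keySource=JSON
# keyfile={"type": "service_account", ...}
```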
sanitizeTopics
Designates whether to automatically sanitize topic names before using them as table names. If not enabled, topic names are used as table names.
- Type: boolean
- Default: false
- Importance: medium
schemaRetriever
A class that can be used for automatically creating tables and/or updating schemas.
- Type: class
- Default: com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
- Importance: medium
Note
In version 2.0.0, the SchemaRetriever API changed to retrieve the schema from each SinkRecord, which helps support multiple schemas per topic. SchemaRegistrySchemaRetriever has been removed because it retrieved the schema based on the topic.
threadPoolSize
The size of the BigQuery write thread pool. This establishes the maximum number of concurrent writes to BigQuery.
- Type: int
- Default: 10
- Valid Values: [1,…]
- Importance: medium
allBQFieldsNullable
If true, no fields in any produced BigQuery schema are REQUIRED. All non-nullable Avro fields are translated as NULLABLE (or REPEATED, if arrays).
- Type: boolean
- Default: false
- Importance: low
avroDataCacheSize
The size of the cache to use when converting schemas from Avro to Kafka Connect.
- Type: int
- Default: 100
- Valid Values: [0,…]
- Importance: low
batchLoadIntervalSec
The interval, in seconds, in which to attempt to run GCS to BigQuery load jobs. Only relevant if enableBatchLoad is configured.
- Type: int
- Default: 120
- Importance: low
convertDoubleSpecialValues
Designates whether +Infinity is converted to Double.MAX_VALUE and whether -Infinity and NaN are converted to Double.MIN_VALUE to ensure successful delivery to BigQuery.
- Type: boolean
- Default: false
- Importance: low
enableBatchLoad
Beta feature: use with caution. The sublist of topics to be batch loaded through GCS.
- Type: list
- Default: “”
- Importance: low
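A sketch combining the batch-load settings from this section (enableBatchLoad, gcsBucketName, batchLoadIntervalSec, and the autoCreateBucket property described below); the bucket and topic names are placeholders.

```properties
# Batch load the "orders" topic through GCS (beta feature).
# Bucket and topic names are placeholders.
enableBatchLoad=orders
gcsBucketName=my-batch-load-bucket
autoCreateBucket=true
# Attempt a GCS-to-BigQuery load job every 5 minutes.
batchLoadIntervalSec=300
```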
includeKafkaData
Whether to include an extra block containing the Kafka source topic, offset, and partition information in the resulting BigQuery rows.
- Type: boolean
- Default: false
- Importance: low
mergeRecordsThreshold
How many records to write to an intermediate table before performing a merge flush, if upsert/delete is enabled. Can be set to -1 to disable record count-based flushing.
- Type: long
- Default: -1
- Importance: low
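mergeRecordsThreshold only takes effect when upsert/delete is enabled. The sketch below assumes the upsertEnabled and kafkaKeyFieldName properties from 2.x releases of this connector, which are not documented in this section.

```properties
# Assumed 2.x properties (not documented in this section):
# upsertEnabled turns on upsert mode, and kafkaKeyFieldName names
# the struct under which the Kafka key is written.
upsertEnabled=true
kafkaKeyFieldName=kafkaKey
# Merge-flush the intermediate table after 5000 buffered records.
mergeRecordsThreshold=5000
```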
autoCreateBucket
Whether to automatically create the given bucket, if it does not exist.
- Type: boolean
- Default: true
- Importance: medium
allowNewBigQueryFields
If true, new fields can be added to BigQuery tables during subsequent schema updates.
- Type: boolean
- Default: false
- Importance: medium
allowBigQueryRequiredFieldRelaxation
If true, fields in the BigQuery schema can be changed from REQUIRED to NULLABLE.
- Type: boolean
- Default: false
- Importance: medium
Note
allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation replaced the autoUpdateSchemas parameter of older versions of this connector.
allowSchemaUnionization
If true, the existing table schema (if one is present) will be unionized with new record schemas during schema updates. If false, the schema of the last record in a batch will be used for any necessary table creation and schema update attempts.
- Type: boolean
- Default: false
- Importance: medium
Note
Setting allowSchemaUnionization to false and both allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation to true is equivalent to setting autoUpdateSchemas to true in older (pre-2.0.0) versions of this connector. In this case, if BigQuery raises a schema validation exception or a table doesn’t exist when writing a batch, the connector will try to remediate by relaxing required fields and/or adding new fields.
If allowSchemaUnionization, allowNewBigQueryFields, and allowBigQueryRequiredFieldRelaxation are all true, the connector will create or update tables with a schema whose fields are a union of the existing table schema’s fields and the fields present in all of the records of the current batch. The key difference is that with unionization disabled, new record schemas have to be a superset of the table schema in BigQuery.
In general, enabling allowSchemaUnionization is useful for keeping a pipeline working through schema changes. For instance, if you’d like to remove fields from the upstream data, the updated schemas still work in the connector. It is similarly useful when different tasks see records whose schemas contain different fields that are not in the table. Be cautious, however: if allowSchemaUnionization is set and bad records land in the topic, the BigQuery schema may be permanently changed. This presents two issues. First, since BigQuery doesn’t allow columns to be dropped from tables, the accidentally added columns add unnecessary noise to the schema. Second, since BigQuery doesn’t allow column types to be modified, those columns can completely break pipelines down the road, where well-behaved records have schemas whose field names overlap with the accidentally added columns in the table but use a different type.
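As a concrete reading of the note above, the following sketch reproduces the pre-2.0.0 autoUpdateSchemas=true behavior.

```properties
# Equivalent of autoUpdateSchemas=true in pre-2.0.0 versions,
# per the note above.
allowSchemaUnionization=false
allowNewBigQueryFields=true
allowBigQueryRequiredFieldRelaxation=true
```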