S3Config

class kafkaconnect.s3_sink.config.S3Config(name: str = 's3-sink', connector_class: str = 'io.confluent.connect.s3.S3SinkConnector', topics: str = '', tasks_max: int = 1, format_class: str = 'io.confluent.connect.s3.format.parquet.ParquetFormat', parquet_codec: str = 'snappy', schema_compatibility: str = 'NONE', s3_bucket_name: str = '', s3_region: str = 'us-east-1', aws_access_key_id: str = '', aws_secret_access_key: str = '', topics_dir: str = 'topics', storage_class: str = 'io.confluent.connect.s3.storage.S3Storage', flush_size: int = 3600, rotate_interval_ms: int = 600000, partitioner_class: str = 'io.confluent.connect.storage.partitioner.TimeBasedPartitioner', partition_duration_ms: int = 3600000, path_format: str = "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH", locale: str = 'en-US', timezone: str = 'UTC', timestamp_extractor: str = 'Record', timestamp_field: str = 'time')

Bases: kafkaconnect.config.ConnectConfig

S3 Sink connector configuration
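
A minimal usage sketch (the topic, bucket, and credential values below are placeholders):

    from kafkaconnect.s3_sink.config import S3Config

    config = S3Config(
        name="s3-sink",
        topics="my-topic",                  # placeholder topic
        s3_bucket_name="my-bucket",         # placeholder bucket
        s3_region="us-east-1",
        aws_access_key_id="...",            # placeholder credentials
        aws_secret_access_key="...",
    )

    # asjson() returns the connector configuration as a JSON string.
    print(config.asjson())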

Attributes Summary

aws_access_key_id The AWS access key ID used to authenticate personal AWS credentials.
aws_secret_access_key The secret access key used to authenticate personal AWS credentials.
connector_class S3 Sink connector class
flush_size Number of records written to store before invoking file commits.
format_class The format class to use when writing data to the store.
locale The locale to use when partitioning with TimeBasedPartitioner.
name Name of the connector.
parquet_codec The Parquet compression codec to be used for output files.
partition_duration_ms The duration of a partition in ms, used by the TimeBasedPartitioner.
partitioner_class The partitioner to use when writing data to the store.
path_format Pattern used to format the path in the S3 object name.
rotate_interval_ms The time interval in milliseconds to invoke file commits.
s3_bucket_name The S3 Bucket.
s3_region The AWS region to be used by the connector.
schema_compatibility The schema compatibility rule.
storage_class The underlying storage layer.
tasks_max
timestamp_extractor The extractor determines how to obtain a timestamp from each record.
timestamp_field The record field to be used as timestamp by the timestamp extractor.
timezone The timezone to use when partitioning with TimeBasedPartitioner.
topics
topics_dir Top level directory to store the data ingested from Kafka.

Methods Summary

asjson()
format_field_names(fields) Dictionary factory to use with the dataclasses.asdict() method.
update_topics(topics) Update the list of Kafka topics.

Attributes Documentation

aws_access_key_id = ''

The AWS access key ID used to authenticate personal AWS credentials.

aws_secret_access_key = ''

The secret access key used to authenticate personal AWS credentials.

connector_class = 'io.confluent.connect.s3.S3SinkConnector'

S3 Sink connector class

flush_size = 3600

Number of records written to store before invoking file commits.

By default this is set to 6 times the number of records expected from a 1 Hz output stream within the default rotate_interval_ms value. This way the rotate_interval_ms configuration takes precedence over the flush_size configuration, but flush_size still works as an upper limit, invoking a file commit once the connect-s3-sink consumer accumulates 3600 records.
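
The arithmetic behind the default, as a quick illustration (the 1 Hz rate is just the example stream rate assumed above):

    rate_hz = 1                                                  # assumed stream rate
    rotate_interval_ms = 600000                                  # default rotate_interval_ms
    records_per_rotation = rate_hz * rotate_interval_ms // 1000  # 600 records
    flush_size = 6 * records_per_rotation                        # 3600, the default flush_size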

format_class = 'io.confluent.connect.s3.format.parquet.ParquetFormat'

The format class to use when writing data to the store.

locale = 'en-US'

The locale to use when partitioning with TimeBasedPartitioner.

name = 's3-sink'

Name of the connector.

The connector name must be unique across the cluster.

parquet_codec = 'snappy'

The Parquet compression codec to be used for output files.

partition_duration_ms = 3600000

The duration of a partition in ms, used by the TimeBasedPartitioner.

The default value is for an hourly partitioner.

partitioner_class = 'io.confluent.connect.storage.partitioner.TimeBasedPartitioner'

The partitioner to use when writing data to the store.

path_format = "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH"

Pattern used to format the path in the S3 object name.

Portion of the path generated by the S3 connector’s partitioner. The default is for an hourly partitioner.
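
As an illustration of what this pattern yields, the sketch below formats an example timestamp with the equivalent strftime directives; the connector is assumed to prepend topics_dir and the topic name to form the full object key.

    from datetime import datetime, timezone

    # The default path_format corresponds to these strftime directives.
    ts = datetime(2024, 5, 1, 13, 30, tzinfo=timezone.utc)   # example timestamp
    print(ts.strftime("year=%Y/month=%m/day=%d/hour=%H"))
    # year=2024/month=05/day=01/hour=13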

rotate_interval_ms = 600000

The time interval in milliseconds to invoke file commits.

Use this option to control the size of the objects written to S3. For example, with a 1 Hz output data stream, a rotate interval of 600 seconds creates files of approximately 600 records each, as long as that is less than flush_size. Note that the lag of the connect-s3-sink consumer increases while it accumulates records during those 600 seconds and decreases again after the file is committed. For an hourly partitioner this configuration should create 6 Parquet files per partition in the destination path.
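
The expected file and partition counts follow directly from the defaults, again assuming the 1 Hz example stream:

    rate_hz = 1
    rotate_interval_s = 600000 // 1000                             # 600 s
    partition_duration_s = 3600000 // 1000                         # 3600 s (hourly partitioner)
    records_per_file = rate_hz * rotate_interval_s                 # ~600 records per file
    files_per_partition = partition_duration_s // rotate_interval_s  # 6 files per hourly partition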

s3_bucket_name = ''

The S3 Bucket.

s3_region = 'us-east-1'

The AWS region to be used by the connector.

schema_compatibility = 'NONE'

The schema compatibility rule.

The supported configurations are NONE, BACKWARD, FORWARD and FULL.

storage_class = 'io.confluent.connect.s3.storage.S3Storage'

The underlying storage layer.

tasks_max = 1
timestamp_extractor = 'Record'

The extractor determines how to obtain a timestamp from each record.

Values can be Wallclock, to use the system time when the record is processed; Record (the default), to use the timestamp of the Kafka record, denoting when it was produced or stored by the broker; or RecordField, to extract the timestamp from one of the fields in the record’s value, as specified by the timestamp_field configuration property.

timestamp_field = 'time'

The record field to be used as timestamp by the timestamp extractor.

Only applies if timestamp_extractor is set to RecordField.
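
A sketch of configuring the extractor to read the timestamp from a field in the record value (the topic, bucket, and field names are placeholders):

    from kafkaconnect.s3_sink.config import S3Config

    config = S3Config(
        topics="my-topic",
        s3_bucket_name="my-bucket",
        timestamp_extractor="RecordField",   # partition by a field in the record value
        timestamp_field="time",              # name of that field
    )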

timezone = 'UTC'

The timezone to use when partitioning with TimeBasedPartitioner.

topics = ''
topics_dir = 'topics'

Top level directory to store the data ingested from Kafka.

Methods Documentation

asjson() → str
static format_field_names(fields: List[Tuple[str, Any]]) → Dict[str, str]

Dictionary factory to use with the dataclasses.asdict() method.

Rename the field names, replacing ‘_’ with ‘.’, and return a dictionary mapping field names to field values.
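
For illustration, assuming S3Config is a dataclass (as the use of dataclasses.asdict() suggests), the factory can be passed as the dict_factory:

    from dataclasses import asdict

    from kafkaconnect.s3_sink.config import S3Config

    config = S3Config(topics="my-topic", s3_bucket_name="my-bucket")

    # Field names are returned in the dotted form used by Kafka Connect,
    # e.g. s3_bucket_name -> s3.bucket.name.
    props = asdict(config, dict_factory=S3Config.format_field_names)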

update_topics(topics: List[str]) → None

Update the list of Kafka topics.

Parameters: topics (list) – List of Kafka topics.
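
A small usage sketch (topic names are placeholders):

    from kafkaconnect.s3_sink.config import S3Config

    config = S3Config(s3_bucket_name="my-bucket")
    # The topics are presumably stored as the comma-separated string
    # that Kafka Connect expects for the topics property.
    config.update_topics(["topic-a", "topic-b"])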