InfluxConfig

class kafkaconnect.influxdb_sink.config.InfluxConfig(name: str = 'influxdb-sink', connector_class: str = 'com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector', topics: str = '', tasks_max: int = 1, connect_influx_url: str = 'http://localhost:8086', connect_influx_db: str = '', connect_influx_kcql: str = '', connect_influx_username: str = '-', connect_influx_password: str = '', connect_influx_timestamp: str = 'sys_time()', connect_influx_error_policy: str = 'THROW', connect_influx_max_retries: str = '10', connect_influx_retry_interval: str = '60000', connect_progress_enabled: bool = False)

Bases: kafkaconnect.config.ConnectConfig

InfluxDB connector configuration

Attributes Summary

connect_influx_db InfluxDB database name.
connect_influx_error_policy Connector error policy configuration.
connect_influx_kcql KCQL queries to extract fields from topics.
connect_influx_max_retries Connector error policy configuration.
connect_influx_password InfluxDB password.
connect_influx_retry_interval Connector error policy configuration.
connect_influx_timestamp Timestamp to use as the InfluxDB time.
connect_influx_url InfluxDB connection URL.
connect_influx_username InfluxDB username.
connect_progress_enabled Enables the output for how many records have been processed.
connector_class Stream Reactor InfluxDB Sink connector class.
name Name of the connector.
tasks_max
topics

Methods Summary

asjson()
format_field_names(fields) Dictionary factory to use with the dataclasses.asdict() method.
update_topics(topics, timestamp) Update the list of Kafka topics and Influx KCQL queries.

Attributes Documentation

connect_influx_db = ''

InfluxDB database name.

connect_influx_error_policy = 'THROW'

Connector error policy configuration.

See https://docs.lenses.io/connectors/sink/influx.html

connect_influx_kcql = ''

KCQL queries to extract fields from topics.

We assume that each topic has a flat structure, so that SELECT * FROM retrieves all of its fields. This configuration is derived from the list of topics and from the timestamp to use as the InfluxDB time.

connect_influx_max_retries = '10'

Connector error policy configuration.

See https://docs.lenses.io/connectors/sink/influx.html

connect_influx_password = ''

InfluxDB password.

connect_influx_retry_interval = '60000'

Connector error policy configuration.

See https://docs.lenses.io/connectors/sink/influx.html

connect_influx_timestamp = 'sys_time()'

Timestamp to use as the InfluxDB time.

connect_influx_url = 'http://localhost:8086'

InfluxDB connection URL.

connect_influx_username = '-'

InfluxDB username.

connect_progress_enabled = False

Enables the output for how many records have been processed.

connector_class = 'com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector'

Stream Reactor InfluxDB Sink connector class.

name = 'influxdb-sink'

Name of the connector.

The connector name must be unique across the cluster.

tasks_max = 1
topics = ''

Methods Documentation

asjson() → str
static format_field_names(fields: List[Tuple[str, Any]]) → Dict[str, str]

Dictionary factory to use with the dataclasses.asdict() method.

Rename each field, replacing ‘_’ with ‘.’ in its name, and return a dictionary mapping the renamed fields to their values.
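The renaming described above can be sketched as a dict_factory passed to dataclasses.asdict(). This is a minimal illustration, not the library's implementation: ExampleConfig is a hypothetical stand-in for InfluxConfig with only two fields, and the function below is re-implemented locally from the description. It leaves values unchanged, whereas the documented return type (Dict[str, str]) suggests the real method may convert them to strings.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Tuple


def format_field_names(fields: List[Tuple[str, Any]]) -> Dict[str, Any]:
    """Replace '_' with '.' in each field name (illustrative re-implementation)."""
    return {name.replace("_", "."): value for name, value in fields}


@dataclass
class ExampleConfig:
    # Hypothetical stand-in for InfluxConfig; only two fields are shown.
    tasks_max: int = 1
    connect_influx_url: str = "http://localhost:8086"


# asdict() calls dict_factory with a list of (field name, value) pairs.
config = asdict(ExampleConfig(), dict_factory=format_field_names)
print(config)
```

The dotted names match the property keys that Kafka Connect expects, which is presumably why the Python attribute names use underscores in the first place.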

update_topics(topics: List[str], timestamp: str = 'sys_time()') → None

Update the list of Kafka topics and Influx KCQL queries.

Parameters:
  • topics (list) – List of Kafka topics.
  • timestamp (str) – Timestamp used as the InfluxDB time. The default is sys_time(); the name of a field can be used instead.
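A rough sketch of what such an update might look like follows. SketchInfluxConfig is a hypothetical stand-in, not the library class, and both string formats are assumptions: a comma-separated topic list, and one KCQL statement per topic of the form INSERT INTO ... SELECT * FROM ... WITHTIMESTAMP ..., joined with ';'. Check the connector documentation for the exact syntax your version expects.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SketchInfluxConfig:
    """Hypothetical stand-in for InfluxConfig; not the library class."""

    topics: str = ""
    connect_influx_kcql: str = ""

    def update_topics(self, topics: List[str], timestamp: str = "sys_time()") -> None:
        # Assumed format: comma-separated topic names.
        self.topics = ",".join(topics)
        # Assumed format: one KCQL statement per topic, separated by ';'.
        # SELECT * relies on each topic having a flat structure.
        self.connect_influx_kcql = ";".join(
            f"INSERT INTO {t} SELECT * FROM {t} WITHTIMESTAMP {timestamp}"
            for t in topics
        )


sketch = SketchInfluxConfig()
sketch.update_topics(["t1", "t2"])
print(sketch.topics)
print(sketch.connect_influx_kcql)
```

Passing a field name as timestamp, for example update_topics(["t1"], timestamp="ts") with a hypothetical field ts, would make that field the InfluxDB time instead of the system time.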