CLI Reference

kafkaconnect

Command-line interface for kafkaconnect.

kafkaconnect is a Connect API client that helps to configure and manage Kafka connectors.

kafkaconnect [OPTIONS] COMMAND [ARGS]...

Options

-b, --broker <broker_url>

Kafka Broker URL. Alternatively set via $KAFKA_BROKER_URL env var.

Default: localhost:9092
-c, --connect <connect_url>

Kafka Connect URL. Alternatively set via $KAFKA_CONNECT_URL env var.

Default: http://localhost:8083
--version

Show the version and exit.
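
For example, to list the active connectors on a Kafka Connect instance other than the default, pass the Connect URL before the command (the URL below is a placeholder; substitute your own cluster):

kafkaconnect --connect http://connect.example.com:8083 list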

config

Get the connector configuration.

kafkaconnect config [OPTIONS] NAME

Arguments

NAME

Required argument
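
For example, to retrieve the configuration of a connector created with the default name influxdb-sink:

kafkaconnect config influxdb-sink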

create

Create a new connector.

Each subcommand creates a different connector.

kafkaconnect create [OPTIONS] COMMAND [ARGS]...

influxdb-sink

Create an instance of the InfluxDB Sink connector.

A list of topics can be specified using the TOPICLIST argument. If not specified, topics are discovered from Kafka. Use the --topic-regex and --excluded_topics options to help select the topics that you want to write to InfluxDB. To check for new topics and update the connector configuration, use the --auto-update and --check-interval options. An example invocation is given at the end of this section.

kafkaconnect create influxdb-sink [OPTIONS] [TOPICLIST]...

Options

-n, --name <name>

Name of the connector. Alternatively set via the $KAFKA_CONNECT_NAME env var.

Default: influxdb-sink
-i, --influxdb_url <connect_influx_url>

InfluxDB connection URL. Alternatively set via the $KAFKA_CONNECT_INFLUXDB_URL env var.

Default: http://localhost:8086
-d, --database <connect_influx_db>

InfluxDB database name. The database must already exist in InfluxDB. Alternatively set via the $KAFKA_CONNECT_DATABASE env var.

Default:
-t, --tasks-max <tasks_max>

Number of Kafka Connect tasks. Alternatively set via the $KAFKA_CONNECT_TASKS_MAX env var.

Default: 1
-u, --username <connect_influx_username>

InfluxDB username. Alternatively set via the $KAFKA_CONNECT_INFLUXDB_USERNAME env var. Use '-' for unauthenticated users.

Default:
-p, --password <connect_influx_password>

InfluxDB password. Alternatively set via the $KAFKA_CONNECT_INFLUXDB_PASSWORD env var.

Default:
-r, --topic-regex <topic_regex>

Regex for selecting topics. Alternatively set via the $KAFKA_CONNECT_TOPIC_REGEX env var.

Default: .*
--dry-run

Show the InfluxDB Sink Connector configuration but do not create the connector.

--auto-update

Check for new topics and update the connector. See also the --check-interval option.

-v, --validate

Validate the connector configuration before creating.

-c, --check-interval <check_interval>

The interval, in milliseconds, to check for new topics and update the connector.

Default: 15000
-e, --excluded_topics <excluded_topics>

Comma separated list of topics to exclude from selection. Alternatively set via the $KAFKA_CONNECT_EXCLUDED_TOPICS env var.

Default:
--timestamp <timestamp>

Timestamp to use as the InfluxDB time.

Default: sys_time()
--error-policy <connect_influx_error_policy>

Specifies the action to take if an error occurs while inserting the data. There are three options: NOOP, where the error is swallowed; THROW, where the error is allowed to propagate; and RETRY, where the Kafka message is redelivered up to a maximum number of times specified by the --max-retries option, at an interval specified by the --retry-interval option. Alternatively set via the $KAFKA_CONNECT_ERROR_POLICY env var.

Default: THROW
Options: NOOP | THROW | RETRY
--max-retries <connect_influx_max_retries>

The maximum number of times a message is retried. Only valid when the --error-policy is set to RETRY. Alternatively set via the $KAFKA_CONNECT_MAX_RETRIES env var.

Default: 10
--retry-interval <connect_influx_retry_interval>

The interval, in milliseconds, between retries. Only valid when the --error-policy is set to RETRY. Alternatively set via the $KAFKA_CONNECT_RETRY_INTERVAL env var.

Default: 60000
--progress-enabled <connect_progress_enabled>

Enables output reporting how many records have been processed. Alternatively set via the $KAFKA_CONNECT_PROGRESS_ENABLED env var.

Default: False

Arguments

TOPICLIST

Optional argument(s)
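
As an illustration only (the database name, credentials, topic regex, and excluded topic below are placeholders), the following command creates an InfluxDB Sink connector that selects topics matching a regex, excludes one topic, and keeps the configuration up to date as new topics appear:

kafkaconnect create influxdb-sink --database mydb --username myuser --password mypass --topic-regex "mytopics.*" --excluded_topics mytopics.excluded --auto-update --check-interval 30000

Add the --dry-run option to inspect the resulting configuration without creating the connector, or --validate to validate the configuration before creating it.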

s3-sink

Create an instance of the S3 Sink connector.

A list of topics can be specified using the TOPICLIST argument. If not specified, topics are discovered from Kafka. Use the --topic-regex and --excluded_topics options to help select the topics that you want to write to S3. To check for new topics and update the connector configuration, use the --auto-update and --check-interval options. An example invocation is given at the end of this section.

kafkaconnect create s3-sink [OPTIONS] [TOPICLIST]...

Options

-n, --name <name>

Name of the connector. Alternatively set via the $KAFKA_CONNECT_NAME env var.

Default: s3-sink
-b, --bucket-name <s3_bucket_name>

S3 bucket name. The bucket must already exist. Alternatively set via the $KAFKA_CONNECT_S3_BUCKECT_NAME env var.

Default:
-r, --region <s3_region>

S3 region. Alternatively set via the $KAFKA_CONNECT_S3_REGION env var.

Default: us-east-1
-d, --topics-dir <topics_dir>

Top level directory to store the data ingested from Kafka. Alternatively set via the $KAFKA_CONNECT_S3_TOPICS_DIR env var.

Default: topics
--flush-size <flush_size>

Number of records written to the store before invoking file commits. Alternatively set via the $KAFKA_CONNECT_S3_FLUSH_SIZE env var.

Default: 3600
--rotate-interval-ms <rotate_interval_ms>

The time interval, in milliseconds, to invoke file commits. Alternatively set via the $KAFKA_CONNECT_S3_ROTATE_INTERVAL_MS env var.

Default: 600000
-p, --partition-duration-ms <partition_duration_ms>

The duration of a partition in milliseconds used by TimeBasedPartitioner. Alternatively set via the $KAFKA_CONNECT_S3_PARTITION_DURATION_MS env var.

Default: 3600000
--path-format <path_format>

Pattern used to format the path in the S3 object name; this is the portion of the path generated by the S3 connector's partitioner. The default corresponds to an hourly partitioner. Alternatively set via the $KAFKA_CONNECT_S3_PATH_FORMAT env var.

Default: 'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
-t, --tasks-max <tasks_max>

Number of Kafka Connect tasks. Alternatively set via the $KAFKA_CONNECT_TASKS_MAX env var.

Default: 1
--topic-regex <topic_regex>

Regex for selecting topics. Alternatively set via the $KAFKA_CONNECT_TOPIC_REGEX env var.

Default: .*
--dry-run

Show the S3 Sink Connector configuration but do not create the connector.

--auto-update

Check for new topics and update the connector. See also the --check-interval option.

-v, --validate

Validate the connector configuration before creating.

-c, --check-interval <check_interval>

The interval, in milliseconds, to check for new topics and update the connector.

Default: 15000
-e, --excluded-topics <excluded_topics>

Comma separated list of topics to exclude from selection. Alternatively set via the $KAFKA_CONNECT_EXCLUDED_TOPICS env var.

Default:
--locale <locale>

The locale to use when partitioning with TimeBasedPartitioner.

Default: en-US
--timezone <timezone>

The timezone to use when partitioning with TimeBasedPartitioner.

Default: UTC
--timestamp-extractor <timestamp_extractor>

The extractor determines how to obtain a timestamp from each record. Values can be Wallclock, to use the system time when the record is processed; Record, to use the timestamp of the Kafka record, denoting when it was produced or stored by the broker; or RecordField, to extract the timestamp from one of the fields in the record's value, as specified by the --timestamp-field option.

Default: Record
--timestamp-field <timestamp_field>

The record field to be used as the timestamp by the timestamp extractor. Only applies if --timestamp-extractor is set to RecordField.

Default: time

Arguments

TOPICLIST

Optional argument(s)
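
As a sketch (the bucket name and topic regex below are placeholders, and the bucket must already exist), the following command creates an S3 Sink connector that writes the matching topics to hourly partitions, extracting the timestamp from a field in each record's value:

kafkaconnect create s3-sink --bucket-name mybucket --region us-east-1 --topic-regex "mytopics.*" --timestamp-extractor RecordField --timestamp-field time

Combine with --dry-run to review the generated configuration before creating the connector.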

delete

Delete a connector. This halts its tasks and removes the connector configuration.

kafkaconnect delete [OPTIONS] NAME

Arguments

NAME

Required argument
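
For example, to delete a connector created with the default name influxdb-sink:

kafkaconnect delete influxdb-sink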

help

Show help for any command.

kafkaconnect help [OPTIONS] [TOPIC]

Arguments

TOPIC

Optional argument
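
For example, to show help for the create command:

kafkaconnect help create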

info

Get information about the connector.

kafkaconnect info [OPTIONS] NAME

Arguments

NAME

Required argument

list

Get a list of active connectors.

kafkaconnect list [OPTIONS]

pause

Pause the connector and its tasks.

kafkaconnect pause [OPTIONS] NAME

Arguments

NAME

Required argument
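
For example, to pause a connector created with the default name s3-sink (use the resume command to resume it later):

kafkaconnect pause s3-sink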

plugins

Get a list of connector plugins available in the Connect cluster.

kafkaconnect plugins [OPTIONS]

restart

Restart a connector and its tasks.

kafkaconnect restart [OPTIONS] NAME

Arguments

NAME

Required argument

resume

Resume a paused connector.

kafkaconnect resume [OPTIONS] NAME

Arguments

NAME

Required argument

status

Get the connector status.

kafkaconnect status [OPTIONS] NAME

Arguments

NAME

Required argument
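
For example, to get the status of a connector named influxdb-sink:

kafkaconnect status influxdb-sink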

tasks

Get a list of tasks currently running for the connector.

kafkaconnect tasks [OPTIONS] NAME

Arguments

NAME

Required argument

topics

Get the list of topic names used by the connector.

kafkaconnect topics [OPTIONS] NAME

Arguments

NAME

Required argument
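
For example, to list the topic names used by a connector named s3-sink:

kafkaconnect topics s3-sink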