JDBC Source connector
The JDBC Source connector is a Stateless NiFi dataflow developed by Cloudera that runs in the Kafka Connect framework. Learn about the connector, its properties, and configuration.
The JDBC Source connector reads records from a database table and transfers each record to Kafka in Avro or JSON format. The Kafka topic the connector transfers messages to is determined by the value of the topics property in the configuration. The schema of the records can be inherited from the schema of the database table, or it can be a predefined schema retrieved from Schema Registry.
The connector supports incremental loading based on a table column containing increasing values (typically an ID sequence or timestamp). When the connector is started for the first time with incremental loading turned on, it can load all the existing data from the source table, or it can start from the current maximum value of the incrementing column.
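For example, a minimal sketch of the configuration entries that control incremental loading (the property names are described in the JDBC Source properties reference later in this document; the [***...***] placeholder is a value you supply):
"parameter.JDBC Source Parameters:Maximum-value Columns": "[***INCREMENTING COLUMN NAME***]",
"parameter.JDBC Source Parameters:Initial Load Strategy": "Start at Current Maximum Values"
With these entries, the connector starts from the current maximum value of the incrementing column instead of loading all existing rows.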
Properties and configuration
Configuration is passed to the connector in a JSON file during creation. The properties of the connector can be categorized into three groups. These are as follows:
- Common connector properties
- These are the properties of the Kafka Connect framework that are accepted by all connectors. For a comprehensive list of these properties, see the Apache Kafka documentation.
- Stateless NiFi Source properties
- These are the properties that are specific to the Stateless NiFi Source connector. All Stateless NiFi Source connectors share and accept these properties. For a comprehensive list of these properties, see the Stateless NiFi Source property reference.
- Connector/dataflow-specific properties
- These are the properties that are unique to this specific connector, or, to be more precise, unique to the dataflow running within the connector. These properties use the following prefix:
parameter.[***CONNECTOR NAME***] Parameters:
For a comprehensive list of these properties, see the JDBC Source properties reference. An example of a prefixed property is shown below.
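For example, the Database Table Name dataflow property of the JDBC Source connector is set in the configuration JSON as follows:
"parameter.JDBC Source Parameters:Database Table Name": "[***TABLE NAME***]"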
Notes and limitations
- Required properties must be assigned a valid value even if they are not used in the particular configuration. If a required property is not used, either leave its default value, or completely remove the property from the configuration JSON.
- If a property that has a default value is completely removed from the configuration JSON, the system uses the default value.
- Properties not marked as required must be completely removed from the configuration JSON if not set.
- Schema Branch and Schema Version cannot be specified at the same time.
- The tasks.max property must be set to 1. Setting the property to any other value has no effect. This is because this connector performs incremental loading and must run as a single task.
- If Initial Load Strategy is set to Start at Beginning and Maximum-value Columns is not set, the connector tries to load the entire content of the source database table to the target Kafka topic with every execution of the connector. Set Maximum-value Columns to avoid this.
Configuration example
In this example, the connector connects to a PostgreSQL database using username/password authentication, queries the records from a table, converts each record to JSON format, and sends them to a Kafka topic. The schema of the records is determined from the schema of the database table. The data loading is incremental: only new records are loaded, based on a column in the table containing incrementing values.
{
"connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSourceConnector",
"meta.smm.predefined.flow.name": "JDBC Source",
"meta.smm.predefined.flow.version": "1.0.0",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"tasks.max": "1",
"nexus.url": "https://repository.cloudera.com/artifactory/repo",
"extensions.directory": "/tmp/nifi-stateless-extensions",
"working.directory": "/tmp/nifi-stateless-working",
"key.attribute": "kafka.message.key",
"topics": "[***KAFKA TOPIC NAME***]",
"parameter.JDBC Source Parameters:Database Connection URL": "[***JDBC URL***]",
"parameter.JDBC Source Parameters:Database Driver Location": "[***PATH TO JDBC DRIVER***]",
"parameter.JDBC Source Parameters:Database Driver Class Name": "org.postgresql.Driver",
"parameter.JDBC Source Parameters:Database Type": "PostgreSQL",
"parameter.JDBC Source Parameters:Database User Name": "[***USERNAME***]",
"parameter.JDBC Source Parameters:Database User Password": "[***PASSWORD***]",
"parameter.JDBC Source Parameters:Database Table Name": "[***TABLE NAME***]",
"parameter.JDBC Source Parameters:Kafka Message Key Column": "[***COLUMN NAME***]",
"parameter.JDBC Source Parameters:Maximum-value Columns": "[***COLUMN NAME***]",
"parameter.JDBC Source Parameters:Kafka Message Data Format": "JSON",
"parameter.JDBC Source Parameters:Schema Access Strategy": "Inherit Schema"
}
topics
- The name of the Kafka topic that the connector sends messages to.
Database Connection URL
- The JDBC URL of the PostgreSQL database. For example, jdbc:postgresql://myhost:5432/mydb.
Database Driver Location
- A comma-separated list of files or folders containing the JDBC client libraries.
Database Driver Class Name
- The Java class name of the PostgreSQL driver implementation.
Database Type
- The type of the database. Because this is a PostgreSQL example, the property is set to PostgreSQL.
Database User Name
- The username used for authenticating to the database.
Database User Password
- The password of the user.
Database Table Name
- The name of the database table to query data from.
Kafka Message Key Column
- Specifies a database table column. The value of the column specified is used as the key of the Kafka message.
Maximum-value Columns
- The name of the column used for incremental loading. The column value must be increasing, for example, coming from a sequence or system timestamp.
Kafka Message Data Format
- The format of the messages the connector sends to Kafka.
Schema Access Strategy
- Specifies the strategy used for determining the schema of the database record. In this example, this property is set to Inherit Schema, meaning that the schema is determined (inherited) from the database table.
Stateless NiFi Source properties reference
Review the following reference for a comprehensive list of the connector properties that are specific to the Stateless NiFi Source connector.
In addition to the properties listed here, Stateless NiFi connectors also accept the properties of the Kafka Connect framework. For a comprehensive list of these properties, see the Apache Kafka documentation.
dataflow.timeout
- Description
- Specifies the maximum amount of time to wait for the dataflow to complete. If the dataflow does not complete before this timeout, the thread is interrupted and the dataflow is considered a failure. The session is rolled back and the connector retriggers the flow. Defaults to 60 seconds if not specified.
- Default Value
- 60 seconds
- Accepted Values
- Required
- false
extensions.directory
- Description
- Specifies the directory that stores downloaded extensions. Extensions are the NAR (NiFi Archive) files containing the processors and controller services a flow might use. Since Stateless NiFi is only the NiFi engine, it does not contain any of the processors and controller services you might use in your flow. When deploying the connector with the custom flow, the system needs to download the specific extensions that your flow uses from Nexus (unless they are already present in this directory). These extensions are stored in this directory. Because the default directory might not be writable, and to aid in upgrade scenarios, Cloudera recommends that you always specify an extensions directory.
- Default Value
- /tmp/nifi-stateless-extensions
- Accepted Values
- Required
- true
flow.snapshot
- Description
- Specifies the dataflow to run. When using Streams Messaging Manager to deploy a connector, the value you set in this property must be a JSON object. URLs, file paths, or escaped JSON strings are not supported when using Streams Messaging Manager. Alternatively, if using the Kafka Connect REST API to deploy a connector, this can be a file containing the dataflow, a URL that points to a dataflow, or a string containing the entire dataflow as an escaped JSON. Cloudera, however, does not recommend using the Kafka Connect REST API to interact with this connector or Kafka Connect.
- Default Value
- Accepted Values
- Required
- true
header.attribute.regex
- Description
- A Java regular expression that is evaluated against all flowfile attribute names. Any attribute name matching the regular expression is converted into a Kafka message header. The name of the attribute is used as the header key, and the value of the attribute is used as the header value. If not specified, headers are not added to the Kafka record. See the example below.
- Default Value
- Accepted Values
- Required
- false
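For example, a sketch that forwards every flowfile attribute starting with a hypothetical myapp. prefix as a Kafka message header (the prefix is an assumption for illustration; note the double backslash required for escaping in JSON):
"header.attribute.regex": "myapp\\..*"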
key.attribute
- Description
- Specifies the name of a flowfile attribute that should be used to specify the key of the Kafka record. If not specified, the Kafka record will not have a key associated with it. If specified but the attribute does not exist on a particular flowfile, the corresponding Kafka record will also have no key associated with it.
- Default Value
- Accepted Values
- Required
- false
krb5.file
- Description
- Specifies the krb5.conf file to use if the dataflow interacts with any services that are secured using Kerberos. Defaults to /etc/krb5.conf if not specified.
- Default Value
- /etc/krb5.conf
- Accepted Values
- Required
- false
name
- Description
- The name of the connector. On the Streams Messaging Manager UI, the connector names are specified using the Enter Name field. The name that you enter in the Enter Name field is automatically set as the value of the name property when the connector is deployed. Because of this, the name property is omitted from the configuration template provided in Streams Messaging Manager. If you manually add the name property to the configuration in Streams Messaging Manager, ensure that the value you set matches the connector name specified in the Enter Name field. Otherwise, the connector fails to deploy.
- Default Value
- Accepted Values
- Required
- true
nexus.url
- Description
- Specifies the base URL of the Nexus instance to source extensions from. If configuring a Nexus instance that has multiple repositories, include the name of the repository in the URL. For example, https://nexus-private.myorganization.org/nexus/repository/my-repository/. If the property is not specified, the necessary extensions (the ones used by the flow) must be provided in the extensions directory before deploying the connector.
- Default Value
- Accepted Values
- Required
- true
output.port
- Description
- The name of the output port in the NiFi dataflow to pull data from. If the dataflow contains exactly one port, this property is optional and can be omitted. However, if the dataflow contains multiple ports (for example, a success and a failure port), this property must be specified. If any flowfile is sent to any port other than the specified port, it is considered a failure. The session is rolled back and no data is collected. See the example below.
- Default Value
- Accepted Values
- Required
- false
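For example, a sketch for a dataflow that has two ports, assuming they are named success and failure (the port names are assumptions for illustration):
"output.port": "success"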
parameter.[***FLOW PARAMETER NAME***]
- Description
- Specifies a parameter to use in the dataflow. For example, assume that you have the following entry in your connector configuration: "parameter.Directory": "/mydir". In a case like this, any parameter context in the dataflow that has a parameter named Directory gets the specified value (/mydir). If the dataflow has child process groups, and those child process groups have their own parameter contexts, the value is used for all parameter contexts that contain a parameter named Directory. Parameters can also be applied to specific parameter contexts only. This can be done by prefixing the parameter name (Directory) with the name of the parameter context followed by a colon. For example, parameter.My Context:Directory only applies the specified value to the Directory parameter in the parameter context named My Context. See the example below.
- Default Value
- Accepted Values
- Required
- false
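The following sketch combines the two forms described above, assuming a hypothetical Directory parameter and a parameter context named My Context:
"parameter.Directory": "/mydir",
"parameter.My Context:Directory": "/mydir"
The first entry applies to every parameter context that contains a parameter named Directory; the second applies only to the parameter context named My Context.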
topic.name.attribute
- Description
- Specifies the name of a flowfile attribute to use for determining which Kafka topic a flowfile is sent to. Either the topics or the topic.name.attribute property must be specified. If both are specified, topic.name.attribute takes precedence. However, if a flowfile does not have the specified attribute, the connector falls back to using the topics property. See the example below.
- Default Value
- Accepted Values
- Required
- false
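For example, a sketch of the fallback behavior described above, using the datasource.hostname attribute mentioned under the topics property (the attribute name is illustrative):
"topics": "[***FALLBACK TOPIC***]",
"topic.name.attribute": "datasource.hostname"
Flowfiles that have the datasource.hostname attribute are sent to the topic named by that attribute; all other flowfiles are sent to the fallback topic.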
topics
- Description
- The name of the topic to deliver data to. All flowfiles are delivered to the topic specified here. However, it is also possible to determine the topic individually for each flowfile. To do this, ensure that the dataflow specifies the topic name in an attribute, and then use topic.name.attribute to specify the name of that attribute instead of the topic name. For example, if you wanted a separate Kafka topic for each data source, you could omit the topics property and instead specify the attribute corresponding to the topic (for example, datasource.hostname) using the topic.name.attribute property.
- Default Value
- Accepted Values
- Required
- true
working.directory
- Description
- Specifies a directory on the Connect server that NiFi should use for unpacking extensions that it needs to perform the dataflow. The contents of extensions.directory are unpacked here. Defaults to /tmp/nifi-stateless-working if not specified.
- Default Value
- /tmp/nifi-stateless-working
- Accepted Values
- Required
- false
JDBC Source properties reference
Review the following reference for a comprehensive list of the connector properties that are specific to the JDBC Source connector.
All of the properties listed here are dataflow parameters and must be prefixed with parameter.[***CONNECTOR NAME***] Parameters: in the configuration JSON. For example: parameter.JDBC Source Parameters:Database Table Name.
In addition to the properties listed here, this connector also accepts certain properties of the Kafka Connect framework as well as the properties of the NiFi Stateless Source connector. When creating a new connector using the Streams Messaging Manager UI, all valid properties are presented in the default configuration template. You can view the configuration template to get a full list of valid properties. In addition, for more information regarding the accepted properties not listed here, you can review the Apache Kafka documentation and the Stateless NiFi Source property reference.
Additional WHERE clause
- Description
- Specifies a custom condition to add in the WHERE clause of the SQL query. See the example below.
- Default Value
- Accepted Values
- Required
- false
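For example, a sketch that limits the query to rows matching a condition on a hypothetical status column:
"parameter.JDBC Source Parameters:Additional WHERE clause": "status = 'ACTIVE'"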
Columns to Return
- Description
- A comma-separated list of column names to be returned by the query. All columns are returned if it is not specified.
- Default Value
- Accepted Values
- Required
- false
Custom Query
- Description
- Specifies a custom SQL query to use instead of generating the query from the table and column names. See the example below.
- Default Value
- Accepted Values
- Required
- false
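For example, a sketch that reads from a join of two hypothetical tables instead of a single table (if incremental loading is used, the column referenced in Maximum-value Columns should still be returned by the query):
"parameter.JDBC Source Parameters:Custom Query": "SELECT o.id, o.total, c.name FROM orders o JOIN customers c ON o.customer_id = c.id"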
Database Connection URL
- Description
- The database-specific connection URL used for connecting to the database. For example, jdbc:postgresql://localhost:5432/postgres.
- Default Value
- Accepted Values
- Required
- true
Database Driver Class Name
- Description
- The database driver class name. For example, org.postgresql.Driver.
- Default Value
- Accepted Values
- Required
- true
Database Driver Location
- Description
- A comma-separated list of files or folders containing the JDBC client libraries.
- Default Value
- Accepted Values
- Required
- true
Database Table Name
- Description
- The name of the database table to query.
- Default Value
- Accepted Values
- Required
- true
Database Type
- Description
- The database type, used for generating database-specific SQL queries.
- Default Value
- Generic
- Accepted Values
- Generic, Oracle, Oracle 12+, MS SQL 2008, MS SQL 2012+, MySQL, PostgreSQL
- Required
- true
Database User Name
- Description
- The database user name. If username/password authentication is not required by the database server, this property must be completely removed from the configuration JSON.
- Default Value
- Accepted Values
- Required
- false
Database User Password
- Description
- The database user password. If username/password authentication is not required by the database server, this property must be completely removed from the configuration JSON.
- Default Value
- Accepted Values
- Required
- false
Date Format
- Description
- Specifies the format to use when writing Date fields to JSON.
- Default Value
- yyyy-MM-dd
- Accepted Values
- Required
- true
Initial Load Strategy
- Description
- Specifies how existing rows in the database table are handled when the connector is started for the first time.
- Default Value
- Start at Beginning
- Accepted Values
- Start at Beginning, Start at Current Maximum Values
- Required
- true
Kafka Message Data Format
- Description
- Specifies the format of the messages the connector sends to Kafka. The database row is converted to this format. If the output format is Avro and Schema Access Strategy is set to Inherit Schema, the schema is embedded in the output message. If the output format is Avro and Schema Access Strategy is set to Schema Registry, the schema is not embedded in the output message.
- Default Value
- Avro
- Accepted Values
- Avro, JSON
- Required
- true
Kafka Message Key Column
- Description
- Specifies a database table column. The value of the column specified is used as the key of the Kafka message.
- Default Value
- Accepted Values
- Required
- false
Kerberos Keytab
- Description
- The fully-qualified filename of the Kerberos keytab associated with the principal for accessing Schema Registry.
- Default Value
- The location of the default keytab, which is empty and can only be used for unsecured connections.
- Accepted Values
- Required
- true
Kerberos Principal
- Description
- The Kerberos principal used for authenticating to Schema Registry.
- Default Value
- default
- Accepted Values
- Required
- true
Maximum-value Columns
- Description
- A comma-separated list of column names used for incremental loading.
- Default Value
- Accepted Values
- Required
- false
Schema Access Strategy
- Description
- Specifies the strategy used for determining the schema of the database record.
- If set to
Inherit Schema
, the schema is determined from the database schema. - If set to
Schema Registry
, the schema is read from Schema Registry. - If set to
HWX Content-Encoded Schema Reference
, the schema is read from Schema Registry. This setting can only be used ifKafka Message Data Format
isAvro
. In this case the Avro messages are expected to have a reference to the schema in Schema Registry encoded within the message content.
- If set to
- Default Value
- Inherit Schema
- Accepted Values
- Inherit Schema, Schema Registry, HWX Content-Encoded Schema Reference
- Required
- true
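For example, a sketch of the Schema Registry strategy together with the related registry properties listed later in this reference (all values are placeholders):
"parameter.JDBC Source Parameters:Schema Access Strategy": "Schema Registry",
"parameter.JDBC Source Parameters:Schema Registry URL": "[***SCHEMA REGISTRY URL***]",
"parameter.JDBC Source Parameters:Schema Name": "[***SCHEMA NAME***]"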
Schema Branch
- Description
- The name of the branch to use when looking up the schema in Schema Registry. Schema Branch and Schema Version cannot be specified at the same time. If one is specified, the other needs to be removed from the configuration. If Schema Registry is not used, this property must be completely removed from the configuration.
- Default Value
- Accepted Values
- Required
- false
Schema Name
- Description
- The schema name to look up in Schema Registry. If the Schema Access Strategy property is set to Schema Registry, this property must contain a valid schema name. If Schema Registry is not used, this property must be completely removed from the configuration JSON.
- Default Value
- Accepted Values
- Required
- false
Schema Registry URL
- Description
- The URL of the Schema Registry server. If Schema Registry is not used, this property must be completely removed from the configuration JSON.
- Default Value
- http://localhost:7788/api/v1
- Accepted Values
- Required
- true
Schema Version
- Description
- The version of the schema to look up in Schema Registry. If Schema Registry is used and a schema version is not specified, the latest version of the schema is retrieved. Schema Branch and Schema Version cannot be specified at the same time. If one is specified, the other needs to be removed from the configuration. If Schema Registry is not used, this property must be completely removed from the configuration.
- Default Value
- Accepted Values
- Required
- false
Time Format
- Description
- Specifies the format to use when writing Time fields to JSON.
- Default Value
- HH:mm:ss
- Accepted Values
- Required
- true
Timestamp Format
- Description
- Specifies the format to use when writing Timestamp fields to JSON.
- Default Value
- yyyy-MM-dd HH:mm:ss.SSS
- Accepted Values
- Required
- true
Truststore Filename
- Description
- The fully-qualified filename of a truststore. This truststore is used to establish a secure connection with Schema Registry using HTTPS.
- Default Value
- The location of the default truststore, which is empty and can only be used for unsecured connections.
- Accepted Values
- Required
- true
Truststore Password
- Description
- The password used to access the contents of the truststore configured in the Truststore Filename property.
- Default Value
- password
- Accepted Values
- Required
- true
Truststore Type
- Description
- The type of the truststore configured in the Truststore Filename property. See the example below.
- Default Value
- JKS
- Accepted Values
- BCFKS, PKCS12, JKS
- Required
- true
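For example, a sketch of the three truststore properties used together to establish an HTTPS connection to Schema Registry (the path and password are placeholders):
"parameter.JDBC Source Parameters:Truststore Filename": "[***PATH TO TRUSTSTORE***]",
"parameter.JDBC Source Parameters:Truststore Password": "[***TRUSTSTORE PASSWORD***]",
"parameter.JDBC Source Parameters:Truststore Type": "JKS"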