ADLS Sink connector

The ADLS Sink connector is a Stateless NiFi dataflow developed by Cloudera that is running in the Kafka connect framework. Learn about the connector, its properties, and configuration.

The ADLS Sink connector fetches messages from Kafka and uploads them to ADLS. The topic this connector receives messages from is determined by the value of the topics property in the configuration. The messages can contain unstructured (character or binary) data or they can be in Avro or JSON format.

If the input is unstructured data, record processing is disabled. In a case like this, multiple messages can be concatenated into a single output file on ADLS using a demarcator (for example, newline for text messages). Merging is optional, large binary data can be forwarded to Azure at a 1:1 ratio, one Kafka message equals a single ADLS file.

If the input is either Avro or JSON, record processing is enabled. In a case like this, the schema of the records can be a predefined schema retrieved from Schema Registry (for both Avro and JSON data), it can be embedded in the Avro data, or inferred from the JSON data. The output can be Avro, JSON or CSV. Multiple records are typically merged into a single output file before uploading to ADLS.

The connector can authenticate to Azure using an Account Key, SAS Token, Service Principal, or a Managed Identity.

Properties and configuration

Configuration is passed to the connector in a JSON file during creation. The properties of the connector can be categorized into three groups. These are as follows:

Common connector properties
These are the properties of the Kafka Connect framework that are accepted by all connectors. For a comprehensive list of these properties, see the Apache Kafka documentation.
Stateless NiFi Sink properties
These are the properties that are specific to the Stateless NiFi Sink connector. All Stateless NiFi Sink connectors share and accept these properties. For a comprehensive list of these properties, see the Stateless NiFi Sink properties reference.
Connector/dataflow-specific properties
These properties are unique to this specific connector. Or to be more precise, unique to the dataflow running within the connector. These properties use the following prefix:
parameter.[***CONNECTOR NAME***] Parameters:
For a comprehensive list of these properties, see the ADLS Sink properties reference.

Notes and limitations

  • Required properties must be assigned a valid value even if they are not used in the particular configuration. If a required property is not used, either leave its default value, or completely remove the property from the configuration JSON.
  • If a property that has a default value is completely removed from the configuration JSON, the system uses the default value.
  • Properties not marked as required must be completely removed from the configuration JSON if not set.
  • Schema Branch and Schema Version can not be specified at the same time.

Configuration example for fetching unstructured data

In this example, the connector fetches unstructured messages containing single line character data from Kafka. The connector concatenates the messages using newline characters as the demarcator. The files that the connector creates and uploads to ADLS are maximum 10 MB in size. The files are named according to the following pattern: messages_[***UUID***]. The connector authenticates to Azure using an Account Key.

{
 "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSinkConnector",
 "meta.smm.predefined.flow.name": "ADLS Sink",
 "meta.smm.predefined.flow.version": "1.0.0",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
 "tasks.max": "1",
 "nexus.url": "https://repository.cloudera.com/artifactory/repo",
 "extensions.directory": "/tmp/nifi-stateless-extensions",
 "working.directory": "/tmp/nifi-stateless-working",
 "failure.ports": "PutAzureDataLakeStorage Failure",
 "topics": "[***KAFKA TOPIC NAME***]",
 "parameter.ADLS Sink Parameters:ADLS Account Name": "[***ACCOUNT NAME***]",
 "parameter.ADLS Sink Parameters:ADLS Account Key": "[***ACCOUNT KEY***]",
 "parameter.ADLS Sink Parameters:ADLS Filesystem Name": "[***FILESYSTEM NAME***]",
 "parameter.ADLS Sink Parameters:ADLS Directory Name": "[***DIRECTORY NAME***]",
 "parameter.ADLS Sink Parameters:Maximum File Size": "10 MB",
 "parameter.ADLS Sink Parameters:Kafka Message Data Format": "Raw",
 "parameter.ADLS Sink Parameters:Output File Demarcator": "\n",
 "parameter.ADLS Sink Parameters:Output Filename Pattern": "messages_${filename.uuid}"
}
The following list collects the properties from the configuration example that must be customized for this use case.
topics
The name of the Kafka topic the connector fetches messages from.
ADLS Account Name
The Storage Account Name to use for authentication to Azure. This is also the target account where the output file is uploaded.
ADLS Account Key
The Storage Account Key to use for authentication to Azure.
ADLS Filesystem Name
The filesystem (or container) in the Storage Account to upload data to.
ADLS Directory Name
The target directory in the filesystem.
Maximum File Size
The maximum size of the output data file. In this example, the maximum size is 10 MB.
Kafka Message Data Format
The format of the messages the connector receives from Kafka. In this example, this property is set to Raw. This means that the connector expects unstructured text or binary data.
Output File Demarcator
Specifies the character sequence for demarcating (delimiting) message boundaries when multiple Kafka messages are ingested into an output file as raw messages. In this example, the property is set to \n (newline). This means that the newline character is used to separate the single line character data of the Kafka messages.
Output Filename Pattern
Specifies the structure of the name of the output file. This property accepts string literals (fixed text) as well as various expressions. In this example, the property is set to messages_${filename.uuid}.

messages_ is a string literal. ${filename.uuid} is an expression that inserts a generated UUID in the filename. As a result, the files are named according to the following pattern: messages_[***UUID***].

Configuration example for fetching JSON data

In this example, the connector fetches messages in JSON format from Kafka. The connector parses the JSON records and converts them to Avro format using the schema inferred from the JSON data. The schema is also embedded in the output file. The connector merges multiple records (all messages that are available for a single execution of the connector) into an Avro file, generates a name for the file using the following pattern data_[***TIMESTAMP***]_[***SEQUENCE***].avro, and uploads the file to ADLS. The connector authenticates to Azure using a Service Principal.

{
 "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSinkConnector",
 "meta.smm.predefined.flow.name": "ADLS Sink",
 "meta.smm.predefined.flow.version": "1.0.0",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
 "nexus.url": "https://repository.cloudera.com/artifactory/repo",
 "extensions.directory": "/tmp/nifi-stateless-extensions",
 "working.directory": "/tmp/nifi-stateless-working",
 "failure.ports": "PutAzureDataLakeStorage Failure",
 "topics": "[***KAFKA TOPIC NAME***]",
 "parameter.ADLS Sink Parameters:ADLS Account Name": "[***ACCOUNT NAME***]",
 "parameter.ADLS Sink Parameters:Azure Service Principal Tenant ID": "[***SERVICE PRINCIPAL TENANT ID***]",
 "parameter.ADLS Sink Parameters:Azure Service Principal Client ID": "[***SERVICE PRINCIPAL CLIENT ID***]",
 "parameter.ADLS Sink Parameters:Azure Service Principal Client Secret": "[***SERVICE PRINCIPAL CLIENT SECRET***]",
 "parameter.ADLS Sink Parameters:ADLS Filesystem Name": "[***FILESYSTEM NAME***]",
 "parameter.ADLS Sink Parameters:ADLS Directory Name": "[***DIRECTORY NAME***]",
 "parameter.ADLS Sink Parameters:Kafka Message Data Format": "JSON",
 "parameter.ADLS Sink Parameters:Schema Access Strategy": "Infer Schema",
 "parameter.ADLS Sink Parameters:Avro Schema Write Strategy": "Embed Avro Schema",
 "parameter.ADLS Sink Parameters:Output File Data Format": "Avro",
 "parameter.ADLS Sink Parameters:Output Filename Pattern": "data_${filename.timestamp}_${filename.sequence}.avro",
 "parameter.ADLS Sink Parameters:Output Filename Timestamp Format": "yyyyMMdd_HHmmss_SSS"
}

The following list collects the properties from the configuration example that must be customized for this use case.

topics
The name of the Kafka topic the connector fetches messages from.
ADLS Account Name
The Storage Account Name to use for authentication to Azure. This is also the target account where the output file is uploaded.
Service Principal *
The properties of the Service Principal to use for authentication to Azure.
ADLS Filesystem Name
The filesystem (or container) in the Storage Account to upload data to.
ADLS Directory Name
The target directory in the filesystem.
Kafka Message Data Format
The format of the messages the connector receives from Kafka. In this example, this property is set to JSON. This means that the connector expects JSON data.
Schema Access Strategy
Specifies the strategy used for determining the schema of the Kafka record. In this example, this property is set to Infer Schema, meaning that the schema is determined (inferred) from the JSON data.
Avro Schema Write Strategy
Specifies whether the record schema is written to the output data file. In this example, this property is set to Embed Avro Schema, meaning that the schema is embedded in the output Avro file.
Output File Data Format
Specifies the format of the records written to the output file. In this example, this property is set to Avro, meaning that the output file format is Avro.
Output Filename Pattern
Specifies the structure of the name of the output file. This property accepts string literals (fixed text) as well as various expressions. In this example, the property is set to data_${filename.timestamp}_${filename.sequence}.avro.

data_, the underscore ( _ ), and .avro are string literals. ${filename.timestamp} and ${filename.sequence} are expressions. ${filename.timestamp} inserts the current timestamp in the filename. ${filename.sequence} inserts an incrementing sequence value in the filename. As a result, the files are named according to the following pattern: data_[***TIMESTAMP***]_[***SEQUENCE***].avro.

Output Filename Timestamp Format
Specifies the timestamp format used in the ${filename.timestamp} expression. The expression is used when generating the output filename.

Stateless NiFi Sink properties reference

Review the following reference for a comprehensive list of the connector properties that are specific to the Stateless NiFi Sink connector.

In addition to the properties listed here, Stateless NiFi connectors also accept the properties of the Kafka Connect framework. For a comprehensive list of these properties, see the Apache Kafka documentation.

attribute.prefix

Description
The prefix to add to the key of each header that matches the regular expression specified in headers.as.attributes.regex. For example, if the header key is MyHeader, its value is MyValue, headers.as.attributes.regex is set to My.*, and this property is set to kafka, the flowfile that is created for the Kafka message will have an attribute named kafka.MyHeader with a value of MyValue.
Default Value
Accepted Values
Required
false

dataflow.timeout

Description
Specifies the maximum amount of time to wait for the dataflow to complete. If the dataflow does not complete before this timeout, the thread is interrupted and the dataflow is considered as a failure. The session is rolled back and the connector retriggers the flow. Defaults to 60 seconds if not specified.
Default Value
60 seconds
Accepted Values
Required
false

extensions.directory

Description
Specifies the directory that stores downloaded extensions. Extensions are the NAR (NiFi Archive) files containing the processors and controller services a flow might use. Since Stateless NiFi is only the NiFi engine, it does not contain any of the processors and controller services you might use in your flow. When deploying the connector with the custom flow, the system needs to download the specific extensions that your flow uses from Nexus (unless they are already present in this directory). These extensions are stored in this directory. Because the default directory might not be writable, and to aid in upgrade scenarios, Cloudera recommends that you always specify an extensions directory.
Default Value
/tmp/nifi-stateless-extensions
Accepted Values
Required
true

failure.ports

Description
A comma separated list of output ports that are considered as failure conditions. If any flowfile is routed to an output port specified in this property, the dataflow is considered a failure and the session is rolled back. After a set amount of time, the dataflow reattempts to process the Kafka record. Any data transferred to an output port that is not in the list of failure ports is discarded.

Because of how Stateless NiFi Sink connectors behave, even if a single flowfile ends up in an output port that is marked as failure, the entire sessions is rolled back with all messages in the batch. Furthermore, if a flowfile ends up in a failure port in each subsequent iteration, the result is an endless loop. With some sink connectors (for example. MQTT Sink) this is the desired behavior. For more information regarding this behavior, see Dataflow execution and scheduling.

Default Value
Accepted Values
Required
false

flow.snapshot

Description
Specifies the dataflow to run. When using Streams Messaging Manager to deploy a connector, the value you set in this property must be a JSON object. URLs, file paths, or escaped JSON strings are not supported when using Streams Messaging Manager. Alternatively, if using the Kafka Connect REST API to deploy a connector, this can be a file containing the dataflow, a URL that points to a dataflow, or a string containing the entire dataflow as an escaped JSON. Cloudera however, does not recommend using the Kafka Connect REST API to interact with this connector or Kafka Connect.
Default Value
Accepted Values
Required
true

headers.as.attributes.regex

Description
A Java regular expression that is evaluated against all Kafka record headers. Headers are added to the flowfile as an attribute if the header key matches the regular expression. The header key is used as the attribute name. The header value is used as the attribute value. Additionally, the name of the attribute can also contain an optional prefix which is defined by the attribute.prefix property.
Default Value
Accepted Values
Required
false

input.port

Description
The name of the input port in the NiFi dataflow that Kafka records are sent to. If the dataflow contains exactly one input port, this property is optional and can be omitted. However, if the dataflow contains multiple input ports, this property must be specified.
Default Value
Accepted Values
Required
false

krb5.file

Description
Specifies the krb5.conf file to use if the dataflow interacts with any services that are secured using Kerberos. Defaults to /etc/krb5.conf if not specified.
Default Value
/etc/krb5.conf
Accepted Values
Required
false

name

Description
The name of the connector. On the Streams Messaging Manager UI, the connector names are specified using the Enter Name field. The name that you enter in the Enter Name field is automatically set as the value of the name property when the connector is deployed. Because of this, the name property is omitted from the configuration template provided in Streams Messaging Manager. If you manually add the name property to the configuration in Streams Messaging Manager, ensure that the value you set matches the connector name specified in the Enter Name field. Otherwise, the connector fails to deploy.
Default Value
Accepted Values
Required
true

nexus.url

Description
Specifies the Base URL of the Nexus instance to source extensions from. If configuring a Nexus instance that has multiple repositories, include the name of the repository in the URL. For example, https://nexus-private.myorganization.org/nexus/repository/my-repository/. If the property is not specified, the necessary extensions (the ones used by the flow) must be provided in the extensions directory before deploying the connector.
Default Value
Accepted Values
Required
true

parameter.[***FLOW PARAMETER NAME***]

Description
Specifies a parameter to use in the dataflow. For example, assume that you have the following entry in your connector configuration "parameter.Directory": "/mydir". In a case like this, any parameter context in the dataflow that has a parameter named Directory gets the specified value (/mydir). If the dataflow has child process groups, and those child process groups have their own parameter contexts, the value is used for all parameter contexts that contain a parameter named Directory. Parameters can also be applied to specific parameter contexts only. This can be done by prefixing the parameter name (Directory) with the name of the parameter context followed by a colon. For example, parameter.My Context:Directory only applies the specified value for the Directory parameter in the Parameter Context named My Context.
Default Value
Accepted Values
Required
false

working.directory

Description
Specifies a directory on the Connect server that NiFi should use for unpacking extensions that it needs to perform the dataflow. The contents of extensions.directory are unpacked here. Defaults to /tmp/nifi-stateless-working if not specified.
Default Value
/tmp/nifi-stateless-working
Accepted Values
Required
false

ADLS Sink properties reference

Review the following reference for a comprehensive list of the connector properties that are specific to the ADLS Sink connector.

The properties listed in this reference must be added to the connector configuration with the following prefix:
parameter.[***CONNECTOR NAME***] Parameters:

In addition to the properties listed here, this connector also accepts certain properties of the Kafka Connect framework as well as the properties of the NiFi Stateless Sink connector. When creating a new connector using the Streams Messaging Manager UI, all valid properties are presented in the default configuration template. You can view the configuration template to get a full list of valid properties. In addition, for more information regarding the accepted properties not listed here, you can review the Apache Kafka documentation and the Stateless NiFi Sink properties reference.

ADLS Account Key

Description
The Storage Account Key to use for authentication to Azure. Required when Account Key authentication is used.
Default Value
Accepted Values
Required
false

ADLS Account Name

Description
The name of the Azure Storage Account to use for authentication to Azure and the target account where the output file is uploaded.
Default Value
Accepted Values
Required
true

ADLS Directory Name

Description
The name of the directory where the output file is uploaded. Use an empty string to specify the root directory.
Default Value
Accepted Values
Required
true

ADLS Endpoint Suffix

Description
Specifies the endpoint suffix used for accessing the ADLS service.
Default Value
dfs.core.windows.net
Accepted Values
Required
true

ADLS Filesystem Name

Description
The name of the filesystem (or container) in the Azure Storage Account where the output file is uploaded.
Default Value
Accepted Values
Required
true

ADLS SAS Token

Description
The SAS Token to use for authentication to Azure. Required when SAS Token authentication is used.
Default Value
Accepted Values
Required
false

Avro Schema Write Strategy

Description
Specifies how the record schema is attached to the output data file. Applicable only for Avro output (Output File Data Format is set to Avro).
Do Not Write Schema
Neither the schema nor reference to the schema is attached to the output Avro messages.
Embed Avro Schema
The schema is embedded in every output Avro message.
HWX Content-Encoded Schema Reference
A reference to the schema (identified by Schema Name) within Schema Registry is encoded in the content of the outgoing Avro messages.
Default Value
Embed Avro Schema
Accepted Values
Embed Avro Schema, Do Not Write Schema, HWX Content-Encoded Schema Reference
Required
false

Azure Service Principal Client ID

Description
The Client ID of the Service Principal to use for authentication to Azure. Required when Service Principal authentication is used.
Default Value
Accepted Values
Required
false

Azure Service Principal Client Secret

Description
The Client Secret of the Service Principal to use for authentication to Azure. Required when Service Principal authentication is used.
Default Value
Accepted Values
Required
false

Azure Service Principal Tenant ID

Description
The Tenant ID of the Service Principal to use for authentication to Azure. Required when Service Principal authentication is used.
Default Value
Accepted Values
Required
false

Date Format

Description
Specifies the format to use when writing date fields to JSON or CSV.
Default Value
yyyy-MM-dd
Accepted Values
Required
true

Kafka Message Data Format

Description
Specifies the format of the messages the connector receives from Kafka. If set to Avro or JSON, record processing is enabled. Raw can be used for unstructured text or binary data.
Default Value
Avro
Accepted Values
Avro, JSON, Raw
Required
true

Kerberos Keytab

Description
The fully-qualified filename of the Kerberos keytab associated with the principal for accessing Schema Registry.
Default Value
The location of the default keytab which is empty and can only be used for unsecure connections.
Accepted Values
Required
true

Kerberos Principal

Description
The Kerberos principal used for authenticating to Schema Registry.
Default Value
default
Accepted Values
Required
true

Maximum File Size

Description
The maximum size of the output data file. No size limit is applied if this property is not specified. Example values: 100 MB, 1 GB.
Default Value
Accepted Values
Required
false

Maximum Number of Entries

Description
The maximum number of entries in the output data file. In the context of this property, entry can mean one of two things. If record processing is enabled (Output File Data Format is set to Avro or JSON), an entry is a record. Otherwise, entry means a Kafka message. Set this property to 1 if you want to create a separate output file for each Kafka message.
Default Value
1000000
Accepted Values
Required
true

Output File Data Format

Description
Specifies the format of the records written to the output file. Required when record processing is enabled (Kafka Message Data Format is set to Avro or JSON).
Default Value
Avro
Accepted Values
Avro, JSON, CSV
Required
false

Output File Demarcator

Description
Specifies the character sequence for demarcating (delimiting) message boundaries when multiple Kafka messages are ingested into an output file as raw messages (no record processing). This property can only be used if Kafka Message Data Format is set to Raw. If you want to use newline as the delimiter, set this property to \n.
Default Value
Accepted Values
Required
false

Output Filename Pattern

Description
Specifies the structure of the name of the output file. The pattern can contain string literal (fixed text) parts and one or more of the following expressions:
  • ${filename.uuid}: Inserts a generated UUID in the filename.
  • ${filename.timestamp}: Inserts the current timestamp in the filename.
  • ${filename.sequence}: Inserts an incrementing sequence value in the filename.

In order to generate unique filenames, either ${filename.uuid} or ${filename.sequence} must be used in the pattern.

Examples:
  • data_${filename.uuid}.json
  • records_${filename.timestamp}_${filename.sequence}.avro
Default Value
${filename.uuid}
Accepted Values
Required
false

Output Filename Sequence Initial Value

Description
This property is used to configure the initial value of the ${filename.sequence} expression. The value you set in this property is not the initial value of the sequence. The sequence starts at the value of this property +1. For example, if you set this property to 0, the sequence starts at 1.
Default Value
0
Accepted Values
Required
false

Output Filename Sequence Padding Length

Description
Specifies the length of the ${filename.sequence} expression in characters. If the sequence has fewer characters than the value set in this property, it is padded with zeros (0). Padding is added to the left of the sequence.
Default Value
6
Accepted Values
Required
false

Output Filename Timestamp Format

Description
The timestamp format to use for the ${filename.timestamp} expression. For example, yyyyMMdd_HHmmss_SSS.
Default Value
Accepted Values
Required
false

Schema Access Strategy

Description
Specifies the strategy used for determining the schema of the Kafka record. The value you set here depends on the data format set in Kafka Message Data Format.
  • If set to Schema Registry, the schema is read from Schema Registry. This setting can be used with both Avro and JSON formats.
  • If set to Infer Schema, the schema is inferred based on the input file. This setting can only be used if Kafka Message Data Format is JSON.
  • If set to Embedded Schema, the schema embedded in the input is used. This setting can only be used if Kafka Message Data Format is Avro.
  • If set to HWX Content-Encoded Schema Reference, the schema is read from Schema Registry. This setting can only be used if Kafka Message Data Format is Avro. In this case the Avro messages are expected to have a reference to the schema in Schema Registry encoded within the message content.
This property is not used if record processing is disabled (Kafka Message Data Format is set to Raw).
Default Value
Schema Registry
Accepted Values
Schema Registry, Infer Schema, Embedded Schema, HWX Content-Encoded Schema Reference
Required
true

Schema Branch

Description
The name of the branch to use when looking up the schema in Schema Registry. Schema Branch and Schema Version cannot be specified at the same time. If one is specified, the other needs to be removed from the configuration. If Schema Registry is not used, this property must be completely removed from the configuration.
Default Value
Accepted Values
Required
false

Schema Name

Description
The schema name to look up in Schema Registry. If the Schema Access Strategy property is set to Schema Registry, this property must contain a valid schema name. If Schema Registry is not used, this property must be completely removed from the configuration JSON.
Default Value
Accepted Values
Required
false

Schema Registry URL

Description
The URL of the Schema Registry server. If Schema Registry is not used, this property must be completely removed from the configuration JSON.
Default Value
http://localhost:7788/api/v1
Accepted Values
Required
true

Schema Version

Description
The version of the schema to look up in Schema Registry. If Schema Registry is used and a schema version is not specified, the latest version of the schema is retrieved. Schema Branch and Schema Version cannot be specified at the same time. If one is specified, the other needs to be removed from the configuration. If Schema Registry is not used, this property must be completely removed from the configuration.
Default Value
Accepted Values
Required
false

Time Format

Description
Specifies the format to use when writing Time fields to JSON or CSV.
Default Value
HH:mm:ss
Accepted Values
Required
true

Timestamp Format

Description
Specifies the format to use when writing Timestamp fields to JSON or CSV.
Default Value
yyyy-MM-dd HH:mm:ss.SSS
Accepted Values
Required
true

Truststore Filename

Description
The fully-qualified filename of a truststore. This truststore is used to establish a secure connection with Schema Registry using HTTPS.
Default Value
The location of the default truststore which is empty and can only be used for unsecure connections.
Accepted Values
Required
true

Truststore Password

Description
The password used to access the contents of the truststore configured in the Truststore Filename property.
Default Value
password
Accepted Values
Required
true

Truststore Type

Description
The type of the truststore configured in the Truststore Filename property.
Default Value
JKS
Accepted Values
BCFKS, PKCS12, JKS
Required
true

Use Azure Managed Identity

Description
Turns on authentication using Managed Identity.
Default Value
false
Accepted Values
true, false
Required
true