HDFS Sink

Learn more about the HDFS Sink connector.

The HDFS Sink connector can be used to transfer data from Kafka topics to files on HDFS clusters. Each partition of every topic results in a collection of files named in the following pattern:
{topic name}_{partition number}_{end_offset}.{file extension}
For example, running the HDFS Sink connector on partition 0 of a topic named sourceTopic can yield the following series of files:
sourceTopic_0_50.avro, holding records 0 to 50
sourceTopic_0_79.avro, holding records 51 to 79
...
The HDFS Sink connector periodically commits records to final result files. Each commit results in a separate "chunk" file.
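The naming scheme above can be illustrated with a short Python sketch. It assumes, for illustration only, that offsets in the partition are contiguous and that each commit produces one chunk file ending at the given offset. The helper name `chunk_file_names` is hypothetical and is not part of the connector.

```python
from typing import List, Tuple


def chunk_file_names(topic: str, partition: int, commit_end_offsets: List[int],
                     extension: str = "avro",
                     first_offset: int = 0) -> List[Tuple[str, int, int]]:
    """Return (file name, first offset, end offset) for each commit.

    Hypothetical helper: assumes contiguous offsets and one chunk file per commit.
    """
    chunks = []
    start = first_offset
    for end in commit_end_offsets:
        if end < start:
            raise ValueError(f"end offset {end} precedes start offset {start}")
        # {topic name}_{partition number}_{end_offset}.{file extension}
        chunks.append((f"{topic}_{partition}_{end}.{extension}", start, end))
        # The next chunk starts right after the last record of this one.
        start = end + 1
    return chunks


for name, first, last in chunk_file_names("sourceTopic", 0, [50, 79]):
    print(f"{name}: records {first} to {last}")
```

With commits ending at offsets 50 and 79, this reproduces the two file names listed above.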

Configuration example for writing data to HDFS

A simple configuration example for the HDFS Sink connector.

The following is a simple configuration example for the HDFS Sink connector. Short descriptions of the properties set in this example are also provided. For a full properties reference, see the HDFS Sink properties reference.

{
    "connector.class": "com.cloudera.dim.kafka.connect.hdfs.HdfsSinkConnector",
    "tasks.max": 1,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "com.cloudera.dim.kafka.connect.converts.AvroConverter",
    "value.converter.passthrough.enabled": true,
    "value.converter.schema.registry.url": "http://localhost:9090/api/v1",
    "topics": "avro_topic",
    "hdfs.uri": "hdfs://my-host.my-realm.com:8020",
    "hdfs.output": "/topics_output/",
    "output.writer": "com.cloudera.dim.kafka.connect.partition.writers.avro.AvroPartitionWriter",
    "output.avro.passthrough.enabled": true,
    "hdfs.kerberos.authentication": true,
    "hdfs.kerberos.user.principal": "user_account@MY-REALM.COM",
    "hdfs.kerberos.keytab.path": "/path/to/user_account.keytab",
    "hdfs.kerberos.namenode.principal": "hdfs/_HOST@MY-REALM.COM",
    "hadoop.conf.path": "/etc/hadoop/"
}
connector.class
Class name of the HDFS Sink connector.
key.converter
The converter capable of understanding the data format of the key of each record on this topic.
value.converter
The converter capable of understanding the data format of the value of each record on this topic.
value.converter.passthrough.enabled
This property controls whether data is converted into the Kafka Connect intermediate data format before it is written to an output file. Because the input and output formats are the same in this example, the property is set to true, that is, data is not converted.
value.converter.schema.registry.url
The URL to Schema Registry. This is a mandatory property if the topic has records encoded in Avro format.
topics
List of topics to consume data from.
hdfs.uri
The URI of the NameNode of the HDFS cluster.
hdfs.output
The destination folder on the HDFS cluster where output files will reside.
output.writer
Determines the output file format. Because in this example the output format is Avro, AvroPartitionWriter is used.
output.avro.passthrough.enabled
This property has to match the configuration of the value.converter.passthrough.enabled property because both the input and output formats are Avro.
hdfs.kerberos.authentication
Enables or disables Kerberos authentication.
hdfs.kerberos.user.principal
The user principal that the Kafka Connect role will use.
hdfs.kerberos.keytab.path
The path to the Kerberos keytab file.
hdfs.kerberos.namenode.principal
The Kerberos principal used by the NameNode. This is necessary when the HDFS cluster has data encryption turned on.
hadoop.conf.path
The path to the Hadoop configuration files. This is necessary when the HDFS cluster has data encryption turned on.
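The dependencies between the properties above can be checked before the connector is deployed. The following Python sketch is a hypothetical pre-deployment check and is not part of the connector. It encodes three rules:

- Both passthrough flags, which default to true, must agree.
- A Schema Registry URL must be set when `AvroConverter` is the value converter. The sketch uses the converter as a stand-in for "the topic holds Avro records".
- When Kerberos is enabled, the user principal and keytab path must be set. This third rule is an assumption and is not stated in the reference.

```python
import json
from typing import Any, Dict, List

AVRO_CONVERTER = "com.cloudera.dim.kafka.connect.converts.AvroConverter"


def as_bool(value: Any, default: bool) -> bool:
    """Interpret a boolean property given either as a JSON bool or as a string."""
    if value is None:
        return default
    if isinstance(value, bool):
        return value
    return str(value).strip().lower() == "true"


def check_hdfs_sink_config(config: Dict[str, Any]) -> List[str]:
    """Return the problems found in an HDFS Sink configuration (hypothetical check)."""
    problems = []
    # Both passthrough properties default to true and must agree.
    out_pt = as_bool(config.get("output.avro.passthrough.enabled"), True)
    conv_pt = as_bool(config.get("value.converter.passthrough.enabled"), True)
    if out_pt != conv_pt:
        problems.append("output.avro.passthrough.enabled must match "
                        "value.converter.passthrough.enabled")
    # Avro-encoded topics need a Schema Registry URL (AvroConverter used as the signal).
    if (config.get("value.converter") == AVRO_CONVERTER
            and not config.get("value.converter.schema.registry.url")):
        problems.append("value.converter.schema.registry.url is required for Avro data")
    # Assumption: principal and keytab are needed whenever Kerberos is enabled.
    if as_bool(config.get("hdfs.kerberos.authentication"), False):
        for key in ("hdfs.kerberos.user.principal", "hdfs.kerberos.keytab.path"):
            if not config.get(key):
                problems.append(f"{key} is required when Kerberos authentication is enabled")
    return problems


# A subset of the example configuration above.
example = json.loads("""{
  "value.converter": "com.cloudera.dim.kafka.connect.converts.AvroConverter",
  "value.converter.passthrough.enabled": true,
  "value.converter.schema.registry.url": "http://localhost:9090/api/v1",
  "output.avro.passthrough.enabled": true,
  "hdfs.kerberos.authentication": true,
  "hdfs.kerberos.user.principal": "user_account@MY-REALM.COM",
  "hdfs.kerberos.keytab.path": "/path/to/user_account.keytab"
}""")
print(check_hdfs_sink_config(example))
```

The example configuration passes all three checks. Flipping only `output.avro.passthrough.enabled` to false produces a single mismatch message.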

Configuration example for writing data to Ozone FS

A simple configuration example for the HDFS Sink connector that writes data to the Ozone FS.

The following is a simple configuration example for the HDFS Sink connector. In this example data is written to the Ozone FS. Short descriptions of the properties set in this example are also provided. For a full properties reference, see the HDFS Sink properties reference.

{
    "connector.class": "com.cloudera.dim.kafka.connect.hdfs.HdfsSinkConnector",
    "tasks.max": 1,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "com.cloudera.dim.kafka.connect.converts.AvroConverter",
    "value.converter.passthrough.enabled": true,
    "value.converter.schema.registry.url": "http://localhost:9090/api/v1",
    "topics": "avro_topic",
    "hdfs.uri": "ofs://ozone1/volume1/bucket1/",
    "hdfs.output": "/topics_output/",
    "output.writer": "com.cloudera.dim.kafka.connect.partition.writers.avro.AvroPartitionWriter",
    "output.avro.passthrough.enabled": true,
    "hdfs.kerberos.authentication": true,
    "hdfs.kerberos.user.principal": "user_account@MY-REALM.COM",
    "hdfs.kerberos.keytab.path": "/path/to/user_account.keytab",
    "hadoop.conf.path": "/etc/hadoop/"
}
connector.class
Class name of the HDFS Sink connector.
key.converter
The converter capable of understanding the data format of the key of each record on this topic.
value.converter
The converter capable of understanding the data format of the value of each record on this topic.
value.converter.passthrough.enabled
This property controls whether data is converted into the Kafka Connect intermediate data format before it is written to an output file. Because the input and output formats are the same in this example, the property is set to true, that is, data is not converted.
value.converter.schema.registry.url
The URL to Schema Registry. This is a mandatory property if the topic has records encoded in Avro format.
topics
List of topics to consume data from.
hdfs.uri
The Ozone FS (ofs) URI.
hdfs.output
The destination folder where output files will reside.
output.writer
Determines the output file format. Because in this example the output format is Avro, AvroPartitionWriter is used.
output.avro.passthrough.enabled
This property has to match the configuration of the value.converter.passthrough.enabled property because both the input and output formats are Avro.
hdfs.kerberos.authentication
Enables or disables Kerberos authentication.
hdfs.kerberos.user.principal
The user principal that the Kafka Connect role will use.
hdfs.kerberos.keytab.path
The path to the Kerberos keytab file.
hadoop.conf.path
The path to the Hadoop configuration files. This is necessary when the HDFS cluster has data encryption turned on.
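An ofs URI such as the one in this example is commonly described by Ozone's OFS layout as ofs://{authority}/{volume}/{bucket}/{key}. The following Python sketch splits a URI along that assumed layout. The helper name `split_ofs_uri` is hypothetical, and the sketch does not show how the connector itself combines hdfs.uri with hdfs.output.

```python
from typing import Dict, Optional
from urllib.parse import urlparse


def split_ofs_uri(uri: str) -> Dict[str, Optional[str]]:
    """Split an ofs:// URI into authority, volume, bucket, and key (hypothetical helper)."""
    parsed = urlparse(uri)
    if parsed.scheme != "ofs":
        raise ValueError(f"not an ofs URI: {uri}")
    # Assumed OFS path layout: /<volume>/<bucket>/<key...>
    parts = [p for p in parsed.path.split("/") if p]
    return {
        "authority": parsed.netloc,  # Ozone Manager host or service ID
        "volume": parts[0] if len(parts) > 0 else None,
        "bucket": parts[1] if len(parts) > 1 else None,
        "key": "/".join(parts[2:]) or None,
    }


print(split_ofs_uri("ofs://ozone1/volume1/bucket1/"))
```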

Stateless NiFi Sink properties reference

Review the following reference for a comprehensive list of the connector properties that are specific to the Stateless NiFi Sink connector.

In addition to the properties listed here, Stateless NiFi connectors also accept the properties of the Kafka Connect framework. For a comprehensive list of these properties, see the Apache Kafka documentation.

attribute.prefix

Description
The prefix to add to the key of each header that matches the regular expression specified in headers.as.attributes.regex. For example, if the header key is MyHeader, its value is MyValue, headers.as.attributes.regex is set to My.*, and this property is set to kafka, the flowfile that is created for the Kafka message will have an attribute named kafka.MyHeader with a value of MyValue.
Default Value
Accepted Values
Required
false

dataflow.timeout

Description
Specifies the maximum amount of time to wait for the dataflow to complete. If the dataflow does not complete before this timeout, the thread is interrupted and the dataflow is considered a failure. The session is rolled back and the connector retriggers the flow. Defaults to 60 seconds if not specified.
Default Value
60 seconds
Accepted Values
Required
false

extensions.directory

Description
Specifies the directory that stores downloaded extensions. Extensions are the NAR (NiFi Archive) files containing the processors and controller services a flow might use. Because Stateless NiFi is only the NiFi engine, it does not contain any of the processors and controller services you might use in your flow. When you deploy the connector with a custom flow, the system downloads the specific extensions that your flow uses from Nexus (unless they are already present in this directory) and stores them in this directory. Because the default directory might not be writable, and to aid in upgrade scenarios, Cloudera recommends that you always specify an extensions directory.
Default Value
/tmp/nifi-stateless-extensions
Accepted Values
Required
true

failure.ports

Description
A comma-separated list of output ports that are considered failure conditions. If any flowfile is routed to an output port specified in this property, the dataflow is considered a failure and the session is rolled back. After a set amount of time, the dataflow reattempts to process the Kafka record. Any data transferred to an output port that is not in the list of failure ports is discarded.

Because of how Stateless NiFi Sink connectors behave, even if a single flowfile ends up in an output port that is marked as failure, the entire session is rolled back with all messages in the batch. Furthermore, if a flowfile ends up in a failure port in each subsequent iteration, the result is an endless loop. With some sink connectors (for example, MQTT Sink) this is the desired behavior. For more information regarding this behavior, see Dataflow execution and scheduling.

Default Value
Accepted Values
Required
false
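The batch-level semantics of failure.ports can be modeled in a few lines of Python. The sketch does not reproduce NiFi internals. It only restates the rules above:

- One flowfile in a failure port rolls back the whole batch.
- Otherwise, the session succeeds and data in other output ports is discarded.

The names `BatchOutcome`, `parse_failure_ports`, and `evaluate_batch`, as well as the port names "success", "failure", and "retry", are hypothetical. Trimming spaces around port names is also an assumption.

```python
from dataclasses import dataclass
from typing import List, Set


@dataclass
class BatchOutcome:
    rolled_back: bool  # True: the session, with every record in the batch, is rolled back
    discarded: int     # flowfiles dropped because they reached a non-failure port


def parse_failure_ports(value: str) -> Set[str]:
    """Parse a comma-separated failure.ports value (assumes surrounding spaces are ignored)."""
    return {port.strip() for port in value.split(",") if port.strip()}


def evaluate_batch(routed_ports: List[str], failure_ports: str) -> BatchOutcome:
    """routed_ports lists the output port that each flowfile of the batch ended up in."""
    failures = parse_failure_ports(failure_ports)
    if any(port in failures for port in routed_ports):
        # A single flowfile in a failure port rolls back the whole batch; nothing is discarded.
        return BatchOutcome(rolled_back=True, discarded=0)
    # The session succeeds; data transferred to non-failure ports is discarded.
    return BatchOutcome(rolled_back=False, discarded=len(routed_ports))


print(evaluate_batch(["success", "failure"], "failure, retry"))
```

If the same record keeps reaching a failure port, each retry yields the same rollback. That is the endless loop described above.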

flow.snapshot

Description
Specifies the dataflow to run. When using Streams Messaging Manager to deploy a connector, the value you set in this property must be a JSON object. URLs, file paths, or escaped JSON strings are not supported when using Streams Messaging Manager. Alternatively, if you use the Kafka Connect REST API to deploy a connector, this can be a file containing the dataflow, a URL that points to a dataflow, or a string containing the entire dataflow as escaped JSON. However, Cloudera does not recommend using the Kafka Connect REST API to interact with this connector or Kafka Connect.
Default Value
Accepted Values
Required
true

headers.as.attributes.regex

Description
A Java regular expression that is evaluated against all Kafka record headers. Headers are added to the flowfile as an attribute if the header key matches the regular expression. The header key is used as the attribute name. The header value is used as the attribute value. Additionally, the name of the attribute can also contain an optional prefix which is defined by the attribute.prefix property.
Default Value
Accepted Values
Required
false
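The interaction between headers.as.attributes.regex and attribute.prefix can be sketched in Python. The sketch rests on three assumptions:

- Kafka headers are modeled as a simple string-to-string dictionary. Real headers are byte values and may repeat.
- The whole header key must match, as with Java's `Matcher.matches()`.
- No header becomes an attribute when no regular expression is set.

`headers_to_attributes` is a hypothetical helper name.

```python
import re
from typing import Dict, Optional


def headers_to_attributes(headers: Dict[str, str], regex: Optional[str],
                          prefix: Optional[str] = None) -> Dict[str, str]:
    """Map matching Kafka header keys to flowfile attributes (hypothetical helper)."""
    if not regex:
        return {}  # Assumption: without a regex, no header becomes an attribute.
    pattern = re.compile(regex)
    attributes = {}
    for key, value in headers.items():
        # Assumption: the whole key must match, like Java's Matcher.matches().
        if pattern.fullmatch(key):
            # attribute.prefix is joined to the header key with a dot (kafka.MyHeader).
            name = f"{prefix}.{key}" if prefix else key
            attributes[name] = value
    return attributes


print(headers_to_attributes({"MyHeader": "MyValue", "traceId": "abc"}, "My.*", "kafka"))
```

This reproduces the example from the attribute.prefix description: MyHeader becomes the attribute kafka.MyHeader with the value MyValue.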

input.port

Description
The name of the input port in the NiFi dataflow that Kafka records are sent to. If the dataflow contains exactly one input port, this property is optional and can be omitted. However, if the dataflow contains multiple input ports, this property must be specified.
Default Value
Accepted Values
Required
false

krb5.file

Description
Specifies the krb5.conf file to use if the dataflow interacts with any services that are secured using Kerberos. Defaults to /etc/krb5.conf if not specified.
Default Value
/etc/krb5.conf
Accepted Values
Required
false

name

Description
The name of the connector. On the Streams Messaging Manager UI, the connector names are specified using the Enter Name field. The name that you enter in the Enter Name field is automatically set as the value of the name property when the connector is deployed. Because of this, the name property is omitted from the configuration template provided in Streams Messaging Manager. If you manually add the name property to the configuration in Streams Messaging Manager, ensure that the value you set matches the connector name specified in the Enter Name field. Otherwise, the connector fails to deploy.
Default Value
Accepted Values
Required
true

nexus.url

Description
Specifies the base URL of the Nexus instance to source extensions from. If you configure a Nexus instance that has multiple repositories, include the name of the repository in the URL. For example, https://nexus-private.myorganization.org/nexus/repository/my-repository/. If the property is not specified, the necessary extensions (the ones used by the flow) must be provided in the extensions directory before deploying the connector.
Default Value
Accepted Values
Required
true

parameter.[***FLOW PARAMETER NAME***]

Description
Specifies a parameter to use in the dataflow. For example, assume that you have the following entry in your connector configuration "parameter.Directory": "/mydir". In a case like this, any parameter context in the dataflow that has a parameter named Directory gets the specified value (/mydir). If the dataflow has child process groups, and those child process groups have their own parameter contexts, the value is used for all parameter contexts that contain a parameter named Directory. Parameters can also be applied to specific parameter contexts only. This can be done by prefixing the parameter name (Directory) with the name of the parameter context followed by a colon. For example, parameter.My Context:Directory only applies the specified value for the Directory parameter in the Parameter Context named My Context.
Default Value
Accepted Values
Required
false
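The scoping rules for parameter.* properties can be sketched in Python. In the sketch, `parameter.{name}` applies to every parameter context that defines that name, and `parameter.{context}:{name}` applies only to the named context.

Two behaviors are assumptions not stated above:

- The first colon separates the context name from the parameter name.
- A context-specific value overrides a general one.

`resolve_parameters` and the context "Child Context" are hypothetical.

```python
from typing import Dict, Set, Tuple

PREFIX = "parameter."


def resolve_parameters(config: Dict[str, str],
                       contexts: Dict[str, Set[str]]) -> Dict[str, Dict[str, str]]:
    """Work out the value each parameter context receives (hypothetical helper).

    contexts maps a parameter context name to the parameter names it defines.
    """
    general: Dict[str, str] = {}               # parameter.<name>
    scoped: Dict[Tuple[str, str], str] = {}    # parameter.<context>:<name>
    for key, value in config.items():
        if not key.startswith(PREFIX):
            continue  # not a flow parameter (for example, tasks.max)
        name = key[len(PREFIX):]
        if ":" in name:
            # Assumption: the first colon separates the context name from the parameter name.
            context, param = name.split(":", 1)
            scoped[(context, param)] = value
        else:
            general[name] = value

    resolved: Dict[str, Dict[str, str]] = {}
    for context, params in contexts.items():
        values: Dict[str, str] = {}
        for param in params:
            if (context, param) in scoped:
                # Assumption: a context-specific value overrides a general one.
                values[param] = scoped[(context, param)]
            elif param in general:
                values[param] = general[param]
        resolved[context] = values
    return resolved


example_config = {"parameter.Directory": "/mydir",
                  "parameter.My Context:Batch Size": "100",
                  "tasks.max": "1"}
example_contexts = {"My Context": {"Directory", "Batch Size"},
                    "Child Context": {"Directory", "Batch Size"}}
print(resolve_parameters(example_config, example_contexts))
```

In this example, Directory reaches both contexts, while Batch Size is set only in My Context.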

working.directory

Description
Specifies a directory on the Connect server that NiFi should use for unpacking extensions that it needs to perform the dataflow. The contents of extensions.directory are unpacked here. Defaults to /tmp/nifi-stateless-working if not specified.
Default Value
/tmp/nifi-stateless-working
Accepted Values
Required
false

HDFS Sink properties reference

HDFS Sink connector properties reference.

The following reference lists the connector properties that are specific to the HDFS Sink connector. For properties common to all sink connectors, see the upstream Apache Kafka documentation.

hdfs.uri

Description
The file system URI to connect to on the destination cluster. This property supports any valid Hadoop-compatible file system (HCFS) URI, for example, an HDFS or ofs URI.
Type
String
Default Value
None

hdfs.output

Description
The root directory on the HDFS cluster where all the output files will reside. The sub path has the following pattern: {topic}/{topic}_{partition}_{endoffset}.{file extension}
Type
String
Default Value
/tmp
Recommended Values
Any path on the HDFS file system where the role has read and write permission.

hdfs.kerberos.authentication

Description
Enables or disables secure access to the HDFS cluster by authenticating with Kerberos.
Type
Boolean
Default Value
false
Accepted Values
true or false

hdfs.kerberos.user.principal

Description
The Kerberos user principal.
Type
String
Default Value
null
Recommended Values
The host-dependent Kerberos principal assigned to the Kafka Connect role.

hdfs.kerberos.keytab.path

Description
The path to the Kerberos keytab file.
Type
String
Default Value
null
Recommended Values
In a Cloudera Manager provisioned environment, it is recommended to use the Cloudera Manager Config Provider to provision the path automatically.

hdfs.kerberos.namenode.principal

Description
The Kerberos NameNode principal. Required when the HDFS cluster has data encryption turned on.
Type
String
Default Value
null

hadoop.conf.path

Description
The path to the site-specific Hadoop configuration XML files. Required when the HDFS cluster has data encryption turned on.
Type
String
Default Value
null

output.writer

Description
The output file writer, which determines the type of file written to the HDFS cluster. The value of this property should be the fully qualified class name (FQCN) of a class that implements the PartitionWriter interface.
Type
String
Default Value
null
Accepted Values
  • com.cloudera.dim.kafka.connect.partition.writers.avro.AvroPartitionWriter
  • com.cloudera.dim.kafka.connect.partition.writers.json.JsonPartitionWriter
  • com.cloudera.dim.kafka.connect.hdfs.parquet.ParquetPartitionWriter
  • com.cloudera.dim.kafka.connect.partition.writers.txt.TxtPartitionWriter
Recommended Values
com.cloudera.dim.kafka.connect.partition.writers.avro.AvroPartitionWriter

output.avro.passthrough.enabled

Description
Configures whether the output writer expects an Avro-encoded Kafka Connect data record. Must match the configuration of value.converter.passthrough.enabled.
Type
Boolean
Default Value
true
Accepted Values
true or false
Recommended Values
true if the input and output are both Avro.

value.converter

Description
The converter used to translate the value field of the source Kafka record into Kafka Connect Data format.
Type
String
Default Value
Inherited from Kafka Connect worker properties.
Accepted Values
  • org.apache.kafka.connect.json.JsonConverter
  • org.apache.kafka.connect.storage.StringConverter
  • com.cloudera.dim.kafka.connect.converts.AvroConverter
Recommended Values
com.cloudera.dim.kafka.connect.converts.AvroConverter

value.converter.schema.registry.url

Description
The URL of the Schema Registry server.
Type
String
Default Value
null

value.converter.passthrough.enabled

Description
Configures whether the AvroConverter translates an Avro record into Kafka Connect Data or transparently passes the Avro-encoded bytes through as the payload.
Type
Boolean
Default Value
true
Accepted Values
true or false
Recommended Values
true if the input and output are both Avro.
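The hdfs.output root and the sub path pattern combine into the full location of each output file. The following Python sketch shows that combination under the assumption that the sub path is simply appended to the hdfs.output root. `output_file_path` is a hypothetical helper and not part of the connector.

```python
import posixpath


def output_file_path(hdfs_output: str, topic: str, partition: int,
                     end_offset: int, extension: str) -> str:
    """Build {hdfs.output}/{topic}/{topic}_{partition}_{endoffset}.{file extension}."""
    sub_path = f"{topic}/{topic}_{partition}_{end_offset}.{extension}"
    # posixpath.join adds a "/" only if hdfs_output does not already end with one.
    return posixpath.join(hdfs_output, sub_path)


print(output_file_path("/topics_output/", "avro_topic", 0, 50, "avro"))
print(output_file_path("/tmp", "avro_topic", 0, 50, "avro"))
```

The second call uses the default hdfs.output value, /tmp.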