SFTP Source connector

The SFTP Source connector is a Stateless NiFi dataflow developed by Cloudera that runs in the Kafka Connect framework. Learn about the connector, its properties, and configuration.

The SFTP Source connector obtains files from an SFTP server and transfers them to Kafka. The SFTP Source connector supports record processing. If record processing is enabled, the input file is split into records and the records are transferred to Kafka. If record processing is not enabled, the connector forwards files to Kafka as they are fetched from the SFTP server.

If record processing is enabled, the input file can contain records in JSON, CSV, or Grok format. If the input records are in JSON or CSV format, the connector can either infer the schema of the records based on the data or read the schema from Schema Registry. If the input records are in Grok format, the connector can either derive the schema using the field names from the value of the Grok Expression property or read the schema from Schema Registry.

In addition, if record processing is enabled, the connector writes the records to Kafka in Avro format. How the record schema is attached to these Avro messages depends on the value of the Avro Schema Write Strategy property.

If Schema Registry is used, and it is on a Kerberized cluster, the krb5.file property must point to the krb5.conf file that provides access to the cluster on which Schema Registry is present. This means that the krb5.conf file must be on the same cluster node that the connector runs on. The Kerberos keytab used to access Schema Registry must also be on the same cluster node that the connector runs on. The connection to Schema Registry can be secured by TLS. The truststore file necessary for securing the connection must also be on the same cluster node that the connector runs on.
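
As an illustration of the requirements above, the following fragment sketches the entries involved when Schema Registry runs on a Kerberized cluster and the connection is secured with TLS. It is not a complete configuration and must be merged into one. All bracketed values are placeholders, the truststore type JKS is only one of the accepted values, and every referenced file is assumed to exist on the cluster node that runs the connector.

```json
{
 "krb5.file": "[***PATH TO KRB5.CONF***]",
 "parameter.SFTP Source Parameters:Enable Record Processing": "true",
 "parameter.SFTP Source Parameters:Schema Access Strategy": "Schema Registry",
 "parameter.SFTP Source Parameters:Schema Registry URL": "[***SCHEMA REGISTRY URL***]",
 "parameter.SFTP Source Parameters:Schema Name": "[***SCHEMA NAME***]",
 "parameter.SFTP Source Parameters:Kerberos Principal for Schema Registry": "[***KERBEROS PRINCIPAL***]",
 "parameter.SFTP Source Parameters:Kerberos Keytab for Schema Registry": "[***PATH TO KEYTAB***]",
 "parameter.SFTP Source Parameters:Truststore Filename for Schema Registry": "[***PATH TO TRUSTSTORE***]",
 "parameter.SFTP Source Parameters:Truststore Password for Schema Registry": "[***TRUSTSTORE PASSWORD***]",
 "parameter.SFTP Source Parameters:Truststore Type for Schema Registry": "JKS"
}
```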

Properties and configuration

Configuration is passed to the connector in a JSON file during creation. The properties of the connector can be categorized into three groups. These are as follows:

Common connector properties
These are the properties of the Kafka Connect framework that are accepted by all connectors. For a comprehensive list of these properties, see the Apache Kafka documentation.
Stateless NiFi Source properties
These are the properties that are specific to the Stateless NiFi Source connector. All Stateless NiFi Source connectors share and accept these properties. For a comprehensive list of these properties, see the Stateless NiFi Source property reference.
Connector/dataflow-specific properties
These properties are unique to this specific connector or, to be more precise, to the dataflow running within the connector. These properties use the following prefix:
parameter.[***CONNECTOR NAME***] Parameters:
For a comprehensive list of these properties, see the SFTP Source properties reference.

Notes and limitations

  • Required properties must be assigned a valid value even if they are not used in the particular configuration. If a required property is not used, either leave its default value, or completely remove the property from the configuration JSON.
  • If a property that has a default value is completely removed from the configuration JSON, the system uses the default value.
  • Properties not marked as required must be completely removed from the configuration JSON if not set.
  • The Schema Registry URL property is mandatory even if Schema Registry is not used. If Schema Registry is not used, use the default value, or completely remove the property from the configuration JSON.
  • Schema Branch and Schema Version cannot be specified at the same time.
  • The value of the Schema Access Strategy property is not independent of the value of the Input Data Format property. As a result, you must exercise caution when configuring Schema Access Strategy.
    • If the value of Input Data Format is JSON, the possible values for Schema Access Strategy are Schema Registry or Infer Schema.
    • If the value of Input Data Format is CSV, the possible values for Schema Access Strategy are Schema Registry or Infer Schema.
    • If the value of Input Data Format is GROK, the possible values for Schema Access Strategy are Schema Registry or Field Names From Grok Expression.
  • If the Enable Record Processing property is set to false (record processing disabled), the entire input file is transferred to Kafka as one message.
  • If the Enable Record Processing property is set to true (record processing enabled), the output is always in Avro format. How the record schema is attached to these Avro messages depends on the value of the Avro Schema Write Strategy property.
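
The dependency between Input Data Format and Schema Access Strategy described in the notes above can be sketched with the following fragment. It assumes JSON input whose schema is inferred from the data, so Schema Registry is not used. The fragment is illustrative only and must be merged into a complete connector configuration.

```json
{
 "parameter.SFTP Source Parameters:Enable Record Processing": "true",
 "parameter.SFTP Source Parameters:Input Data Format": "JSON",
 "parameter.SFTP Source Parameters:Schema Access Strategy": "Infer Schema",
 "parameter.SFTP Source Parameters:Avro Schema Write Strategy": "Embed Avro Schema"
}
```

Because Schema Registry is not used in this sketch, the Schema Name, Schema Branch, and Schema Version properties are left out entirely, and Schema Registry URL is either left at its default value or removed.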

Configuration example

In this example, the connector fetches files from an SFTP server that only requires Basic Authentication (username and password) and transfers these files to a Kafka topic.
{
 "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSourceConnector",
 "meta.smm.predefined.flow.name": "SFTP Source",
 "meta.smm.predefined.flow.version": "1.0.0",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
 "tasks.max": "1",
 "nexus.url": "https://repository.cloudera.com/artifactory/repo",
 "extensions.directory": "/tmp/nifi-stateless-extensions",
 "working.directory": "/tmp/nifi-stateless-working",
 "topics": "[***KAFKA TOPIC NAME***]",
 "parameter.SFTP Source Parameters:Hostname": "[***SFTP SERVER HOSTNAME***]",
 "parameter.SFTP Source Parameters:Port": "22",
 "parameter.SFTP Source Parameters:Username": "[***SFTP USERNAME***]",
 "parameter.SFTP Source Parameters:Password": "[***SFTP PASSWORD***]",
 "parameter.SFTP Source Parameters:Remote Path": "[***FOLDER PATH***]",
 "parameter.SFTP Source Parameters:File Filter Regex": ".*",
 "parameter.SFTP Source Parameters:Enable Record Processing": "false"
}
The following list collects the properties from the configuration example that must be customized for this use case.
topics
The name of the Kafka topic that the connector sends messages to.
Hostname
The hostname of the remote system where the SFTP server runs. For example, my.sftp-server.com.
Port
The port that the remote system is listening on for file transfers.
Username
The username for connecting to the SFTP server.
Password
The password used to authenticate the user to the SFTP server. In this example, the SFTP server requires Basic Authentication.
Remote Path
The path on the remote system that points to the directory from which to pull files. For example, /uploads.
File Filter Regex
The Java regular expression to use for filtering filenames. Only files whose names match the regular expression are fetched.
Enable Record Processing
Determines whether the contents of a file get parsed as records before sending them to Kafka. In this example, the property is set to false, meaning that the entire file gets forwarded to Kafka as one message.

Stateless NiFi Source properties reference

Review the following reference for a comprehensive list of the connector properties that are specific to the Stateless NiFi Source connector.

In addition to the properties listed here, Stateless NiFi connectors also accept the properties of the Kafka Connect framework. For a comprehensive list of these properties, see the Apache Kafka documentation.

dataflow.timeout

Description
Specifies the maximum amount of time to wait for the dataflow to complete. If the dataflow does not complete before this timeout, the thread is interrupted and the dataflow is considered a failure. The session is rolled back and the connector retriggers the flow. Defaults to 60 seconds if not specified.
Default Value
60 seconds
Accepted Values
Required
false

extensions.directory

Description
Specifies the directory that stores downloaded extensions. Extensions are the NAR (NiFi Archive) files containing the processors and controller services a flow might use. Since Stateless NiFi is only the NiFi engine, it does not contain any of the processors and controller services you might use in your flow. When deploying the connector with the custom flow, the system needs to download the specific extensions that your flow uses from Nexus (unless they are already present in this directory). These extensions are stored in this directory. Because the default directory might not be writable, and to aid in upgrade scenarios, Cloudera recommends that you always specify an extensions directory.
Default Value
/tmp/nifi-stateless-extensions
Accepted Values
Required
true

flow.snapshot

Description
Specifies the dataflow to run. When using Streams Messaging Manager to deploy a connector, the value you set in this property must be a JSON object. URLs, file paths, or escaped JSON strings are not supported when using Streams Messaging Manager. Alternatively, if using the Kafka Connect REST API to deploy a connector, this can be a file containing the dataflow, a URL that points to a dataflow, or a string containing the entire dataflow as an escaped JSON. However, Cloudera does not recommend using the Kafka Connect REST API to interact with this connector or Kafka Connect.
Default Value
Accepted Values
Required
true

header.attribute.regex

Description
A Java regular expression that is evaluated against all flowfile attribute names. Any attribute name matching the regular expression is converted into a Kafka message header. The name of the attribute is used as the header key, the value of the attribute is used as the header value. If not specified, headers are not added to the Kafka record.
Default Value
Accepted Values
Required
false

header.name.regex

Description

A Java regular expression that is evaluated against all flowfile attribute names. For any attribute whose name matches the regular expression, the Kafka record has a header whose name matches the attribute name and whose value matches the attribute value. If not specified, no headers are added to the Kafka record.

Default Value
Accepted Values
Required
false

key.attribute

Description
Specifies the name of a flowfile attribute whose value is used as the key of the Kafka record. If not specified, the Kafka record does not have a key associated with it. If specified, but the attribute does not exist on a particular flowfile, the corresponding Kafka record also has no key.
Default Value
Accepted Values
Required
false
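
As an illustration of key.attribute and header.attribute.regex, the following fragment is a minimal sketch that uses the filename of each fetched file as the Kafka record key and copies two attributes into Kafka headers. It assumes that the flowfiles leaving the dataflow carry the standard NiFi core attributes filename and path. Whether this connector's dataflow preserves them is not stated in this reference.

```json
{
 "key.attribute": "filename",
 "header.attribute.regex": "filename|path"
}
```

The example configuration in this document uses StringConverter as the key.converter, which is compatible with a string-valued key such as a filename.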

krb5.file

Description
Specifies the krb5.conf file to use if the dataflow interacts with any services that are secured using Kerberos. Defaults to /etc/krb5.conf if not specified.
Default Value
/etc/krb5.conf
Accepted Values
Required
false

name

Description
The name of the connector. On the Streams Messaging Manager UI, the connector names are specified using the Enter Name field. The name that you enter in the Enter Name field is automatically set as the value of the name property when the connector is deployed. Because of this, the name property is omitted from the configuration template provided in Streams Messaging Manager. If you manually add the name property to the configuration in Streams Messaging Manager, ensure that the value you set matches the connector name specified in the Enter Name field. Otherwise, the connector fails to deploy.
Default Value
Accepted Values
Required
true

nexus.url

Description
Specifies the Base URL of the Nexus instance to source extensions from. If configuring a Nexus instance that has multiple repositories, include the name of the repository in the URL. For example, https://nexus-private.myorganization.org/nexus/repository/my-repository/. If the property is not specified, the necessary extensions (the ones used by the flow) must be provided in the extensions directory before deploying the connector.
Default Value
Accepted Values
Required
true

output.port

Description
The name of the output port in the NiFi dataflow to pull data from. If the dataflow contains exactly one port, this property is optional and can be omitted. However, if the dataflow contains multiple ports (for example, a success and a failure port), this property must be specified. If any flowfile is sent to any port other than the specified port, it is considered a failure. The session is rolled back and no data is collected.
Default Value
Accepted Values
Required
false

parameter.[***FLOW PARAMETER NAME***]

Description
Specifies a parameter to use in the dataflow. For example, assume that you have the following entry in your connector configuration "parameter.Directory": "/mydir". In a case like this, any parameter context in the dataflow that has a parameter named Directory gets the specified value (/mydir). If the dataflow has child process groups, and those child process groups have their own parameter contexts, the value is used for all parameter contexts that contain a parameter named Directory. Parameters can also be applied to specific Parameter Contexts only. This can be done by prefixing the parameter name (Directory) with the name of the parameter context followed by a colon. For example, parameter.My Context:Directory only applies the specified value for the Directory parameter in the parameter context named My Context.
Default Value
Accepted Values
Required
false
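
The two forms described above can be sketched side by side as follows. The first entry sets Directory in every parameter context that defines it. The second entry sets a parameter in the context named My Context only. The parameter name Timeout and its value are hypothetical and used purely for illustration.

```json
{
 "parameter.Directory": "/mydir",
 "parameter.My Context:Timeout": "30 seconds"
}
```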

topic.name.attribute

Description
Specifies the name of a flowfile attribute to use for determining which Kafka topic a flowfile is sent to. Either the topics or topic.name.attribute property must be specified. If both are specified, topic.name.attribute takes precedence. However, if a flowfile does not have the specified attribute name, then the connector falls back to using the topics property.
Default Value
Accepted Values
Required
false

topics

Description
The name of the topic to deliver data to. All flowfiles are delivered to the topic specified here. However, it is also possible to determine the topic individually for each flowfile. To do this, ensure that the dataflow specifies the topic name in an attribute, and then use topic.name.attribute to specify the name of the attribute instead of the topic name. For example, if you want a separate Kafka topic for each data source, you can omit the topics property and instead specify the attribute (for example, datasource.hostname) corresponding to the topic using the topic.name.attribute property.
Default Value
Accepted Values
Required
true
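
The interaction between topics and topic.name.attribute can be sketched as follows. In this fragment, the topic is taken from the datasource.hostname attribute (the attribute name used as an example above) whenever a flowfile carries it, and the topics value acts as the fallback for flowfiles that do not. The bracketed value is a placeholder.

```json
{
 "topics": "[***FALLBACK KAFKA TOPIC NAME***]",
 "topic.name.attribute": "datasource.hostname"
}
```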

working.directory

Description
Specifies a directory on the Connect server that NiFi should use for unpacking extensions that it needs to perform the dataflow. The contents of extensions.directory are unpacked here. Defaults to /tmp/nifi-stateless-working if not specified.
Default Value
/tmp/nifi-stateless-working
Accepted Values
Required
false

SFTP Source properties reference

Review the following reference for a comprehensive list of the connector properties that are specific to the SFTP Source connector.

The properties listed in this reference must be added to the connector configuration with the following prefix:
parameter.[***CONNECTOR NAME***] Parameters:

In addition to the properties listed here, this connector also accepts certain properties of the Kafka Connect framework as well as the properties of the Stateless NiFi Source connector. When creating a new connector using the Streams Messaging Manager UI, all valid properties are presented in the default configuration template. You can view the configuration template to get a full list of valid properties. In addition, for more information regarding the accepted properties not listed here, you can review the Apache Kafka documentation and the Stateless NiFi Source property reference.

Avro Schema Write Strategy

Description
Specifies whether and how the record schema is written to the output Avro messages.
Default Value
Embed Avro Schema
Accepted Values
Embed Avro Schema, Do Not Write Schema, HWX Content-Encoded Schema Reference
Required
false

CSV Character Set

Description
The character set used to read the input CSV files.

This property is ignored if the input is not a CSV file or if record processing is not enabled.

Default Value
UTF-8
Accepted Values
Required
true

CSV Escape Character

Description
The escape character used in the input CSV files to escape other special characters.

This property is ignored if the input is not a CSV file or if record processing is not enabled.

Default Value
\
Accepted Values
Required
true

CSV Quote Character

Description
The quote character used in the input CSV files.

This property is ignored if the input is not a CSV file or if record processing is not enabled.

Default Value
"
Accepted Values
Required
true

CSV Record Separator

Description
The record separator used in the input CSV files.

This property is ignored if the input is not a CSV file or if record processing is not enabled.

Default Value
\n
Accepted Values
Required
true

CSV Treat First Line as Header

Description
Specifies whether the first line in the input file is handled as a header.

This property is ignored if the input is not a CSV file or if record processing is not enabled.

Default Value
false
Accepted Values
true, false
Required
true

CSV Trim Fields

Description
Specifies whether whitespace characters are removed from the beginning and the end of fields.

This property is ignored if the input is not a CSV file or if record processing is not enabled.

Default Value
true
Accepted Values
true, false
Required
true

CSV Value Separator

Description
The value separator used in the input CSV files.

This property is ignored if the input is not a CSV file or if record processing is not enabled.

Default Value
,
Accepted Values
Required
true
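
As an illustration of how the CSV properties fit together, the following fragment sketches a configuration for semicolon-separated files that have a header line. The separator and header settings are assumptions about the input files, not recommendations. The fragment must be merged into a complete connector configuration.

```json
{
 "parameter.SFTP Source Parameters:Enable Record Processing": "true",
 "parameter.SFTP Source Parameters:Input Data Format": "CSV",
 "parameter.SFTP Source Parameters:Schema Access Strategy": "Infer Schema",
 "parameter.SFTP Source Parameters:CSV Character Set": "UTF-8",
 "parameter.SFTP Source Parameters:CSV Value Separator": ";",
 "parameter.SFTP Source Parameters:CSV Treat First Line as Header": "true",
 "parameter.SFTP Source Parameters:CSV Trim Fields": "true"
}
```

CSV properties that are left out of the fragment, such as CSV Quote Character, fall back to their default values.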

Completion Strategy

Description
Specifies what to do with the original file on the server once it has been fetched.

If the Completion Strategy fails, a warning is logged but the data is still transferred.

Default Value
None
Accepted Values
None, Move File, Delete File
Required
true

Date Format

Description
Specifies the format used for parsing date fields in the input data.

This property is only used if Input Data Format is set to CSV or JSON.

Default Value
yyyy-MM-dd
Accepted Values
Required
true

Enable Record Processing

Description
Enables or disables record processing.

If set to true, the Input Data Format is considered and the file gets parsed into records. In this case the Records Per Kafka Message property defines how many records are written into one Kafka message.

If set to false, the entire file gets forwarded to Kafka as one message.

Default Value
true
Accepted Values
true, false
Required
true

File Filter Regex

Description
The Java regular expression to use for filtering filenames. Only files whose names match the regular expression are fetched.
Default Value
.*
Accepted Values
Required
true

Follow Symlink

Description
If set to true, both symbolically linked files and nested symbolically linked subdirectories are pulled. Otherwise, symbolically linked files are not read and symbolically linked subdirectories are not traversed.
Default Value
false
Accepted Values
true, false
Required
true

Grok Expression

Description
Specifies the format of a line in Grok format. This allows the connector to understand how to parse each line in the input file. If a line in the file does not match this pattern, the line is handled according to what is set in the Grok No Match Behavior property.

A valid Grok expression must be specified using this property even if Grok format is not used.

Default Value
%{GREEDYDATA:message}
Accepted Values
Required
true

Grok No Match Behavior

Description
Specifies how to handle lines that do not match the pattern set in the Grok Expression property.

If set to append-to-previous-message, non-matching lines are appended to the last field of the previous message.

If set to skip-line, non-matching lines are skipped.

If set to raw-line, non-matching lines are only added to the _raw field.

Default Value
append-to-previous-message
Accepted Values
append-to-previous-message, skip-line, raw-line
Required
true
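
The Grok-related properties can be combined as in the following sketch, which parses log lines consisting of a timestamp, a log level, and a free-text message, and skips lines that do not match. It assumes that the commonly used Grok patterns TIMESTAMP_ISO8601 and LOGLEVEL are available to the connector, in addition to the GREEDYDATA pattern that appears in the default value.

```json
{
 "parameter.SFTP Source Parameters:Enable Record Processing": "true",
 "parameter.SFTP Source Parameters:Input Data Format": "GROK",
 "parameter.SFTP Source Parameters:Schema Access Strategy": "Field Names From Grok Expression",
 "parameter.SFTP Source Parameters:Grok Expression": "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}",
 "parameter.SFTP Source Parameters:Grok No Match Behavior": "skip-line"
}
```

With Field Names From Grok Expression, the record schema is derived from the field names in the expression, which in this sketch are timestamp, level, and message.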

Host Key File

Description
The fully-qualified filename of the host key file.

If supplied, this file is used as the host key.

If a host key is not supplied, but Strict Host Key Checking is set to true, the known_hosts and known_hosts2 files from the ~/.ssh directory are used.

If a host key is not supplied and Strict Host Key Checking is set to false, no host key file is used.

This parameter must either contain the fully-qualified name of a file, or be completely removed from the configuration JSON.

Default Value
Accepted Values
Required
false

Hostname

Description
The hostname or IP address of the remote system.
Default Value
localhost
Accepted Values
Required
true

Ignored Dotted Files

Description
Specifies whether to ignore files whose names begin with a dot (".").
Default Value
true
Accepted Values
true, false
Required
true

Input Data Format

Description
The format in which the input file contains record-oriented data.

If Enable Record Processing is set to false, this setting is ignored.

Default Value
JSON
Accepted Values
JSON, CSV, GROK
Required
true

Kerberos Keytab for Schema Registry

Description
The fully-qualified filename of the Kerberos keytab associated with the principal for accessing Schema Registry.
Default Value
The location of the default keytab, which is empty and can only be used for unsecured connections.
Accepted Values
Required
true

Kerberos Principal for Schema Registry

Description
The Kerberos principal used for authenticating to Schema Registry.
Default Value
default
Accepted Values
Required
true

Move Destination Directory

Description
The fully-qualified name of the directory on the remote server to move the original file to once it is ingested. This property is ignored unless the Completion Strategy property is set to Move File. The specified directory must already exist on the remote system.

This parameter must either contain the fully-qualified name of a directory, or be completely removed from the configuration JSON.

Default Value
Accepted Values
Required
false
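
As an illustration, the following fragment sketches how Completion Strategy and Move Destination Directory work together. The directory /uploads/processed is a hypothetical path and, as noted above, would have to exist on the remote system beforehand.

```json
{
 "parameter.SFTP Source Parameters:Completion Strategy": "Move File",
 "parameter.SFTP Source Parameters:Move Destination Directory": "/uploads/processed"
}
```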

Password

Description
The password to use when connecting to the SFTP server.

If the server does not require a password, this property must be completely removed from the configuration JSON.

Default Value
Accepted Values
Required
false

Path Filter Regex

Description
The Java regular expression to use for filtering paths.

If Search Recursively is set to true, only subdirectories whose path matches the given regular expression are scanned.

If Search Recursively is set to false, this property is ignored.

Default Value
.*
Accepted Values
Required
true
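
The filtering properties can be combined as in the following sketch, which scans /uploads recursively, descends only into subdirectories whose path contains incoming, and fetches only files ending in .csv. The directory and file names are hypothetical. This reference does not state whether the path expression is matched against a relative or an absolute path, so the expression is deliberately written to tolerate both.

```json
{
 "parameter.SFTP Source Parameters:Remote Path": "/uploads",
 "parameter.SFTP Source Parameters:Search Recursively": "true",
 "parameter.SFTP Source Parameters:Path Filter Regex": ".*incoming.*",
 "parameter.SFTP Source Parameters:File Filter Regex": ".*\\.csv"
}
```

The backslash in the file filter is doubled because it must be escaped inside a JSON string. The regular expression the connector receives is .*\.csv.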

Port

Description
The port that the remote system is listening on for file transfers.
Default Value
22
Accepted Values
Required
true

Private Key File

Description
The fully-qualified filename of a private key file.

If no private key is used, this property must be completely removed from the configuration JSON.

Default Value
Accepted Values
Required
false

Private Key Password

Description
The password used to access the private key.

If no private key is used, this property must be completely removed from the configuration JSON.

Default Value
Accepted Values
Required
false
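
As an illustration of key-based authentication, the following fragment is a minimal sketch that assumes the SFTP server accepts a private key and does not require a password. The Password property is therefore left out entirely. All bracketed values are placeholders, and enabling Strict Host Key Checking with an explicit Host Key File is an optional choice shown for completeness.

```json
{
 "parameter.SFTP Source Parameters:Username": "[***SFTP USERNAME***]",
 "parameter.SFTP Source Parameters:Private Key File": "[***PATH TO PRIVATE KEY FILE***]",
 "parameter.SFTP Source Parameters:Private Key Password": "[***PRIVATE KEY PASSWORD***]",
 "parameter.SFTP Source Parameters:Strict Host Key Checking": "true",
 "parameter.SFTP Source Parameters:Host Key File": "[***PATH TO HOST KEY FILE***]"
}
```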

Record Per Kafka Message

Description
Specifies how many records to write into each Kafka message.

If Enable Record Processing is set to false, this setting is ignored.

Default Value
1
Accepted Values
Required
true

Remote Path

Description
The path on the remote system from which to pull files.
Default Value
.
Accepted Values
Required
true

Schema Access Strategy

Description
Specifies the strategy used for determining the schema of the input records if the Enable Record Processing property is set to true.

The value you set here depends on the input data format.

If set to Schema Registry, the schema is read from Schema Registry. This setting works with all input data formats.

If set to Infer Schema, the schema is inferred based on the input file. This setting can only be used if your input data format is either JSON or CSV.

If set to Field Names From Grok Expression, the schema is determined using the field names in the Grok Expression property. This setting can only be used if your input data format is GROK.

Default Value
Schema Registry
Accepted Values
Schema Registry, Infer Schema, Field Names From Grok Expression
Required
true

Schema Branch

Description
The name of the branch to use when looking up the schema in Schema Registry. Schema Branch and Schema Version cannot be specified at the same time. If one is specified, the other needs to be removed from the configuration. If Schema Registry is not used, this property must be completely removed from the configuration.
Default Value
Accepted Values
Required
false

Schema Name

Description
The schema name to look up in Schema Registry.

If the Schema Access Strategy property is set to Schema Registry, this property must contain a valid schema name.

If Schema Registry is not used, this property must be completely removed from the configuration JSON.

Default Value
Accepted Values
Required
false

Schema Registry URL

Description
The URL of the Schema Registry server.

If Schema Registry is not used, use the default value.

Default Value
http://localhost:7788/api/v1
Accepted Values
Required
true

Schema Version

Description
The version of the schema to look up in Schema Registry. If Schema Registry is used and a schema version is not specified, the latest version of the schema is retrieved. Schema Branch and Schema Version cannot be specified at the same time. If one is specified, the other needs to be removed from the configuration. If Schema Registry is not used, this property must be completely removed from the configuration.
Default Value
Accepted Values
Required
true
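
The rule that Schema Branch and Schema Version are mutually exclusive can be sketched as follows. This fragment pins a specific schema version and therefore leaves Schema Branch out of the configuration entirely. All bracketed values are placeholders.

```json
{
 "parameter.SFTP Source Parameters:Schema Access Strategy": "Schema Registry",
 "parameter.SFTP Source Parameters:Schema Registry URL": "[***SCHEMA REGISTRY URL***]",
 "parameter.SFTP Source Parameters:Schema Name": "[***SCHEMA NAME***]",
 "parameter.SFTP Source Parameters:Schema Version": "[***SCHEMA VERSION***]"
}
```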

Search Recursively

Description
Specifies whether to pull files from arbitrarily nested subdirectories. Subdirectories are not traversed if set to false.
Default Value
false
Accepted Values
true, false
Required
true

Strict Host Key Checking

Description
Specifies whether strict enforcement of host keys is applied.
Default Value
false
Accepted Values
true, false
Required
true

Time Format

Description
Specifies the format used for parsing time fields in the input data. This property is only used if Input Data Format is set to CSV or JSON.
Default Value
HH:mm:ss
Accepted Values
Required
true

Timestamp Format

Description
Specifies the format used for parsing timestamp fields in the input data. This property is only used if Input Data Format is set to CSV or JSON.
Default Value
yyyy-MM-dd HH:mm:ss.SSS
Accepted Values
Required
true
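
As an illustration, the following fragment sketches how the Date Format, Time Format, and Timestamp Format properties could be adjusted for CSV input that uses day-first dates. It assumes that these properties accept Java date and time pattern letters, as the default values suggest. The chosen patterns are examples only.

```json
{
 "parameter.SFTP Source Parameters:Input Data Format": "CSV",
 "parameter.SFTP Source Parameters:Date Format": "dd/MM/yyyy",
 "parameter.SFTP Source Parameters:Time Format": "HH:mm",
 "parameter.SFTP Source Parameters:Timestamp Format": "dd/MM/yyyy HH:mm:ss"
}
```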

Truststore Filename for Schema Registry

Description
The fully-qualified filename of a truststore. This truststore is used to establish a secure connection with Schema Registry using TLS.
Default Value
The location of the default truststore, which is empty and can only be used for unsecured connections.
Accepted Values
Required
true

Truststore Password for Schema Registry

Description
The password used to access the contents of the truststore configured in the Truststore Filename for Schema Registry property.
Default Value
password
Accepted Values
Required
true

Truststore Type for Schema Registry

Description
The type of the truststore configured in the Truststore Filename for Schema Registry property.
Default Value
Accepted Values
BCFKS, PKCS12, JKS
Required
true

Username

Description
The username for connecting to the SFTP server.
Default Value
Accepted Values
Required
true