SFTP Source connector
The SFTP Source connector is a Stateless NiFi dataflow developed by Cloudera that runs in the Kafka Connect framework. Learn about the connector, its properties, and configuration.
The SFTP Source connector obtains files from an SFTP server and transfers them to Kafka. The connector supports record processing. If record processing is enabled, each input file is split into records and the records are transferred to Kafka. If record processing is not enabled, the connector forwards files to Kafka as they are fetched from the SFTP server.
If record processing is enabled, the input file can contain records in JSON, CSV, or Grok format. If the input records are in JSON or CSV format, the connector can either infer the schema of the records based on the data or read the schema from Schema Registry. If the input records are in Grok format, the connector can either derive the schema using the field names from the value of the Grok Expression property or read the schema from Schema Registry.
In addition, if record processing is enabled, the connector writes the records to Kafka in Avro format. How the record schema is attached to these Avro messages depends on the value of the Avro Schema Write Strategy property.
If Schema Registry is used, and it is on a Kerberized cluster, the krb5.file property must point to the krb5.conf file that provides access to the cluster on which Schema Registry is present. This means that the krb5.conf file must be on the same cluster node that the connector runs on. The Kerberos keytab used to access Schema Registry must also be on the same cluster node that the connector runs on. The connection to Schema Registry can be secured by TLS. The truststore file necessary for securing the connection must also be on the same cluster node that the connector runs on.
Properties and configuration
Configuration is passed to the connector in a JSON file during creation. The properties of the connector can be categorized into three groups. These are as follows:
- Common connector properties
- These are the properties of the Kafka Connect framework that are accepted by all connectors. For a comprehensive list of these properties, see the Apache Kafka documentation.
- Stateless NiFi Source properties
- These are the properties that are specific to the Stateless NiFi Source connector. All Stateless NiFi Source connectors share and accept these properties. For a comprehensive list of these properties, see the Stateless NiFi Source property reference.
- Connector/dataflow-specific properties
- These properties are unique to this specific connector or, more precisely, to the dataflow running within the connector. These properties use the following prefix:
parameter.[***CONNECTOR NAME***] Parameters:
For a comprehensive list of these properties, see the SFTP Source properties reference.
Notes and limitations
- Required properties must be assigned a valid value even if they are not used in the particular configuration. If a required property is not used, either leave its default value, or completely remove the property from the configuration JSON.
- If a property that has a default value is completely removed from the configuration JSON, the system uses the default value.
- Properties not marked as required must be completely removed from the configuration JSON if not set.
- The Schema Registry URL property is mandatory even if Schema Registry is not used. If Schema Registry is not used, use the default value, or completely remove the property from the configuration JSON.
- Schema Branch and Schema Version cannot be specified at the same time.
- The value of the Schema Access Strategy property is not independent of the value of the Input Data Format property. As a result, you must exercise caution when configuring Schema Access Strategy.
  - If the value of Input Data Format is JSON, the possible values for Schema Access Strategy are Schema Registry or Infer Schema.
  - If the value of Input Data Format is CSV, the possible values for Schema Access Strategy are Schema Registry or Infer Schema.
  - If the value of Input Data Format is GROK, the possible values for Schema Access Strategy are Schema Registry or Field Names From Grok Expression.
- If the Enable Record Processing property is set to false (record processing disabled), the entire input file is transferred to Kafka as one message.
- If the Enable Record Processing property is set to true (record processing enabled), the output is always in Avro format. How the record schema is attached to these Avro messages depends on the value of the Avro Schema Write Strategy property.
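The dependency rules in these notes can be checked before a configuration is submitted. The following is a minimal Python sketch of such a pre-flight check. It is not part of the connector: the helper name `check_schema_settings` and the `PREFIX` constant are hypothetical, and the prefix assumes the `SFTP Source Parameters` context name used in the configuration example.

```python
# Hypothetical pre-flight check; not part of the connector itself.
PREFIX = "parameter.SFTP Source Parameters:"

# Allowed Schema Access Strategy values per Input Data Format, per the notes.
ALLOWED_STRATEGIES = {
    "JSON": {"Schema Registry", "Infer Schema"},
    "CSV": {"Schema Registry", "Infer Schema"},
    "GROK": {"Schema Registry", "Field Names From Grok Expression"},
}


def check_schema_settings(config):
    """Return a list of human-readable problems found in the configuration."""
    problems = []
    # Documented defaults apply when a property is removed from the JSON.
    data_format = config.get(PREFIX + "Input Data Format", "JSON")
    strategy = config.get(PREFIX + "Schema Access Strategy", "Schema Registry")
    enabled = config.get(PREFIX + "Enable Record Processing", "true") == "true"

    # The strategy only matters when record processing is enabled.
    if enabled and strategy not in ALLOWED_STRATEGIES.get(data_format, set()):
        problems.append(
            f"Schema Access Strategy '{strategy}' is not valid for {data_format}"
        )
    if PREFIX + "Schema Branch" in config and PREFIX + "Schema Version" in config:
        problems.append("Schema Branch and Schema Version are mutually exclusive")
    return problems
```

An empty list means that none of the rules covered by the sketch are violated. It does not guarantee that the connector accepts the configuration.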
Configuration example
{
  "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSourceConnector",
  "meta.smm.predefined.flow.name": "SFTP Source",
  "meta.smm.predefined.flow.version": "1.0.0",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "tasks.max": "1",
  "nexus.url": "https://repository.cloudera.com/artifactory/repo",
  "extensions.directory": "/tmp/nifi-stateless-extensions",
  "working.directory": "/tmp/nifi-stateless-working",
  "topics": "[***KAFKA TOPIC NAME***]",
  "parameter.SFTP Source Parameters:Hostname": "[***SFTP SERVER HOSTNAME***]",
  "parameter.SFTP Source Parameters:Port": "22",
  "parameter.SFTP Source Parameters:Username": "[***SFTP USERNAME***]",
  "parameter.SFTP Source Parameters:Password": "[***SFTP PASSWORD***]",
  "parameter.SFTP Source Parameters:Remote Path": "[***FOLDER PATH***]",
  "parameter.SFTP Source Parameters:File Filter Regex": ".*",
  "parameter.SFTP Source Parameters:Enable Record Processing": "false"
}
topics
- The name of the Kafka topic that the connector sends messages to.
Hostname
- The hostname of the remote system where the SFTP server runs. For example, my.sftp-server.com.
Port
- The port that the remote system is listening on for file transfers.
Username
- The username for connecting to the SFTP server.
Password
- The password used to authenticate the user to the SFTP server. In this example, the SFTP server requires password authentication.
Remote Path
- The path on the remote system that points to the directory from which to pull files. For example, /uploads.
File Filter Regex
- The Java regular expression to use for filtering filenames. Only files whose names match the regular expression are fetched.
Enable Record Processing
- Determines whether the contents of a file are parsed as records before they are sent to Kafka. In this example, the property is set to false, meaning that the entire file is forwarded to Kafka as one message.
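For comparison, a configuration with record processing enabled might be derived from the one above as in the following sketch. It is a Python snippet that copies a base configuration and switches on CSV parsing. The choice of CSV with Infer Schema is only illustrative, the helper name `with_csv_record_processing` is hypothetical, and values such as `my-topic` are placeholders. Only property names documented in the SFTP Source properties reference are used.

```python
import json

PREFIX = "parameter.SFTP Source Parameters:"


def with_csv_record_processing(base_config):
    """Return a copy of base_config that parses fetched files as CSV records."""
    config = dict(base_config)
    config.update({
        PREFIX + "Enable Record Processing": "true",
        PREFIX + "Input Data Format": "CSV",
        # Infer Schema is valid for CSV and JSON input only.
        PREFIX + "Schema Access Strategy": "Infer Schema",
        PREFIX + "CSV Treat First Line as Header": "true",
    })
    return config


base = {
    "topics": "my-topic",  # placeholder topic name
    PREFIX + "Hostname": "my.sftp-server.com",
    PREFIX + "Remote Path": "/uploads",
    PREFIX + "Enable Record Processing": "false",
}

# Print the derived configuration as JSON; the base dictionary is left unchanged.
print(json.dumps(with_csv_record_processing(base), indent=2))
```

With these settings the connector would write Avro messages to Kafka instead of forwarding whole files.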
Stateless NiFi Source properties reference
Review the following reference for a comprehensive list of the connector properties that are specific to the Stateless NiFi Source connector.
In addition to the properties listed here, Stateless NiFi connectors also accept the properties of the Kafka Connect framework. For a comprehensive list of these properties, see the Apache Kafka documentation.
dataflow.timeout
- Description
- Specifies the maximum amount of time to wait for the dataflow to complete. If the dataflow does not complete before this timeout, the thread is interrupted and the dataflow is considered a failure. The session is rolled back and the connector retriggers the flow. Defaults to 60 seconds if not specified.
- Default Value
- 60 seconds
- Accepted Values
- Required
- false
extensions.directory
- Description
- Specifies the directory that stores downloaded extensions. Extensions are the NAR (NiFi Archive) files containing the processors and controller services a flow might use. Since Stateless NiFi is only the NiFi engine, it does not contain any of the processors and controller services you might use in your flow. When deploying the connector with the custom flow, the system needs to download the specific extensions that your flow uses from Nexus (unless they are already present in this directory). These extensions are stored in this directory. Because the default directory might not be writable, and to aid in upgrade scenarios, Cloudera recommends that you always specify an extensions directory.
- Default Value
- /tmp/nifi-stateless-extensions
- Accepted Values
- Required
- true
flow.snapshot
- Description
- Specifies the dataflow to run. When using Streams Messaging Manager to deploy a connector, the value you set in this property must be a JSON object. URLs, file paths, or escaped JSON strings are not supported when using Streams Messaging Manager. Alternatively, if using the Kafka Connect REST API to deploy a connector, this can be a file containing the dataflow, a URL that points to a dataflow, or a string containing the entire dataflow as an escaped JSON. However, Cloudera does not recommend using the Kafka Connect REST API to interact with this connector or Kafka Connect.
- Default Value
- Accepted Values
- Required
- true
header.attribute.regex
- Description
- A Java regular expression that is evaluated against all flowfile attribute names. Any attribute name matching the regular expression is converted into a Kafka message header. The name of the attribute is used as the header key, the value of the attribute is used as the header value. If not specified, headers are not added to the Kafka record.
- Default Value
- Accepted Values
- Required
- false
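As an illustration of how the regular expression selects attributes, the following Python sketch mimics the documented behavior. It is not the connector's implementation. It uses Python's `re` module rather than Java's regex engine (the two agree for simple patterns such as the one shown), it assumes that the whole attribute name must match the expression, and the attribute names and the function name `select_headers` are made up for the example.

```python
import re


def select_headers(attributes, header_regex=None):
    """Return the attributes that would become Kafka headers (illustrative only)."""
    if header_regex is None:
        # No regex configured: no headers are added to the Kafka record.
        return {}
    pattern = re.compile(header_regex)
    # Assumption: the entire attribute name must match the expression.
    return {
        name: value
        for name, value in attributes.items()
        if pattern.fullmatch(name)
    }


# Hypothetical flowfile attributes.
attributes = {
    "sftp.remote.host": "my.sftp-server.com",
    "filename": "a.csv",
    "path": "/uploads",
}
headers = select_headers(attributes, r"sftp\..*")
```

In this example only the `sftp.remote.host` attribute is selected, with the attribute name as the header key and the attribute value as the header value.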
header.name.regex
- Description
- A Java regular expression that is evaluated against all flowfile attribute names. For any attribute whose name matches the regular expression, the Kafka record has a header whose name matches the attribute name and whose value matches the attribute value. If not specified, no headers are added to the Kafka record.
- Default Value
- Accepted Values
- Required
- false
key.attribute
- Description
- Specifies the name of a flowfile attribute that should be used to specify the key of the Kafka record. If not specified, the Kafka record will not have a key associated with it. If specified, but the attribute does not exist on a particular flowfile, it will also have no key associated with it.
- Default Value
- Accepted Values
- Required
- false
krb5.file
- Description
- Specifies the krb5.conf file to use if the dataflow interacts with any services that are secured using Kerberos. Defaults to /etc/krb5.conf if not specified.
- Default Value
- /etc/krb5.conf
- Accepted Values
- Required
- false
name
- Description
- The name of the connector. On the Streams Messaging Manager UI, the connector names are specified using the Enter Name field. The name that you enter in the Enter Name field is automatically set as the value of the name property when the connector is deployed. Because of this, the name property is omitted from the configuration template provided in Streams Messaging Manager. If you manually add the name property to the configuration in Streams Messaging Manager, ensure that the value you set matches the connector name specified in the Enter Name field. Otherwise, the connector fails to deploy.
- Default Value
- Accepted Values
- Required
- true
nexus.url
- Description
- Specifies the base URL of the Nexus instance to source extensions from. If configuring a Nexus instance that has multiple repositories, include the name of the repository in the URL. For example, https://nexus-private.myorganization.org/nexus/repository/my-repository/. If the property is not specified, the necessary extensions (the ones used by the flow) must be provided in the extensions directory before deploying the connector.
- Default Value
- Accepted Values
- Required
- true
output.port
- Description
- The name of the output port in the NiFi dataflow to pull data from. If the dataflow contains exactly one port, this property is optional and can be omitted. However, if the dataflow contains multiple ports (for example, a success and a failure port), this property must be specified. If any flowfile is sent to any port other than the specified port, it is considered as a failure. The session is rolled back and no data is collected.
- Default Value
- Accepted Values
- Required
- false
parameter.[***FLOW PARAMETER NAME***]
- Description
- Specifies a parameter to use in the dataflow. For example, assume that you have the following entry in your connector configuration: "parameter.Directory": "/mydir". In a case like this, any parameter context in the dataflow that has a parameter named Directory gets the specified value (/mydir). If the dataflow has child process groups, and those child process groups have their own parameter contexts, the value is used for all parameter contexts that contain a parameter named Directory. Parameters can also be applied to specific parameter contexts only. This can be done by prefixing the parameter name (Directory) with the name of the parameter context followed by a colon. For example, parameter.My Context:Directory only applies the specified value for the Directory parameter in the parameter context named My Context.
- Default Value
- Accepted Values
- Required
- false
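The naming scheme for parameter keys can be sketched as a small parser. The following Python snippet is only an illustration of the convention described above, not the connector's code. The function name `parse_parameter_key` is hypothetical, and the sketch assumes that the first colon separates the parameter context name from the parameter name.

```python
def parse_parameter_key(key):
    """Split a 'parameter.*' config key into (context_name, parameter_name).

    context_name is None when the value applies to every parameter context
    that contains a parameter with the given name.
    """
    prefix = "parameter."
    if not key.startswith(prefix):
        raise ValueError(f"not a parameter key: {key}")
    remainder = key[len(prefix):]
    # Assumption: the first colon separates the context name from the parameter name.
    context, separator, name = remainder.partition(":")
    if separator:
        return context, name
    return None, remainder
```

For example, `parse_parameter_key("parameter.My Context:Directory")` yields the context `My Context` and the parameter `Directory`, while `parse_parameter_key("parameter.Directory")` yields no context, meaning the value applies wherever a parameter named `Directory` exists.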
topic.name.attribute
- Description
- Specifies the name of a flowfile attribute to use for determining which Kafka topic a flowfile is sent to. Either the topics or topic.name.attribute property must be specified. If both are specified, topic.name.attribute takes precedence. However, if a flowfile does not have the specified attribute name, then the connector falls back to using the topics property.
- Default Value
- Accepted Values
- Required
- false
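The precedence between the two topic properties can be summarized in a few lines of Python. This is a sketch of the documented rules only, not the connector's implementation. The function name `resolve_topic` is hypothetical, and the flowfile is represented as a plain dictionary of attributes.

```python
def resolve_topic(flowfile_attributes, topics=None, topic_name_attribute=None):
    """Pick the destination topic for one flowfile, following the documented precedence."""
    if topics is None and topic_name_attribute is None:
        raise ValueError("either topics or topic.name.attribute must be specified")
    if topic_name_attribute is not None:
        topic = flowfile_attributes.get(topic_name_attribute)
        if topic is not None:
            # The attribute takes precedence when the flowfile carries it.
            return topic
    # Fall back to the topics property (None if it was not configured).
    return topics
```

A flowfile with the attribute `datasource.hostname` set to `host-a` would go to the topic `host-a` when `topic.name.attribute` is `datasource.hostname`. A flowfile without that attribute would go to the topic named in `topics`.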
topics
- Description
- The name of the topic to deliver data to. All flowfiles are delivered to the topic specified here. However, it is also possible to determine the topic individually for each flowfile. To do this, ensure that the dataflow specifies the topic name in an attribute, and then use topic.name.attribute to specify the name of the attribute instead of the topic name. For example, if you want a separate Kafka topic for each data source, you can omit the topics property and instead specify the attribute (for example, datasource.hostname) corresponding to the topic using the topic.name.attribute property.
- Default Value
- Accepted Values
- Required
- true
working.directory
- Description
- Specifies a directory on the Connect server that NiFi should use for unpacking extensions that it needs to perform the dataflow. The contents of extensions.directory are unpacked here. Defaults to /tmp/nifi-stateless-working if not specified.
- Default Value
- /tmp/nifi-stateless-working
- Accepted Values
- Required
- false
SFTP Source properties reference
Review the following reference for a comprehensive list of the connector properties that are specific to the SFTP Source connector.
The properties listed in this reference use the following prefix: parameter.[***CONNECTOR NAME***] Parameters:
In addition to the properties listed here, this connector also accepts certain properties of the Kafka Connect framework as well as the properties of the Stateless NiFi Source connector. When creating a new connector using the Streams Messaging Manager UI, all valid properties are presented in the default configuration template. You can view the configuration template to get a full list of valid properties. In addition, for more information regarding the accepted properties not listed here, you can review the Apache Kafka documentation and the Stateless NiFi Source property reference.
Avro Schema Write Strategy
- Description
- Specifies whether and how the record schema is attached to the output Avro messages.
- Default Value
- Embed Avro Schema
- Accepted Values
- Embed Avro Schema, Do Not Write Schema, HWX Content-Encoded Schema Reference
- Required
- false
CSV Character Set
- Description
- The character set used to read the input CSV files.
This property is ignored if the input is not a CSV file or if record processing is not enabled.
- Default Value
- UTF-8
- Accepted Values
- Required
- true
CSV Escape Character
- Description
- The escape character used in the input CSV files to escape other special characters.
This property is ignored if the input is not a CSV file or if record processing is not enabled.
- Default Value
- \
- Accepted Values
- Required
- true
CSV Quote Character
- Description
- The quote character used in the input CSV files.
This property is ignored if the input is not a CSV file or if record processing is not enabled.
- Default Value
- "
- Accepted Values
- Required
- true
CSV Record Separator
- Description
- The record separator used in the input CSV files.
This property is ignored if the input is not a CSV file or if record processing is not enabled.
- Default Value
- \n
- Accepted Values
- Required
- true
CSV Treat First Line as Header
- Description
- Specifies whether the first line in the input file is handled as a header.
Ignored if the input is not a CSV file or record processing is not enabled.
- Default Value
- false
- Accepted Values
- true, false
- Required
- true
CSV Trim Fields
- Description
- Specifies whether whitespace characters are removed from the beginning and the end of
fields.
Ignored if the input is not a CSV file or record processing is not enabled.
- Default Value
- true
- Accepted Values
- true, false
- Required
- true
CSV Value Separator
- Description
- The value separator used in the input CSV files.
Ignored if the input is not a CSV file or record processing is not enabled.
- Default Value
- ,
- Accepted Values
- Required
- true
Completion Strategy
- Description
- Specifies what to do with the original file on the server once it has been fetched.
If the Completion Strategy fails, a warning is logged but the data is still transferred.
- Default Value
- None
- Accepted Values
- None, Move File, Delete File
- Required
- true
Date Format
- Description
- Specifies the format used for parsing date fields in the input data.
This property is only used if Input Data Format is set to CSV or JSON.
- Default Value
- yyyy-MM-dd
- Accepted Values
- Required
- true
Enable Record Processing
- Description
- Enables or disables record processing.
If set to true, the Input Data Format is considered and the file gets parsed into records. In this case the Records Per Kafka Message property defines how many records are written into one Kafka message.
If set to false, the entire file gets forwarded to Kafka as one message.
- Default Value
- true
- Accepted Values
- true, false
- Required
- true
File Filter Regex
- Description
- The Java regular expression to use for filtering filenames. Only files whose names match the regular expression are fetched.
- Default Value
- .*
- Accepted Values
- Required
- true
Follow Symlink
- Description
- If set to true, both symbolic files and nested symbolic subdirectories are pulled. Otherwise, symbolic files are not read and symbolic link subdirectories are not traversed.
- Default Value
- false
- Accepted Values
- true, false
- Required
- true
Grok Expression
- Description
- Specifies the format of a line in Grok format. This allows the connector to understand how to parse each line in the input file. If a line in the file does not match this pattern, the line is handled according to what is set in the Grok No Match Behavior property.
A valid Grok expression must be specified using this property even if Grok format is not used.
- Default Value
- %{GREEDYDATA:message}
- Accepted Values
- Required
- true
Grok No Match Behavior
- Description
- Specifies how to handle lines that do not match the pattern set in the Grok Expression property.
If set to append-to-previous-message, non-matching lines are appended to the last field of the previous message.
If set to skip-line, non-matching lines are skipped.
If set to raw-line, non-matching lines are only added to the _raw field.
- Default Value
- append-to-previous-message
- Accepted Values
- append-to-previous-message, skip-line, raw-line
- Required
- true
Host Key File
- Description
- The fully-qualified filename of the host key file.
If supplied, this file is used as the host key.
If a host key is not supplied, but Strict Host Key Checking is set to true, the known_hosts and known_hosts2 files from the ~/.ssh directory are used.
If a host key is not supplied and Strict Host Key Checking is set to false, no host key file is used.
This parameter must either contain the fully-qualified name of a file, or be completely removed from the configuration JSON.
- Default Value
- Accepted Values
- Required
- false
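The interaction between Host Key File and Strict Host Key Checking can be written out as a short decision function. The Python sketch below only restates the rules in the description. The function name `host_key_sources` is hypothetical, and the sketch does not open or validate any files.

```python
def host_key_sources(host_key_file=None, strict_host_key_checking=False):
    """Return the host key files that would be consulted, per the documented rules."""
    if host_key_file:
        # An explicitly supplied file is always used as the host key.
        return [host_key_file]
    if strict_host_key_checking:
        # No file supplied, strict checking on: fall back to the ~/.ssh files.
        return ["~/.ssh/known_hosts", "~/.ssh/known_hosts2"]
    # No file supplied, strict checking off: no host key file is used.
    return []
```

With the documented default of `false` for Strict Host Key Checking and no Host Key File configured, the function returns an empty list, meaning that no host key file is used.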
Hostname
- Description
- The hostname or IP address of the remote system.
- Default Value
- localhost
- Accepted Values
- Required
- true
Ignored Dotted Files
- Description
- Specifies whether to ignore files whose names begin with a dot (".").
- Default Value
- true
- Accepted Values
- true, false
- Required
- true
Input Data Format
- Description
- The format in which the input file contains record-oriented data.
If Enable Record Processing is set to false, this setting is ignored.
- Default Value
- JSON
- Accepted Values
- JSON, CSV, GROK
- Required
- true
Kerberos Keytab for Schema Registry
- Description
- The fully-qualified filename of the Kerberos keytab associated with the principal for accessing Schema Registry.
- Default Value
- The location of the default keytab which is empty and can only be used for unsecure connections.
- Accepted Values
- Required
- true
Kerberos Principal for Schema Registry
- Description
- The Kerberos principal used for authenticating to Schema Registry.
- Default Value
- default
- Accepted Values
- Required
- true
Move Destination Directory
- Description
- The fully-qualified name of the directory on the remote server to move the original file to once it is ingested. This property is ignored unless the Completion Strategy property is set to Move File. The specified directory must already exist on the remote system.
This parameter must either contain the fully-qualified name of a directory, or be completely removed from the configuration JSON.
- Default Value
- Accepted Values
- Required
- false
Password
- Description
- The password to use when connecting to the SFTP server.
If the server does not require a password, this property must be completely removed from the configuration JSON.
- Default Value
- Accepted Values
- Required
- false
Path Filter Regex
- Description
- The Java regular expression to use for filtering paths.
If Search Recursively is set to true, only subdirectories whose path matches the given regular expression are scanned.
If Search Recursively is set to false, this property is ignored.
- Default Value
- .*
- Accepted Values
- Required
- true
Port
- Description
- The port that the remote system is listening on for file transfers.
- Default Value
- 22
- Accepted Values
- Required
- true
Private Key File
- Description
- The fully-qualified filename of a private key file.
If no private key is used, this property must be completely removed from the configuration JSON.
- Default Value
- Accepted Values
- Required
- false
Private Key Password
- Description
- The password used to access the private key.
If no private key is used, this property must be completely removed from the configuration JSON.
- Default Value
- Accepted Values
- Required
- false
Record Per Kafka Message
- Description
- Specifies how many records to write into each Kafka message.
If Enable Record Processing is set to false, this setting is ignored.
- Default Value
- 1
- Accepted Values
- Required
- true
Remote Path
- Description
- The path on the remote system from which to pull files.
- Default Value
- .
- Accepted Values
- Required
- true
Schema Access Strategy
- Description
- Specifies the strategy used for determining the schema of the input records if the Enable Record Processing property is set to true.
The value you set here depends on the input data format.
If set to Schema Registry, the schema is read from Schema Registry. This setting works with all input data formats.
If set to Infer Schema, the schema is inferred based on the input file. This setting can only be used if your input data format is either JSON or CSV.
If set to Field Names From Grok Expression, the schema is determined using the field names in the Grok Expression property. This setting can only be used if your input data format is GROK.
- Default Value
- Schema Registry
- Accepted Values
- Schema Registry, Infer Schema, Field Names From Grok Expression
- Required
- true
Schema Branch
- Description
- The name of the branch to use when looking up the schema in Schema Registry.
Schema Branch and Schema Version cannot be specified at the same time. If one is specified, the other needs to be removed from the configuration. If Schema Registry is not used, this property must be completely removed from the configuration.
- Default Value
- Accepted Values
- Required
- false
Schema Name
- Description
- The schema name to look up in Schema Registry.
If the Schema Access Strategy property is set to Schema Registry, this property must contain a valid schema name.
If Schema Registry is not used, this property must be completely removed from the configuration JSON.
- Default Value
- Accepted Values
- Required
- false
Schema Registry URL
- Description
- The URL of the Schema Registry server.
If Schema Registry is not used, use the default value.
- Default Value
- http://localhost:7788/api/v1
- Accepted Values
- Required
- true
Schema Version
- Description
- The version of the schema to look up in Schema Registry. If Schema Registry is used and a schema version is not specified, the latest version of the schema is retrieved.
Schema Branch and Schema Version cannot be specified at the same time. If one is specified, the other needs to be removed from the configuration. If Schema Registry is not used, this property must be completely removed from the configuration.
- Default Value
- Accepted Values
- Required
- true
Search Recursively
- Description
- Specifies whether to pull files from arbitrarily nested subdirectories. Subdirectories are not traversed if set to false.
- Default Value
- false
- Accepted Values
- true, false
- Required
- true
Strict Host Key Checking
- Description
- Specifies whether strict enforcement of host keys is applied.
- Default Value
- false
- Accepted Values
- true, false
- Required
- true
Time Format
- Description
- Specifies the format used for parsing time fields in the input data. This property is only used if Input Data Format is set to CSV or JSON.
- Default Value
- HH:mm:ss
- Accepted Values
- Required
- true
Timestamp Format
- Description
- Specifies the format used for parsing timestamp fields in the input data. This property is only used if Input Data Format is set to CSV or JSON.
- Default Value
- yyyy-MM-dd HH:mm:ss.SSS
- Accepted Values
- Required
- true
Truststore Filename for Schema Registry
- Description
- The fully-qualified filename of a truststore. This truststore is used to establish a secure connection with Schema Registry using TLS.
- Default Value
- The location of the default truststore which is empty and can only be used for unsecure connections.
- Accepted Values
- Required
- true
Truststore Password for Schema Registry
- Description
- The password used to access the contents of the truststore configured in the Truststore Filename for Schema Registry property.
- Default Value
- password
- Accepted Values
- Required
- true
Truststore Type for Schema Registry
- Description
- The type of the truststore configured in the Truststore Filename for Schema Registry property.
- Default Value
- Accepted Values
- BCFKS, PKCS12, JKS
- Required
- true
Username
- Description
- The username for connecting to the SFTP server.
- Default Value
- Accepted Values
- Required
- true