MQTT Source connector

The MQTT Source connector is a Stateless NiFi dataflow developed by Cloudera that runs in the Kafka Connect framework. Learn about the connector, its properties, and configuration.

The MQTT Source connector consumes messages from an MQTT broker and transfers them to Kafka. The Kafka topic this connector transfers messages to is determined by the value of the topics property in the configuration. This connector does not perform record processing. The messages are transferred to Kafka as they are consumed from the MQTT broker. TLS can be used to establish a secure connection between the connector and the MQTT broker. The keystore and truststore files necessary for securing the connection must be present on the cluster node that the connector runs on.

Properties and configuration

Configuration is passed to the connector in a JSON file during creation. The properties of the connector can be categorized into three groups. These are as follows:

Common connector properties
These are the properties of the Kafka Connect framework that are accepted by all connectors. For a comprehensive list of these properties, see the Apache Kafka documentation.
Stateless NiFi Source properties
These are the properties that are specific to the Stateless NiFi Source connector. All Stateless NiFi Source connectors share and accept these properties. For a comprehensive list of these properties, see the Stateless NiFi Source property reference.
Connector/dataflow-specific properties
These properties are unique to this specific connector or, more precisely, to the dataflow running within the connector. These properties use the following prefix:
parameter.[***CONNECTOR NAME***] Parameters:
For a comprehensive list of these properties, see the MQTT Source properties reference.
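
As an illustration, the following fragment contains one property from each of the three groups. The property names and values are taken from the configuration example in this document, and MQTT Source Parameters is the parameter context name used in that example. This is a partial sketch, not a complete configuration.

```json
{
 "tasks.max": "1",
 "nexus.url": "https://repository.cloudera.com/artifactory/repo",
 "parameter.MQTT Source Parameters:MQTT Topics": "[***MQTT TOPIC NAME***]"
}
```

Here, tasks.max is a Kafka Connect framework property, nexus.url is a Stateless NiFi Source property, and the prefixed MQTT Topics entry is a dataflow-specific property.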

Notes and limitations

  • Required properties must be assigned a valid value even if they are not used in the particular configuration. If a required property is not used, either leave its default value, or completely remove the property from the configuration JSON.
  • If a property that has a default value is completely removed from the configuration JSON, the system uses the default value.
  • Properties not marked as required must be completely removed from the configuration JSON if not set.
  • If the keystore-related properties are removed and an empty truststore is provided, the connector does not use TLS for connecting to the MQTT broker. TLS is used if a truststore is provided that has a key in it.
  • When the connector is deployed in a cluster environment, the system might move it from one cluster node to another. During this process, the connector is stopped, deployed to the target node, and started there. The time between the connector stopping and starting again might be several minutes. To avoid losing the messages that arrive at the MQTT broker during this time, set the following properties:
    • Set MQTT Clean Session to false.
    • Specify the MQTT Client ID.
    • Set MQTT Quality of Service to 1 or 2.
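
The three settings in the last note could be expressed in the configuration JSON roughly as follows. This is a partial sketch that assumes the MQTT Source Parameters prefix used in the configuration example. The client ID is a placeholder that you must replace with your own value, and the Quality of Service can be 1 or 2.

```json
{
 "parameter.MQTT Source Parameters:MQTT Clean Session": "false",
 "parameter.MQTT Source Parameters:MQTT Client ID": "[***CLIENT ID***]",
 "parameter.MQTT Source Parameters:MQTT Quality of Service": "1"
}
```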

Configuration example

In this example, the connector fetches messages from an MQTT broker and transfers them to a Kafka topic.
{
 "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSourceConnector",
 "meta.smm.predefined.flow.name": "MQTT Source",
 "meta.smm.predefined.flow.version": "1.0.0",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
 "tasks.max": "1",
 "nexus.url": "https://repository.cloudera.com/artifactory/repo",
 "extensions.directory": "/tmp/nifi-stateless-extensions",
 "working.directory": "/tmp/nifi-stateless-working",
 "topics": "[***KAFKA TOPIC NAME***]",
 "parameter.MQTT Source Parameters:MQTT Broker URI": "tcp://[***HOST***]:[***PORT***]",
 "parameter.MQTT Source Parameters:MQTT Quality of Service": "0",
 "parameter.MQTT Source Parameters:MQTT Topics": "[***MQTT TOPIC NAME***]"
}
The following list collects the properties from the configuration example that must be customized for this use case:
topics
The name of the Kafka topic that the connector sends messages to.
MQTT Broker URI
The URI of the MQTT broker. In this example, the connection is not secure. As a result, the URI starts with tcp://. The port number typically used in a case like this is 1883. If TLS is used, the URI must start with ssl://, and the port number typically used is 8883.
MQTT Topics
The comma-separated list of MQTT topics that the connector fetches messages from.
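
For comparison, a TLS-secured variant of the connection settings might look like the following fragment. This is a sketch under assumptions: the truststore path and password are placeholders, the truststore type is set to JKS only for illustration, and the keystore-related properties are left out on the assumption that the broker does not require client certificate authentication.

```json
{
 "parameter.MQTT Source Parameters:MQTT Broker URI": "ssl://[***HOST***]:8883",
 "parameter.MQTT Source Parameters:Truststore Filename": "[***PATH TO TRUSTSTORE***]",
 "parameter.MQTT Source Parameters:Truststore Password": "[***TRUSTSTORE PASSWORD***]",
 "parameter.MQTT Source Parameters:Truststore Type": "JKS"
}
```

The truststore file must be present on the cluster node that the connector runs on.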

Stateless NiFi Source properties reference

Review the following reference for a comprehensive list of the connector properties that are specific to the Stateless NiFi Source connector.

In addition to the properties listed here, Stateless NiFi connectors also accept the properties of the Kafka Connect framework. For a comprehensive list of these properties, see the Apache Kafka documentation.

dataflow.timeout

Description
Specifies the maximum amount of time to wait for the dataflow to complete. If the dataflow does not complete before this timeout, the thread is interrupted and the dataflow is considered a failure. The session is rolled back and the connector retriggers the flow. Defaults to 60 seconds if not specified.
Default Value
60 seconds
Accepted Values
Required
false

extensions.directory

Description
Specifies the directory that stores downloaded extensions. Extensions are the NAR (NiFi Archive) files containing the processors and controller services a flow might use. Since Stateless NiFi is only the NiFi engine, it does not contain any of the processors and controller services you might use in your flow. When deploying the connector with the custom flow, the system needs to download the specific extensions that your flow uses from Nexus (unless they are already present in this directory). These extensions are stored in this directory. Because the default directory might not be writable, and to aid in upgrade scenarios, Cloudera recommends that you always specify an extensions directory.
Default Value
/tmp/nifi-stateless-extensions
Accepted Values
Required
true

flow.snapshot

Description
Specifies the dataflow to run. When using Streams Messaging Manager to deploy a connector, the value you set in this property must be a JSON object. URLs, file paths, or escaped JSON strings are not supported when using Streams Messaging Manager. Alternatively, if using the Kafka Connect REST API to deploy a connector, this can be a file containing the dataflow, a URL that points to a dataflow, or a string containing the entire dataflow as escaped JSON. However, Cloudera does not recommend using the Kafka Connect REST API to interact with this connector or Kafka Connect.
Default Value
Accepted Values
Required
true

header.attribute.regex

Description
A Java regular expression that is evaluated against all flowfile attribute names. Any attribute name matching the regular expression is converted into a Kafka message header. The name of the attribute is used as the header key, the value of the attribute is used as the header value. If not specified, headers are not added to the Kafka record.
Default Value
Accepted Values
Required
false
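
As a sketch, the following entry would turn every flowfile attribute whose name starts with mqtt. into a Kafka message header. The attribute names are an assumption about what the dataflow sets, so adjust the expression to the attributes present in your flow. Note that the backslash of the Java regular expression must be doubled inside a JSON string.

```json
{
 "header.attribute.regex": "mqtt\\..*"
}
```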

header.name.regex

Description

A Java regular expression that is evaluated against all flowfile attribute names. For any attribute whose name matches the regular expression, the Kafka record gets a header whose name matches the attribute name and whose value matches the attribute value. If not specified, no headers are added to the Kafka record.

Default Value
Accepted Values
Required
false

key.attribute

Description
Specifies the name of a flowfile attribute that should be used to specify the key of the Kafka record. If not specified, the Kafka record will not have a key associated with it. If specified, but the attribute does not exist on a particular flowfile, it will also have no key associated with it.
Default Value
Accepted Values
Required
false

krb5.file

Description
Specifies the krb5.conf file to use if the dataflow interacts with any services that are secured using Kerberos. Defaults to /etc/krb5.conf if not specified.
Default Value
/etc/krb5.conf
Accepted Values
Required
false

name

Description
The name of the connector. On the Streams Messaging Manager UI, the connector names are specified using the Enter Name field. The name that you enter in the Enter Name field is automatically set as the value of the name property when the connector is deployed. Because of this, the name property is omitted from the configuration template provided in Streams Messaging Manager. If you manually add the name property to the configuration in Streams Messaging Manager, ensure that the value you set matches the connector name specified in the Enter Name field. Otherwise, the connector fails to deploy.
Default Value
Accepted Values
Required
true

nexus.url

Description
Specifies the base URL of the Nexus instance to source extensions from. If configuring a Nexus instance that has multiple repositories, include the name of the repository in the URL. For example, https://nexus-private.myorganization.org/nexus/repository/my-repository/. If the property is not specified, the necessary extensions (the ones used by the flow) must be provided in the extensions directory before deploying the connector.
Default Value
Accepted Values
Required
true

output.port

Description
The name of the output port in the NiFi dataflow to pull data from. If the dataflow contains exactly one port, this property is optional and can be omitted. However, if the dataflow contains multiple ports (for example, a success and a failure port), this property must be specified. If any flowfile is sent to a port other than the specified port, it is considered a failure. The session is rolled back and no data is collected.
Default Value
Accepted Values
Required
false
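
For a dataflow with multiple output ports, the entry might look like the following. The port name success is taken from the example in the description and is only illustrative. Use the name of the port defined in your own dataflow.

```json
{
 "output.port": "success"
}
```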

parameter.[***FLOW PARAMETER NAME***]

Description
Specifies a parameter to use in the dataflow. For example, assume that you have the following entry in your connector configuration: "parameter.Directory": "/mydir". In a case like this, any parameter context in the dataflow that has a parameter named Directory gets the specified value (/mydir). If the dataflow has child process groups, and those child process groups have their own parameter contexts, the value is used for all parameter contexts that contain a parameter named Directory. Parameters can also be applied to specific parameter contexts only. This can be done by prefixing the parameter name (Directory) with the name of the parameter context followed by a colon. For example, parameter.My Context:Directory only applies the specified value for the Directory parameter in the parameter context named My Context.
Default Value
Accepted Values
Required
false
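
The two forms described above can be sketched side by side as follows. The first entry applies to every parameter context that has a Directory parameter. The second entry applies only to the parameter context named My Context. The Log Directory parameter and its value are hypothetical and are used only to show the scoped form.

```json
{
 "parameter.Directory": "/mydir",
 "parameter.My Context:Log Directory": "/mylogs"
}
```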

topic.name.attribute

Description
Specifies the name of a flowfile attribute to use for determining which Kafka topic a flowfile is sent to. Either the topics or topic.name.attribute property must be specified. If both are specified, topic.name.attribute takes precedence. However, if a flowfile does not have the specified attribute name, then the connector falls back to using the topics property.
Default Value
Accepted Values
Required
false

topics

Description
The name of the topic to deliver data to. All flowfiles are delivered to the topic specified here. However, it is also possible to determine the topic individually for each flowfile. To do this, ensure that the dataflow specifies the topic name in an attribute, and then use topic.name.attribute to specify the name of the attribute instead of the topic name. For example, if you want a separate Kafka topic for each data source, you can omit the topics property and instead specify the attribute (for example, datasource.hostname) corresponding to the topic using the topic.name.attribute property.
Default Value
Accepted Values
Required
true
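
A configuration that combines both properties could look like the following sketch. The attribute name datasource.hostname is the illustrative name used in the description and assumes that the dataflow sets such an attribute. Flowfiles that carry the attribute are sent to the topic it names, and flowfiles without it fall back to the topic set in topics.

```json
{
 "topics": "[***KAFKA TOPIC NAME***]",
 "topic.name.attribute": "datasource.hostname"
}
```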

working.directory

Description
Specifies a directory on the Connect server that NiFi should use for unpacking extensions that it needs to perform the dataflow. The contents of extensions.directory are unpacked here. Defaults to /tmp/nifi-stateless-working if not specified.
Default Value
/tmp/nifi-stateless-working
Accepted Values
Required
false

MQTT Source properties reference

Review the following reference for a comprehensive list of the connector properties that are specific to the MQTT Source connector.

The properties listed in this reference must be added to the connector configuration with the following prefix:
parameter.[***CONNECTOR NAME***] Parameters:

In addition to the properties listed here, this connector also accepts certain properties of the Kafka Connect framework as well as the properties of the Stateless NiFi Source connector. When creating a new connector using the Streams Messaging Manager UI, all valid properties are presented in the default configuration template. You can view the configuration template to get a full list of valid properties. In addition, for more information regarding the accepted properties not listed here, you can review the Apache Kafka documentation and the Stateless NiFi Source property reference.

Keystore Filename

Description
The fully-qualified filename of a keystore. This keystore is used for mutual TLS towards the MQTT server.

If the MQTT broker does not require client certificate authentication, this property must be completely removed from the configuration JSON.

Default Value
Accepted Values
Required
false

Keystore Key Password

Description
The password used to access the key stored in the keystore file configured in the Keystore Filename property.

If the MQTT broker does not require client certificate authentication, this property must be completely removed from the configuration JSON.

Default Value
Accepted Values
Required
false

Keystore Password

Description
The password used to access the contents of the keystore configured in the Keystore Filename property.

If the MQTT broker does not require client certificate authentication, this property must be completely removed from the configuration JSON.

Default Value
Accepted Values
Required
false

Keystore Type

Description
The type of the keystore configured in the Keystore Filename property.

If the MQTT broker does not require client certificate authentication, this property must be completely removed from the configuration JSON.

Default Value
Accepted Values
BCFKS, PKCS12, JKS
Required
false
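
If the MQTT broker requires client certificate authentication, the four keystore properties might be set together as in the following sketch. The fragment assumes the MQTT Source Parameters prefix used in the configuration example. All values are placeholders, and PKCS12 is chosen only for illustration from the accepted keystore types.

```json
{
 "parameter.MQTT Source Parameters:Keystore Filename": "[***PATH TO KEYSTORE***]",
 "parameter.MQTT Source Parameters:Keystore Password": "[***KEYSTORE PASSWORD***]",
 "parameter.MQTT Source Parameters:Keystore Key Password": "[***KEY PASSWORD***]",
 "parameter.MQTT Source Parameters:Keystore Type": "PKCS12"
}
```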

MQTT Broker URI

Description
The URI to use to connect to the MQTT broker.

Example URI if SSL is not used (keystore-related parameters are removed and the default truststore is used): tcp://localhost:1883

Example URI if SSL is used: ssl://localhost:8883

Default Value
tcp://localhost:1883
Accepted Values
Required
true

MQTT Password

Description
Password to use when connecting to the MQTT broker.

If username-password authentication is not required by the MQTT broker, this property must be completely removed from the configuration JSON.

Default Value
Accepted Values
Required
false

MQTT Quality of Service

Description
The Quality of Service (QoS) to receive the message with.

0 - at most once

1 - at least once

2 - exactly once

Default Value
0
Accepted Values
0, 1, or 2
Required
true

MQTT Topics

Description
Specifies the MQTT topic to subscribe to. Use an MQTT wildcard to subscribe to multiple topics simultaneously.
Default Value
Accepted Values
Required
true

MQTT Username

Description
Username to use when connecting to the MQTT broker.

If username-password authentication is not required by the MQTT broker, this property must be completely removed from the configuration JSON.

Default Value
Accepted Values
Required
false
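
When the MQTT broker requires username-password authentication, the MQTT Username and MQTT Password properties could be set as in the following sketch. The fragment assumes the MQTT Source Parameters prefix used in the configuration example, and both values are placeholders.

```json
{
 "parameter.MQTT Source Parameters:MQTT Username": "[***USERNAME***]",
 "parameter.MQTT Source Parameters:MQTT Password": "[***PASSWORD***]"
}
```

If the broker does not require this type of authentication, remove both entries from the configuration JSON.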

Truststore Filename

Description
The fully-qualified filename of a truststore. This truststore is used to establish a secure connection with the MQTT server using TLS.
Default Value

The location of the default truststore, which is empty and can only be used for unsecure connections.

Accepted Values
Required
true

Truststore Password

Description
The password used to access the contents of the truststore configured in the Truststore Filename property.
Default Value
password
Accepted Values
Required
true

Truststore Type

Description
The type of the truststore configured in the Truststore Filename property.
Default Value
JKS
Accepted Values
BCFKS, PKCS12, JKS
Required
true

MQTT Client ID

Description
The MQTT client ID to use.

If it is not set, a UUID is generated. In such a case, this property must be removed from the configuration.

Default Value
Accepted Values
Required
false

MQTT Clean Session

Description
Specifies whether to start a new session.

If it is set to true, the client and server discard any previous sessions and start a new one. The new session lasts as long as the network connection. State data associated with a session is not reused in any subsequent sessions.

If it is set to false, the server resumes communications with the client based on the state from the current session (identified by the Client ID). The client and server store the session after being disconnected. When a session that is not a clean session is disconnected, the server stores further QoS 1 and QoS 2 messages that match any subscriptions that the client has at the time of disconnection. These messages are stored as part of the session state.

Default Value
true
Accepted Values
true, false
Required
false