Using the Kafka Connector Extension

Introduction

The extension connects RapidMiner to a Kafka server so that you can read messages from Kafka topics and write data to them.

Install the Kafka Connector extension

To install the extension, go to the Extensions menu, open the RapidMiner Marketplace (Updates and Extensions), and search for Kafka Connector. For more detail, see Adding extensions.

Connect to a Kafka server

The Kafka Connector uses RapidMiner's connection framework. To connect to a Kafka server, create a new Kafka Connection object in the repository. This lets you manage connections centrally and reuse them across operators. The extension supports the following security options:

  • none
  • SASL Plain
  • SASL SCRAM
  • SSL two-way

In addition, SASL Plain and SASL SCRAM can be configured to use an encrypted SSL channel, either with public certificates or with locally provided SSL keystore and truststore files.
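For orientation, these options correspond roughly to the standard Kafka client properties `security.protocol` and `sasl.mechanism`. The following Python sketch illustrates that correspondence. The function name `client_security_settings` and the option labels are hypothetical, and how the extension translates its connection settings internally is an assumption.

```python
def client_security_settings(option, encrypted=False, scram_version="SCRAM-SHA-256"):
    """Return standard Kafka client properties for a security option.

    `option` uses hypothetical labels that mirror the list above:
    "none", "sasl_plain", "sasl_scram", "ssl_two_way".
    """
    if option == "none":
        return {"security.protocol": "PLAINTEXT"}
    if option == "ssl_two_way":
        return {"security.protocol": "SSL"}
    if option in ("sasl_plain", "sasl_scram"):
        return {
            # SASL over an SSL channel when encryption is enabled.
            "security.protocol": "SASL_SSL" if encrypted else "SASL_PLAINTEXT",
            "sasl.mechanism": "PLAIN" if option == "sasl_plain" else scram_version,
        }
    raise ValueError(f"unknown security option: {option}")


print(client_security_settings("sasl_scram", encrypted=True))
```

For SCRAM, Kafka clients accept `SCRAM-SHA-256` and `SCRAM-SHA-512` as mechanism names; the value has to match what the server is configured for.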

No Authentication

To connect to a Kafka server that does not provide additional security features, it is enough to provide the host address and port number at which the server can be reached.

img/Kafka_Connection_no_security.png

SASL Plain

To connect to a Kafka server that uses SASL (Simple Authentication and Security Layer) Plain for authentication, provide a username and a password.

If Encryption is set to none, the credentials are transferred unencrypted. If the server provides encryption certificates (SSL), set Encryption to "yes".

img/Kafka_Connection_SASL_plain.png

SASL SCRAM

To connect to a Kafka server that uses SASL SCRAM (Salted Challenge Response Authentication Mechanism) for authentication, provide a username, a password, and the SCRAM version that the server uses for hashing.

If Encryption is set to none, the credentials are transferred unencrypted. If the server provides encryption certificates (SSL), set Encryption to "yes".

img/Kafka_Connection_SASL_SCRAM.png

SSL Two-Way

Select this option when communication with the Kafka server is secured through an SSL channel without additional authentication.

img/Kafka_Connection_SSL_2way.png

Using Keystore Files

For self-created certificates, you must provide the files that hold the key and certificate information.

The keystore file is the repository that holds the private and public keys and certificates, and the truststore file holds the certificates to be trusted. Because this information is sensitive, each file has its own password.

img/Kafka_Connection_SSL_keystore.png
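In a plain Kafka client, the same four pieces of information are usually supplied through the standard `ssl.keystore.*` and `ssl.truststore.*` properties. The sketch below only assembles such a property map. The helper name `keystore_settings`, the file paths, and the passwords are placeholders, and it is an assumption that the connection dialog maps onto these properties one to one.

```python
def keystore_settings(keystore_path, keystore_password,
                      truststore_path, truststore_password):
    """Build the standard Kafka client SSL properties for key- and truststore files."""
    return {
        "ssl.keystore.location": keystore_path,      # private/public keys and certificates
        "ssl.keystore.password": keystore_password,
        "ssl.truststore.location": truststore_path,  # certificates to be trusted
        "ssl.truststore.password": truststore_password,
    }


# Placeholder values for illustration only.
settings = keystore_settings(
    "/path/to/client.keystore.jks", "keystore-secret",
    "/path/to/client.truststore.jks", "truststore-secret",
)
for key in sorted(settings):
    print(key)
```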

Examples

You can find the operators Read Kafka Topic and Write Kafka Topic by searching for Kafka in the operator panel.

img/Kafka_Operators.png

For both operators, you specify which Kafka server to use by connecting a stored connection object to the connection input port. No additional credentials are needed because they are all stored in the connection object. By exchanging connections, you can easily switch between servers.

Both operators can list the topics that are already available on a server. This is helpful when you are looking for a specific topic to subscribe to or want to send new messages to an existing topic.

img/Kafka_Update_Topics.png

Reading from an existing Kafka Topic

The Read Kafka Topic operator retrieves messages from a Kafka server.

There are two distinct access methods, called offset strategies:

With the parameter offset strategy set to earliest, past messages are retrieved, beginning with the earliest available message for the topic. This is useful when training a machine learning model on past data or checking for recent events. You can query for a selected number of messages or retrieve all past messages for the topic.

If the offset strategy is set to latest, the operator waits to collect new incoming messages. It collects either until a fixed number of new messages has arrived or until a certain amount of time has passed. As a fallback, a timeout parameter also applies when waiting for a fixed number of messages, which prevents the operator from waiting indefinitely. This strategy can be used to collect new events, for example to monitor them or to score them with a trained model.

In both cases, the resulting example set contains each message as an example row.
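The collection logic for the latest strategy (stop after a fixed number of messages or when the timeout expires, whichever comes first) can be sketched in Python as follows. This is an illustration of the described behavior, not the operator's implementation. `collect_messages` and `fake_poll` are hypothetical names, and `poll` stands in for whatever call fetches new messages from the topic.

```python
import time


def collect_messages(poll, max_messages=None, timeout_seconds=10.0, clock=time.monotonic):
    """Collect messages until `max_messages` have arrived or the timeout expires.

    `poll` is a hypothetical callable that returns a list of newly
    arrived messages (possibly empty). A real consumer poll would
    typically block for a short time instead of returning immediately.
    """
    deadline = clock() + timeout_seconds
    collected = []
    while clock() < deadline:
        collected.extend(poll())
        if max_messages is not None and len(collected) >= max_messages:
            return collected[:max_messages]
    # Timeout reached: return what arrived so far (possibly nothing).
    return collected


# In-memory stand-in for a topic that delivers messages in small batches.
pending = [["a"], [], ["b", "c"], ["d"]]


def fake_poll():
    return pending.pop(0) if pending else []


print(collect_messages(fake_poll, max_messages=3, timeout_seconds=1.0))  # ['a', 'b', 'c']
```

If no message arrives before the timeout, the function returns an empty list, which corresponds to an empty example set.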

Troubleshooting

Depending on the server and the connection speed, read requests can time out, in which case an empty example set is returned. Especially for remote services, increasing the timeout settings can make the results more reliable. Choose values that do not increase the process execution time too much.

Writing Data to a Kafka Topic

The Write Kafka Topic operator sends data from RapidMiner to a specified Kafka topic. The data can be sent either in one large batch, with the bulk sending option selected, or at a specified message interval.
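The difference between the two sending modes can be illustrated with a small Python sketch. The names `send_examples` and `send` are hypothetical, and treating bulk sending as "no pause between messages" is a simplifying assumption rather than a description of the operator's internals.

```python
import time


def send_examples(messages, send, interval_seconds=None, sleep=time.sleep):
    """Send all messages at once, or pause between consecutive messages.

    `send` is a hypothetical callable that delivers one message.
    With `interval_seconds=None` the messages are sent back to back (bulk);
    otherwise the function waits `interval_seconds` before each further message.
    """
    for index, message in enumerate(messages):
        if interval_seconds is not None and index > 0:
            sleep(interval_seconds)
        send(message)


outbox = []
send_examples(["m1", "m2"], outbox.append)  # bulk: no waiting
print(outbox)  # ['m1', 'm2']
```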

You can also choose between two message formats. With the message format set to JSON (JavaScript Object Notation), each example is converted into a JSON message that uses the attribute names of the example set as keys.

The message format String sends the raw data without attribute names, with the values separated by a specified separator token (the default is ";").

img/Write_Kafka_JSON.png

img/Write_Kafka_String.png
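To make the two message formats concrete, the following Python sketch converts a single example row both ways. The helper names and the sample row are hypothetical, and the exact formatting that the operator produces (for instance whitespace or number formatting) is an assumption.

```python
import json


def to_json_message(example):
    """JSON format: attribute names become the keys of the message."""
    return json.dumps(example)


def to_string_message(example, separator=";"):
    """String format: raw values only, joined by the separator token."""
    return separator.join(str(value) for value in example.values())


# One example row, represented as attribute name -> value.
example = {"id": 1, "label": "yes", "score": 0.5}

print(to_json_message(example))    # {"id": 1, "label": "yes", "score": 0.5}
print(to_string_message(example))  # 1;yes;0.5
```

Because the String format drops the attribute names, a consumer needs to know the column order in advance, whereas a JSON message describes itself.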