
Building Real-Time Synchronization with Databend Kafka Connect on Confluent Cloud

Jeremy · Aug 12, 2024

Confluent Cloud is an enterprise-grade event streaming platform built on Apache Kafka and offered as a cloud service by Confluent. It allows users to easily build and manage distributed stream processing applications. Founded by the original creators of Apache Kafka, Confluent focuses on offering products and services centered around Kafka technology.

The key advantages of Confluent Cloud include simplified deployment and management, high availability, and automatic scaling capabilities. It is a cloud-hosted service, essentially the cloud version of Confluent Enterprise, enhanced with a cloud management console. This allows users to focus on implementing business logic without worrying about the underlying infrastructure maintenance and scaling issues.

Like other Confluent products, Confluent Cloud offers a suite of tools and services, such as Kafka Connect for connecting external systems, Schema Registry for managing changes in data formats, and KSQL for stream processing queries. These tools and services help users build a unified and flexible data streaming platform, enabling real-time data processing and analysis.

Additionally, Confluent Cloud offers different service tiers—Basic, Standard, and Dedicated—tailored to various enterprise needs in terms of availability, security, and other features. It supports on-demand resource creation and usage-based pricing, providing users with flexible payment options.

Databend Kafka Connect

Databend offers databend-kafka-connect, a sink connector for Apache Kafka that can be deployed on Confluent Cloud as a custom connector. It consumes data from Kafka topics in real time and writes it directly into Databend tables.

Databend Kafka Connect comes with several features, including automatic table creation, support for both Append Only and Upsert writing modes, and automatic schema evolution.

This post walks you through building real-time data synchronization with Databend Kafka Connect on Confluent Cloud.

Implementation Steps

Step 1: Add Custom Connector

Confluent offers a Connector Hub where you can find all the connectors that are pre-integrated into Confluent Cloud. For connectors that are not built-in, Confluent also supports the creation of custom connectors.

  1. Click Add plugin.


  2. Configure the new plugin.


When creating a custom connector in Confluent Cloud, you'll need to fill in the connector's name and description and specify its entry class. For Databend Kafka Connect, the entry class is:

com.databend.kafka.connect.DatabendSinkConnector

You can either clone the source code and compile it yourself:

git clone https://github.com/databendcloud/databend-kafka-connect.git

or directly download the pre-packaged .jar file from the release page. Once you have the .jar file, upload it to Confluent Cloud.
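If you build from source, the packaging step looks roughly like the following. This is only a sketch that assumes the project uses a standard Maven build, so check the repository's README for the exact command:

cd databend-kafka-connect
# Assumes a Maven build; the connector .jar should end up under target/
mvn clean package -DskipTests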


Step 2: Create Topic

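You can create the topic from the Confluent Cloud console, or with the Confluent CLI; a minimal example, using the topic name referenced later in this post:

confluent kafka topic create topic_avrob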

To define the data structure for the newly created topic, you'll need to establish a schema. In this case, we're specifying that the data format in the Kafka topic will be AVRO. The schema is defined as follows:

{
  "doc": "Sample schema to help you get started.",
  "fields": [
    {
      "doc": "The int type is a 32-bit signed integer.",
      "name": "id",
      "type": "int"
    },
    {
      "doc": "The string is a unicode character sequence.",
      "name": "name",
      "type": "string"
    },
    {
      "doc": "The int type is a 32-bit signed integer.",
      "name": "age",
      "type": "int"
    }
  ],
  "name": "sampleRecord",
  "type": "record"
}


Step 3: Add Connector for New Topic

Add a Sink connector for the newly created topic by selecting our custom Databend connector and configuring the API key and secret.
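The connector needs a Kafka API key and secret so it can read from the topic. If you don't already have one, you can create it in the console or with the Confluent CLI; a minimal example, where lkc-xxxxxx is a placeholder for your cluster ID:

confluent api-key create --resource lkc-xxxxxx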


Step 4: Add Configuration File for Databend Connector

The configuration file specifies connection parameters for the target endpoint, including the database name, table name, Kafka-related information, and connector converters.

  • Key Converter and Value Converter: these handle the serialization of message keys and values, translating between Kafka Connect's internal data format and the format used on the Kafka topic. Because converters are independent of connectors, any connector can be used with any serialization format.

  • Kafka Connect ships with the JsonConverter by default. Some converters have additional configuration parameters; for example, key.converter.schemas.enable can be set to true or false to indicate whether JSON messages embed schema information.


{
  "auto.create": "true",
  "auto.evolve": "true",
  "batch.size": "1",
  "confluent.custom.schema.registry.auto": "true",
  "connection.attempts": "5",
  "connection.backoff.ms": "10000",
  "connection.database": "testsync",
  "connection.password": "password",
  "connection.url": "jdbc:databend://tn3ftqihs--medium-p8at.gw.aws-us-east-2.default.databend.com:443?ssl=true",
  "connection.user": "cloudapp",
  "errors.tolerance": "all",
  "insert.mode": "upsert",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "max.retries": "10",
  "pk.fields": "id",
  "pk.mode": "record_value",
  "table.name.format": "testsync.${topic}",
  "topics": "topic_avrob",
  "value.converter": "io.confluent.connect.avro.AvroConverter"
}
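A few of these settings are worth calling out: insert.mode is upsert with pk.mode set to record_value and pk.fields set to id, so writes are applied as insert-or-update keyed on the id field of the message value, and table.name.format of testsync.${topic} resolves to testsync.topic_avrob for our topic. With auto.create enabled, the connector creates the target table from the Avro schema; as a rough sketch only (the exact column types depend on the connector's type mapping), the resulting table would look something like:

-- Rough illustration of the auto-created table; the actual DDL may differ
CREATE TABLE testsync.topic_avrob (
  id INT NOT NULL,
  name VARCHAR,
  age INT
);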

Step 5: Configure Network Whitelisting

Add the target endpoint's host to the connection endpoint configuration in Confluent Cloud. Confluent will then establish a private link between Kafka and the target endpoint to ensure smooth and secure network communication.
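For the configuration above, the endpoint to allow is the host and port taken from connection.url:

tn3ftqihs--medium-p8at.gw.aws-us-east-2.default.databend.com:443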


Step 6: Review Configuration and Start Kafka Connector


Step 7: Send Sample Data to Topic


After confirming that the connector is in a running state, use the Confluent CLI to send data to the topic:

confluent kafka topic produce topic_avrob --value-format avro --schema schema.json

schema.json is the schema file you defined for the topic.
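The produce command reads records from standard input as JSON documents that match the registered Avro schema. For the sample schema above, a record could look like this (the values are arbitrary examples):

{"id": 1, "name": "Alice", "age": 30}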

Through the logs in Confluent Cloud, you can verify that Kafka Connect has received messages from the topic and has begun writing them to Databend Cloud.


At the same time, in Databend Cloud, you can observe that the testsync.topic_avrob table has been automatically created and data has been written to it.
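You can also verify the result yourself with a simple query against the auto-created table:

SELECT * FROM testsync.topic_avrob;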

