Building Real-Time Synchronization with Databend Kafka Connect on Confluent Cloud
Jeremy · Aug 12, 2024
Confluent Cloud is a cloud service provided by Confluent, built on Apache Kafka as an enterprise-grade event streaming platform. It allows users to easily build and manage distributed stream processing applications. Founded by the original creators of Apache Kafka, Confluent focuses on offering products and services centered around Kafka technology.
The key advantages of Confluent Cloud include simplified deployment and management, high availability, and automatic scaling capabilities. It is a cloud-hosted service, essentially the cloud version of Confluent Enterprise, enhanced with a cloud management console. This allows users to focus on implementing business logic without worrying about the underlying infrastructure maintenance and scaling issues.
Like other Confluent products, Confluent Cloud offers a suite of tools and services, such as Kafka Connect for connecting external systems, Schema Registry for managing changes in data formats, and KSQL for stream processing queries. These tools and services help users build a unified and flexible data streaming platform, enabling real-time data processing and analysis.
Additionally, Confluent Cloud offers different service tiers—Basic, Standard, and Dedicated—tailored to various enterprise needs in terms of availability, security, and other features. It supports on-demand resource creation and usage-based pricing, providing users with flexible payment options.
Databend Kafka Connect
Databend offers Databend-Kafka-Connect, a Sink connector for Apache Kafka that integrates directly with Confluent Cloud. This allows for real-time consumption of data from Kafka topics and seamless writing into Databend tables.
Databend Kafka Connect comes with several features, including automatic table creation, support for both Append Only and Upsert writing modes, and automatic schema evolution.
This post will guide you through building a real-time data synchronization pipeline with Databend Kafka Connect on Confluent Cloud.
Implementation steps
Step 1: Add Custom Connector
Confluent offers a Connector Hub where you can find all the connectors that are pre-integrated into Confluent Cloud. For connectors that are not built-in, Confluent also supports the creation of custom connectors.
- Click Add plugin.
- Configure the new plugin.
When creating a custom connector in Confluent Cloud, you'll need to fill in the connector's name and description, and specify the entry class. For Databend Kafka Connect, the entry class is:
com.databend.kafka.connect.DatabendSinkConnector
You can clone the connector source code by running:
git clone https://github.com/databendcloud/databend-kafka-connect.git
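If you need to package the plugin archive yourself, the following is a minimal sketch; it assumes the project uses a standard Maven build, so check the repository's README for the exact build steps and output location:
cd databend-kafka-connect
# Build the connector artifact (assumes a Maven project; skip tests for a faster build)
mvn clean package -DskipTests
# Upload the packaged artifact from target/ as the plugin archive in Confluent Cloud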
Step 2: Create Topic
To define the data structure for the newly created topic, you'll need to establish a schema. In this case, we're specifying that the data format in the Kafka topic will be AVRO. The schema is defined as follows:
{
  "doc": "Sample schema to help you get started.",
  "fields": [
    {
      "doc": "The int type is a 32-bit signed integer.",
      "name": "id",
      "type": "int"
    },
    {
      "doc": "The string is a unicode character sequence.",
      "name": "name",
      "type": "string"
    },
    {
      "doc": "The int type is a 32-bit signed integer.",
      "name": "age",
      "type": "int"
    }
  ],
  "name": "sampleRecord",
  "type": "record"
}
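If you prefer the Confluent CLI to the Cloud console, the topic and schema can be created roughly like this; the partition count and subject name below are illustrative:
# Create the topic in your Confluent Cloud cluster
confluent kafka topic create topic_avrob --partitions 6
# Register the Avro schema for the topic's value subject
confluent schema-registry schema create --subject topic_avrob-value --schema schema.json --type avro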
Step 3: Add Connector for New Topic
Add a Sink connector for the newly created topic by selecting our custom Databend connector and configuring the API key and secret.
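If you don't already have a Kafka API key for the cluster, you can create one from the console or with the Confluent CLI; the cluster ID below is a placeholder:
confluent api-key create --resource lkc-xxxxxx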
Step 4: Add Configuration File for Databend Connector
The configuration file specifies connection parameters for the target endpoint, including the database name, table name, Kafka-related information, and connector converters.
- Key Converter and Value Converter: These converters handle the format of message keys and values, respectively. They manage the conversion between Kafka Connect's internal format and the serialization format used when writing to Kafka, and thus control the format of keys and values in messages written to or read from Kafka. Since converters are independent of connectors, any connector can work with any serialization format.
- Kafka Connect provides the JsonConverter by default. Some converters have their own configuration parameters; for example, key.converter.schemas.enable can be set to true or false to indicate whether JSON messages include schema information.
{
  "auto.create": "true",
  "auto.evolve": "true",
  "batch.size": "1",
  "confluent.custom.schema.registry.auto": "true",
  "connection.attempts": "5",
  "connection.backoff.ms": "10000",
  "connection.database": "testsync",
  "connection.password": "password",
  "connection.url": "jdbc:databend://tn3ftqihs--medium-p8at.gw.aws-us-east-2.default.databend.com:443?ssl=true",
  "connection.user": "cloudapp",
  "errors.tolerance": "all",
  "insert.mode": "upsert",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "max.retries": "10",
  "pk.fields": "id",
  "pk.mode": "record_value",
  "table.name.format": "testsync.${topic}",
  "topics": "topic_avrob",
  "value.converter": "io.confluent.connect.avro.AvroConverter"
}
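A few settings are worth calling out: auto.create and auto.evolve let the connector create the target table and evolve its schema automatically; insert.mode set to upsert together with pk.mode record_value and pk.fields id means records are merged on the id column; and table.name.format of testsync.${topic} is why data from topic_avrob lands in the testsync.topic_avrob table.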
Step 5: Configure Network Whitelisting
Add the target endpoint's host to the connection endpoint configuration in Confluent Cloud. Confluent will then establish a private link between Kafka and the target endpoint to ensure smooth and secure network communication.
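With the configuration above, for example, the host to allow is tn3ftqihs--medium-p8at.gw.aws-us-east-2.default.databend.com on port 443, taken from connection.url.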
Step 6: Review Configuration and Start Kafka Connector
Step 7: Send Sample Data to Topic
After confirming that the Kafka connector is in a running state, use the Confluent CLI to send data to the topic:
confluent kafka topic produce topic_avrob --value-format avro --schema schema.json
schema.json is the schema file you defined for the topic.
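The CLI then reads records from standard input as JSON objects matching the schema; the values below are purely illustrative:
{"id": 1, "name": "Alice", "age": 30}
{"id": 2, "name": "Bob", "age": 25}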
Through the logs in Confluent Cloud, you can verify that Kafka Connect has received messages from the topic and has begun writing them to Databend Cloud.
At the same time, in Databend Cloud, you can observe that the testsync.topic_avrob table has been automatically created and data has been written to it.
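You can confirm this with a quick query in Databend Cloud, for example:
SELECT * FROM testsync.topic_avrob;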