How to Achieve High-Performance Data Ingestion to TiDB in Apache Flink

Written by Fengbiao Liang (TiDB Cloud Solution Engineer at PingCAP)

TiDB is a distributed SQL database that supports Hybrid Transactional and Analytical Processing (HTAP) workloads. Apache Flink is a popular open-source stream-processing framework that provides high-throughput, low-latency data computing and exactly-once semantics.

If you use TiDB with Flink, you may need to ingest streaming data from Apache Flink into TiDB at high speed. With Apache Flink’s default configuration, however, data ingestion performance is limited. To achieve higher performance, you must:

  • Enable bulk insert.
  • Use highly concurrent connections.
  • Use the SQL ON DUPLICATE KEY UPDATE clause.

In this article, we share two ways you can achieve high-performance data ingestion by adjusting some parameters and code.

Test environment

Our test environment was:

  • Apache Flink: V1.13.5
  • Programming language: Scala V2.12
  • MySQL JDBC Connector: V8.0.27
  • Database: TiDB V5.3.0

Sample test cases

There are different approaches to achieving high performance in this scenario. We recommend two of them, both of which enable BULK INSERT. The sample code is written in Scala.

Solution 1: Use the Flink SQL statement

The Flink SQL statement is the easiest way to enable BULK INSERT from other data sources to TiDB.

  1. Create the SQL table:
// The URL must stay on a single line: a line break inside the quoted value would become part of the URL.
// com.mysql.cj.jdbc.Driver is the driver class name in MySQL Connector/J 8.0; com.mysql.jdbc.Driver is deprecated there.
val createTableStatement = """
| CREATE TABLE order_item (
| id BIGINT,
| productId INT,
| name STRING,
| price DECIMAL(8,2),
| cnt INT,
| PRIMARY KEY (`id`) NOT ENFORCED
| ) WITH (
| 'connector' = 'jdbc',
| 'url' = 'jdbc:mysql://<tidb_addr>:4000/?useServerPrepStmts=true&cachePrepStmts=true&rewriteBatchedStatements=true',
| 'driver' = 'com.mysql.cj.jdbc.Driver',
| 'table-name' = 't_order_item',
| 'username' = 'root',
| 'password' = '',
| 'sink.buffer-flush.max-rows' = '200',
| 'sink.buffer-flush.interval' = '3',
| 'sink.parallelism' = '200' )
|""".stripMargin

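The SQL statement above creates a table in Apache Flink that maps to the TiDB table. For better performance, note the following parameters:

  • useServerPrepStmts=true and cachePrepStmts=true: JDBC URL parameters that make the driver use server-side prepared statements and cache them on the client, so a statement is not prepared again for every execution.
  • rewriteBatchedStatements=true: a JDBC URL parameter that lets the driver rewrite a batch of inserts into a single multi-row statement. This is what enables BULK INSERT.
  • sink.buffer-flush.max-rows: the maximum number of rows the sink buffers before it flushes them to TiDB (200 here).
  • sink.buffer-flush.interval: the maximum time the sink waits before it flushes buffered rows (3 milliseconds here).
  • sink.parallelism: the number of parallel sink instances, each of which writes through its own connection (200 here).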
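As a rough illustration of how the two sink.buffer-flush options interact, the sketch below models a buffer that flushes when it holds a maximum number of rows or when an interval has elapsed, whichever comes first. FlushBuffer and its parameters are hypothetical names made up for this example. It is not Flink's implementation, and the clock is passed in explicitly to keep the behavior easy to follow.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper, not part of Flink: flushes when maxRows rows are
// buffered, or when intervalMs has passed since the last flush.
final class FlushBuffer[T](maxRows: Int, intervalMs: Long, startMs: Long)(flush: Seq[T] => Unit) {
  private val rows = ArrayBuffer.empty[T]
  private var lastFlushMs = startMs

  // Buffer one row; flush immediately once the row limit is reached.
  def add(row: T, nowMs: Long): Unit = {
    rows += row
    if (rows.size >= maxRows) doFlush(nowMs)
  }

  // Meant to be called periodically; flushes a partial batch once the
  // interval has elapsed since the last flush.
  def tick(nowMs: Long): Unit =
    if (rows.nonEmpty && nowMs - lastFlushMs >= intervalMs) doFlush(nowMs)

  private def doFlush(nowMs: Long): Unit = {
    flush(rows.toList)
    rows.clear()
    lastFlushMs = nowMs
  }
}
```

With a row limit of 200 and an interval of 3 ms, a busy stream is flushed in batches of 200 rows, while a slow stream still has its partial batches written after about 3 ms. Larger values generally trade latency for fewer, bigger statements.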

  2. Execute the insert SQL statement.

You can execute the following code to insert data into TiDB. The first statement creates the table using the SQL statement from step 1.

The second statement inserts the rows selected from a data source table named mock_orderitem. When the code runs, it generates bulk insert SQL statements. For example: insert into t (a) values (10), (11), (12) on duplicate key update a = values(a);

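// Register the Flink table that maps to the TiDB table.
tableEnv.executeSql(createTableStatement)
// Copy every row from the source table into TiDB through the JDBC sink.
tableEnv.executeSql("""INSERT INTO order_item (
| id, productId, name, price, cnt)
| SELECT id, productId, name, price, cnt
| FROM mock_orderitem""".stripMargin)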
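To make the shape of a bulk insert statement concrete, here is a small sketch that renders one multi-row statement for a batch of rows, in the same form as the example in step 2. BulkInsertSketch and multiRowUpsert are hypothetical names used only for illustration. In a real job the JDBC driver does this rewriting and the values are bound as parameters, so this code is not something you need to add to your program.

```scala
object BulkInsertSketch {
  // Hypothetical helper: render one multi-row upsert for a batch of rows.
  // Values are rendered as SQL literals only to show the statement shape;
  // real code should bind parameters through a PreparedStatement.
  def multiRowUpsert(table: String, columns: Seq[String], rows: Seq[Seq[Any]]): String = {
    require(rows.nonEmpty && rows.forall(_.size == columns.size),
      "every row needs one value per column")
    // One "(v1, v2, ...)" tuple per row, joined into a single VALUES list.
    val tuples = rows.map(_.map(literal).mkString("(", ", ", ")")).mkString(", ")
    val updates = columns.map(c => s"$c = values($c)").mkString(", ")
    s"insert into $table (${columns.mkString(", ")}) values $tuples on duplicate key update $updates"
  }

  // Quote strings (doubling embedded single quotes); print other values as-is.
  private def literal(v: Any): String = v match {
    case s: String => "'" + s.replace("'", "''") + "'"
    case other     => other.toString
  }
}
```

For example, BulkInsertSketch.multiRowUpsert("t", Seq("a"), Seq(Seq(10), Seq(11), Seq(12))) produces the statement from step 2 without the trailing semicolon. Three rows travel in one statement instead of three.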

Solution 2: Use JDBC sink in your code

Developers favor this solution because it’s highly flexible. You can customize the data source, structure, and format. The sample code shown below:

  1. Sets TiDB connection parameters.
  2. Uses “JdbcStatementBuilder” to generate SQL statements dynamically.
  3. Executes the SQL statement with multiple threads.
import java.sql.PreparedStatement
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}

// Flush a batch after 200 rows or after 3 ms.
val executionOptions = JdbcExecutionOptions.builder.withBatchIntervalMs(3).withBatchSize(200).build
// com.mysql.cj.jdbc.Driver is the driver class name in MySQL Connector/J 8.0.
val connectionOptions = (new JdbcConnectionOptions.JdbcConnectionOptionsBuilder)
  .withUrl("jdbc:mysql://<tidb_addr>:4000/?useServerPrepStmts=true&cachePrepStmts=true&rewriteBatchedStatements=true")
  .withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("").build
val insertSQL = "INSERT INTO t_order_item (id, productId, name, price, cnt) values (?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE id=values(id)"
// Bind the fields of each OrderItem to the placeholders of insertSQL.
val sb: JdbcStatementBuilder[OrderItem] = new JdbcStatementBuilder[OrderItem] {
  override def accept(ps: PreparedStatement, t: OrderItem): Unit = {
    ps.setLong(1, t.id)
    ps.setInt(2, t.productId)
    ps.setString(3, t.name)
    // Assumes OrderItem.price is an Int; use setBigDecimal if it is a java.math.BigDecimal.
    ps.setInt(4, t.price)
    ps.setInt(5, t.cnt)
  }
}
val mySink = JdbcSink.sink(insertSQL, sb, executionOptions, connectionOptions)
// Run 200 parallel sink instances.
sourceDataStream.addSink(mySink).setParallelism(200)
env.execute("TiDB Bulk Insert Job")

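Note the following parameters in your code. The JDBC URL parameters are the same as in Solution 1. withBatchSize(200) and withBatchIntervalMs(3) play the role of sink.buffer-flush.max-rows and sink.buffer-flush.interval: the sink flushes a batch once it holds 200 rows or once 3 milliseconds have passed. setParallelism(200) plays the role of sink.parallelism and sets the number of parallel sink instances.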

Note: If you use the ON DUPLICATE KEY UPDATE clause in your SQL statement, please pay attention to the statement syntax. You should use the VALUES(col_name) function to refer to column values. For example:

INSERT INTO table (id, name) values (?, ?) ON DUPLICATE KEY UPDATE id=values(id)
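The statement above assigns only id, so when the conflict is on id an existing row keeps its current values. If you want the incoming row to overwrite the other columns, each of them needs its own VALUES(col_name) assignment. The sketch below builds such a statement. UpsertSqlSketch and upsertSql are hypothetical names for this example, and whether overwriting is the right behavior depends on your workload.

```scala
object UpsertSqlSketch {
  // Hypothetical helper: build a parameterized upsert in which every non-key
  // column takes the incoming value through VALUES(col_name).
  def upsertSql(table: String, keyColumns: Seq[String], otherColumns: Seq[String]): String = {
    val columns = keyColumns ++ otherColumns
    val placeholders = columns.map(_ => "?").mkString(", ")
    // With no non-key columns, fall back to a no-op update on the key columns.
    val updated = if (otherColumns.nonEmpty) otherColumns else keyColumns
    val updates = updated.map(c => s"$c=VALUES($c)").mkString(", ")
    s"INSERT INTO $table (${columns.mkString(", ")}) VALUES ($placeholders) ON DUPLICATE KEY UPDATE $updates"
  }
}
```

Calling UpsertSqlSketch.upsertSql("t_order_item", Seq("id"), Seq("productId", "name", "price", "cnt")) yields a statement with five placeholders, so the parameters can be bound in the same order as in the JdbcStatementBuilder above.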

Summary

If you are writing a high-performance data ingestion program, we recommend that you test the parameters described in this article carefully. If you want to check whether BULK INSERT works, enable the TiDB general log to view the SQL statements.

If you run into issues, feel free to join our community on Slack and TiDB Internals to share them with us. You can also request a demo for these methods from PingCAP.

Originally published at https://en.pingcap.com on April 12, 2022.

PingCAP is the team behind TiDB, an open-source MySQL compatible NewSQL database. Official website: https://pingcap.com/ GitHub: https://github.com/pingcap
