ClickHouse currently powers our real-time BI backend. Once the data lands in the database, reports generate very quickly and ad-hoc queries are also fast. Through daily use I have developed a good sense of its strengths and weaknesses, summarized here. The summary cannot be comprehensive, but it covers the issues and practical techniques that deserve attention in as much detail as possible.

We write Flink programs that consume Kafka data, clean it, enrich the dimensions, and then write it into ClickHouse. Over the past six months the Flink jobs have had few problems and ingestion has been very stable. For ClickHouse we use Tencent Cloud's managed service: a replicated cluster whose disks have been expanded several times. The service has also been quite stable. Overall, the BI backend delivers data reports reliably. For brevity, ClickHouse will be abbreviated as ck below.

Referencing MySQL external tables from ck

We often need to use MySQL tables from within ck. For example, a dimension table lives in MySQL and we need to look up a name by id. Instead of importing a copy of the data, we can map the MySQL table into ck, or map an entire MySQL database to a ck database, operate on all of its tables, and join MySQL and ck tables with ordinary SQL.

The MySQL engine maps tables on a remote MySQL server into ClickHouse and lets you run INSERT and SELECT queries against them, making data exchange between ClickHouse and MySQL easy.

Create database

CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MySQL('host:port', ['database' | database], 'user', 'password')

For example, we create a table in mysql:

mysql> USE test;
Database changed

mysql> CREATE TABLE `mysql_table` (
    ->   `int_id` INT NOT NULL AUTO_INCREMENT,
    ->   `float` FLOAT NOT NULL,
    ->   PRIMARY KEY (`int_id`));
Query OK, 0 rows affected (0.09 sec)

mysql> insert into mysql_table (`int_id`, `float`) VALUES (1,2);
Query OK, 1 row affected (0.00 sec)

mysql> select * from mysql_table;
+--------+-------+
| int_id | float |
+--------+-------+
|      1 |     2 |
+--------+-------+
1 row in set (0.00 sec)

Let’s create a database in ck and associate it with the mysql database.

CREATE DATABASE mysql_db ENGINE = MySQL('localhost:3306', 'test', 'my_user', 'user_password')

This creates a mysql_db database in ck, mapped to the MySQL test database. We can now query it directly from ck:

SELECT * FROM mysql_db.mysql_table

┌─int_id─┬─float─┐
│      1 │     2 │
└────────┴───────┘
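For a one-off lookup there is also the mysql() table function, which queries a single remote table without creating a mapped database; the host and credentials below are the same placeholders as above:

```sql
-- Ad-hoc query against a remote MySQL table, no mapped database needed
SELECT *
FROM mysql('localhost:3306', 'test', 'mysql_table', 'my_user', 'user_password')
WHERE int_id = 1;
```

This is handy for occasional joins against small dimension tables, while the database mapping above is better when many tables are used regularly.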

Besides MySQL, database engines exist for other databases such as SQLite and PostgreSQL. For more information, see the official documentation:

clickhouse.com/docs/zh/eng…

Distributed tables with replicas in ck

A replicated distributed table is a distributed table in which each shard also has replicas. When we first created our tables it took some time to get right. The main questions at the time were:

1) How do you create a distributed table with replicas?

We got this wrong at first and found the data was incomplete, only half showing up each time. We later learned that Tencent Cloud's service is a replicated distributed cluster, so tables must also be created as replicated distributed tables, otherwise data is lost. Table creation takes two steps:

-- 1. Create the local replicated table on every node
CREATE TABLE test.table_local on cluster default_cluster
(
    `id` Int32,
    `uid` String,
    `name` String,
    `time` Int32,
    `create_at` DateTime
)
ENGINE = ReplicatedMergeTree()
PARTITION BY toYYYYMM(create_at)
ORDER BY id;

-- 2. Create the distributed table over the local tables
CREATE TABLE test.table_all on cluster default_cluster as test.table_local 
ENGINE = Distributed('default_cluster', 'test', 'table_local', sipHash64(id));

Parameter description:

ReplicatedMergeTree: the replicated table engine;

PARTITION BY: how data is partitioned;

sipHash64(id): how the distributed table shards rows across the nodes;
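If you are unsure what the cluster is called or how many shards and replicas it has, the standard system.clusters table can be inspected first; default_cluster here is simply the name used throughout this article:

```sql
-- List the shards and replicas of the cluster referenced in ON CLUSTER
SELECT cluster, shard_num, replica_num, host_name
FROM system.clusters
WHERE cluster = 'default_cluster';
```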

For details, please see the official documentation, address:

clickhouse.com/docs/zh/eng…

In the following articles, this table will be used as an example.

2) When inserting into a distributed table, does each node need its own insert?

No, just use standard SQL syntax to insert into the distributed table, for example:

insert into test.table_all values(.....)

3) Are the update and delete operations of distributed tables the same as mysql?

Not the same, only similar. Just follow the template. The ALTER TABLE syntax:

ALTER TABLE [db.]table UPDATE column1 = expr1 [, ...] WHERE filter_expr

for example:

alter table test.table_local on cluster default_cluster update name = 'name' where id = 10000

Note: update operations must target the local table test.table_local; the distributed table cannot be used.

The same goes for deletion:

alter table test.table_local on cluster default_cluster delete where id = 10000

4) Adding columns and modifying column types in a distributed table

-- add a column
ALTER TABLE test.table_local ON CLUSTER default_cluster ADD COLUMN product String;

-- change a column's type
ALTER TABLE test.table_local on cluster default_cluster MODIFY COLUMN uid Int64;

As you can see, what distinguishes replicated ck tables from standard SQL is the combination of ALTER TABLE with the ON CLUSTER clause. Just apply the template. For other DDL operations, see the official documentation:

clickhouse.com/docs/zh/sql…

Write performance

ck favors low-frequency, large-batch writes: a few inserts per second, each carrying tens of thousands to hundreds of thousands of rows, is about right. If you understand the underlying design, you know that ck merges data parts in the background at intervals; frequent small writes cause frequent merges and hurt performance.

When importing with Flink we therefore accumulate data and write in batches, using a Flink window to collect five seconds of data per write. I remember that when we first adopted ck the developers did not pay attention to this; operations asked for batched writes, and eventually everything was unified.
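For ad-hoc inserts outside Flink the same principle applies: bundle many rows into a single INSERT rather than issuing one statement per row. A sketch, using the example table from earlier:

```sql
-- One INSERT carrying many rows creates one data part instead of many
INSERT INTO test.table_all (id, uid, name, time, create_at) VALUES
    (1, 'u1', 'a', 1690732800, '2023-07-31 00:00:00'),
    (2, 'u2', 'b', 1690732801, '2023-07-31 00:00:01'),
    (3, 'u3', 'c', 1690732802, '2023-07-31 00:00:02');
```

Newer ClickHouse versions also offer server-side batching through the async_insert setting, which may be worth checking in the documentation if client-side batching is hard to arrange.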

Things to note when adding an index

ck has a sparse primary index and secondary data-skipping indexes; the secondary indexes build on top of the primary one. Sometimes, after a table has been created and data written, we find that a field is needed in queries and an index has to be added. The statements:

-- add the index
alter table test.table_local on cluster default_cluster add index index_uid uid type minmax GRANULARITY 2;

-- build the index for historical data
ALTER TABLE test.table_local on cluster default_cluster MATERIALIZE INDEX index_uid;

The alter table format is used again. Note that an index is built as data is inserted, so a newly added index does not cover historical data; MATERIALIZE INDEX is what makes it take effect for historical data as well.
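Whether the new index is actually used for a given query can be checked with EXPLAIN; the indexes = 1 setting is available in recent ClickHouse versions:

```sql
-- Show which indexes prune parts/granules for this query
EXPLAIN indexes = 1
SELECT count() FROM test.table_local WHERE uid = '10000';
```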

Data deduplication

A ReplacingMergeTree table removes duplicate rows that share the same sorting-key value (the fields after ORDER BY in the CREATE TABLE statement). Since ck handles updates poorly, we can use this engine to simply insert a full row each time instead of updating; ck keeps the latest version for us. The table creation statements:

CREATE TABLE test.test_local on cluster default_cluster (
    `id` UInt64,
    `type` Int32,
    `username` String,
    `password` String,
    `phone` String COMMENT 'phone number',
    `nick` String,
    `mobile` String,
    `insert_time` DateTime DEFAULT '2023-07-31 00:00:00'
) ENGINE = ReplicatedReplacingMergeTree(insert_time)
partition by toYYYYMM(insert_time)
order by id;

CREATE TABLE test.test_all on cluster default_cluster as test.test_local ENGINE = Distributed('default_cluster', 'test', 'test_local', sipHash64(id));

The insert_time field should be present: when it is passed to ReplacingMergeTree as the version column, ck keeps the row with the latest time for each sorting key; without a version column, the last inserted row wins.

Deduplication only happens when data parts are merged. Merging runs in the background at an unspecified time, so you cannot plan around it, and some duplicate data may remain unmerged. The OPTIMIZE statement is usually used to trigger a merge manually. For example, if the program stopped abnormally today and I restarted it, there is a high probability of multiple versions of some rows, which then need a manual merge:

OPTIMIZE table test.test_local on cluster default_cluster final;

This forces a merge, which costs performance; normally, if there is no multi-version data, there is no need to trigger it. Without a merge, queries will see multiple versions and need the FINAL keyword to merge them at query time, which is expensive when queries are frequent. In that case you can schedule periodic merges instead.

select * from test.test_all final where id = 10000

For such multi-version tables, FINAL can sometimes be avoided. For deduplication, for example, select distinct id from table gives the same result as select distinct id from table final, so FINAL can simply be dropped.
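Another common way to avoid FINAL is to resolve versions at query time with GROUP BY and argMax, which picks each column's value from the row with the newest insert_time; a sketch against the table above:

```sql
-- Latest version of each id without using FINAL
SELECT
    id,
    argMax(username, insert_time) AS username,
    argMax(phone, insert_time) AS phone
FROM test.test_all
GROUP BY id;
```

This trades merge-at-read cost for an aggregation, which is often cheaper on wide tables when only a few columns are needed.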

Delete distributed table

Both the local table and the distributed table need to be dropped:

drop table test.test_local on cluster default_cluster;
drop table test.test_all on cluster default_cluster;

Complex data type map

Some business data has a JSON-typed field whose set of keys varies. In ck it is best defined as a Map type, which is more accommodating.

CREATE TABLE test.test_local2 on cluster default_cluster (
    `id` UInt64,
    `data` Map(String, String)
) ENGINE = ReplicatedMergeTree()
order by id;

CREATE TABLE test.test_all2 on cluster default_cluster as test.test_local2 ENGINE = Distributed('default_cluster', 'test', 'test_local2', sipHash64(id));

data Map(String, String): this defines a Map-typed field, which can be queried by key:

select data['key'] from test.test_all2 limit 5
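A few built-in functions help when the key set is variable; mapKeys, mapValues, and mapContains belong to ClickHouse's Map function family:

```sql
-- Inspect the keys of each row and filter rows by key presence
SELECT id, mapKeys(data) AS keys
FROM test.test_all2
WHERE mapContains(data, 'key')
LIMIT 5;
```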

ck can also parse JSON strings directly:

select JSONExtractRaw('{"a": "hello", "b": [-100, 200.0, 300]}', 'b')
-- result
'[-100, 200.0, 300]'

select JSONExtractString('{"a": "hello", "b": [-100, 200.0, 300]}', 'a')
-- result
'hello'

More detailed official documents: clickhouse.com/docs/zh/sql…

About cost

By comparison, ck does save money, at least according to our operations team: the disks have been expanded frequently, but compute capacity has only been scaled up once.
