If you need data analytics and your data is big, your regular database may not suit your needs, and it is wise to offload the data processing to ClickHouse. One of ClickHouse's requirements is to run inserts in batches, which can be done manually or by using Kafka.
Preparation
We are going to use docker-compose to run Kafka and ClickHouse. Here is a minimal docker-compose file with volumes configured to persist data across restarts:
# docker-compose.yml
version: '2'
services:
  clickhouse:
    image: bitnami/clickhouse:latest
    environment:
      - ALLOW_EMPTY_PASSWORD=yes
    ports:
      - 8123:8123
    deploy:
      resources:
        limits:
          cpus: '8'
          memory: 12000M
    volumes:
      - ./.clickhouse/:/bitnami/clickhouse
  kafka:
    image: bitnami/kafka:latest
    ports:
      - 9092:9092
      - 9093:9093
      - 9094:9094
    deploy:
      resources:
        limits:
          cpus: '1'
          memory: 2000M
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    volumes:
      - ./.kafka/:/bitnami/kafka
Having this file on your local machine, you can start the services by running docker compose -f docker-compose.yml up -d.
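If you want to check that both containers came up, a small sanity-check sketch like the one below should do; it assumes the ports published in the compose file above (8123 for the ClickHouse HTTP interface, 9094 for the external Kafka listener) and the requests and confluent-kafka Python packages.
# sanity_check.py: verify that both services are reachable from the host
import requests
from confluent_kafka.admin import AdminClient

# ClickHouse answers "Ok." on its HTTP /ping endpoint (port 8123 from the compose file)
print(requests.get('http://localhost:8123/ping').text)

# Kafka should return cluster metadata on the EXTERNAL listener (port 9094)
admin = AdminClient({'bootstrap.servers': 'localhost:9094'})
print(admin.list_topics(timeout=10).topics)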
Imagine we are parsing ads from a website and want to log what we have parsed. Here we create a list of dictionaries, which will be pickled later:
import argparse
import datetime
import pickle
import random
import uuid
import pycountry

parser = argparse.ArgumentParser()
parser.add_argument('number_of_records', type=int, help='how many records to generate')
args = parser.parse_args()
now = datetime.datetime.now()

COUNTRIES_ABBREVIATIONS = [country.alpha_2 for country in pycountry.countries]
records = []
for i in range(args.number_of_records):
    new_record = dict(
        id=uuid.uuid4(),
        ads_count=random.randint(1, 5000),
        pages_listed=random.randint(1, 100),
        parsed_at=now - datetime.timedelta(days=random.randint(0, 365)),
        country=random.choice(COUNTRIES_ABBREVIATIONS),
        group_id=uuid.uuid4()
    )
    records.append(new_record)
pickle.dump(records, open(f"records_{len(records) / 1_000_000}M.pkl", 'wb'))
Load the data into Kafka by running the following script:
import argparse
import datetime
import json
import pickle
import uuid
from json import JSONEncoder

from confluent_kafka.admin import AdminClient
from confluent_kafka.cimpl import NewTopic, Producer


class JSONEncoderEx(JSONEncoder):
    def default(self, obj):
        if isinstance(obj, uuid.UUID):
            return str(obj)
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        return super().default(obj)


def create_topic(bootstrap_server, topic, partitions):
    client = AdminClient({'bootstrap.servers': bootstrap_server})
    new_topic = NewTopic(topic, num_partitions=partitions, replication_factor=1)
    client.create_topics(new_topics=[new_topic])


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('file', type=str, help='path to records file')
    parser.add_argument('topic', type=str, help='target kafka topic')
    parser.add_argument('partitions', type=int, help='kafka topic partitions count')
    args = parser.parse_args()

    host = 'localhost:9094'
    with open(args.file, 'rb') as file:
        records = pickle.load(file)

    create_topic(host, args.topic, args.partitions)
    producer = Producer({'bootstrap.servers': host})
    for record in records:
        msg = json.dumps(record, cls=JSONEncoderEx).encode("utf-8")
        producer.poll(0)
        try:
            producer.produce(args.topic, msg)
        except BufferError:
            producer.flush()
            producer.produce(args.topic, msg)
    producer.flush()
Running the script above populates a Kafka topic. Next we need to get this data into ClickHouse.
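Before wiring up ClickHouse, it may be worth peeking at the topic to confirm the payloads look right. A minimal sketch, assuming the external listener on localhost:9094 and a throwaway consumer group so the offsets used later are not affected (the topic name is whatever you passed to the producer script):
# peek.py: read a few messages back from the topic to eyeball the JSON payloads
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9094',
    'group.id': 'debug-peek',          # throwaway group, not the one ClickHouse will use
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['30_million_p2'])  # the topic created by the producer script

for _ in range(5):
    msg = consumer.poll(5.0)
    if msg is None or msg.error():
        continue
    print(msg.value().decode('utf-8'))

consumer.close()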
Populating a ClickHouse Table from a Kafka Topic
ClickHouse lets us read Kafka records in consumer mode using its Kafka table engine.
create table kafka_data (
    json String
) engine = Kafka settings
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = '30_million_p2',
    kafka_group_name = 'consumer-group-1',
    kafka_format = 'JSONAsString',
    kafka_thread_per_consumer = 4,
    kafka_num_consumers = 2
;
To read data from Kafka we need to specify a topic, a consumer group, and a broker list. Tweaking kafka_thread_per_consumer and kafka_num_consumers may improve ingestion performance.
Now we have access to the Kafka data through ClickHouse. However, the tricky thing is that Kafka is a streaming system: once the consumer group has read a record, the offset moves forward and we cannot read it again. You can read the data, but only once, by running the following (try it, but never use it in practice):
select * from kafka_data
settings stream_like_engine_allow_direct_select = 1;
We need to put the data into a separate table, and this can be achieved with the materialized view functionality. Create a target table first:
create table parse_data (
    id UUID,
    group_id UUID,
    parsed_at DateTime,
    ads_count INTEGER,
    pages_listed INTEGER,
    country String
) engine = MergeTree()
order by parsed_at;
And then the materialized view to populate the data:
create materialized view consumer to parse_data as
select
    JSONExtractString(json, 'id') as id,
    JSONExtractString(json, 'group_id') as group_id,
    JSONExtractInt(json, 'ads_count') as ads_count,
    parseDateTimeBestEffortOrNull(JSONExtractString(json, 'parsed_at')) as parsed_at,
    JSONExtractInt(json, 'pages_listed') as pages_listed,
    JSONExtractString(json, 'country') as country
from kafka_data;
Wait a couple of minutes until the data lands in the table. On my machine, 30 million records were ingested in 101 seconds, which turned out to be insanely cool given that pickle.load alone takes 147 seconds. Now the data is available for your further analysis.
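As a quick example of such an analysis, here is a sketch that runs an aggregate over parse_data through the ClickHouse HTTP interface on port 8123 (published in the compose file); the query itself is only an illustration:
# top_countries.py: example aggregate over the freshly populated table
import requests

QUERY = """
    SELECT country, count() AS records, sum(ads_count) AS total_ads
    FROM parse_data
    GROUP BY country
    ORDER BY total_ads DESC
    LIMIT 10
    FORMAT TSVWithNames
"""

# the default user with an empty password works because of ALLOW_EMPTY_PASSWORD=yes
response = requests.post('http://localhost:8123/', data=QUERY)
print(response.text)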
Tricky Things Not Mentioned
- In the Kafka Docker configuration there are two separate properties: listeners and advertised listeners, and the advertised listeners are a subset of the listeners. When you connect to Kafka you specify bootstrap servers, and the bootstrap servers return the advertised listeners, which are then used to refer to the broker. Our KAFKA_CFG_ADVERTISED_LISTENERS env variable is set to PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094, which means the broker is advertised as kafka on port 9092 and as localhost on port 9094. So from outside the container Kafka is only reachable on port 9094, while inside the Docker network it is reached via the kafka hostname on port 9092. That is why the Python script uses port 9094: it runs on the local machine.
- In Kafka you can have at most one consumer per partition, so limit the consumer count in the Kafka engine to the partition count. The second factor is the threads-per-consumer count: on my laptop the best performance came from 1 partition and 8 threads per consumer, with 8 CPUs given to the container (I would love to hear your results on the optimal setup).
- The Kafka engine operates through the Kafka consumer paradigm: when a record is read, it is consumed and the offset moves forward. The offset can be reset manually, but if you are going to experiment it is easier to use a new consumer group each time you import the data (see the sketch after this list for a way to inspect the committed offsets).
- The Kafka population script looks a bit more complicated than it needs to be; this is required to produce a large number of records.
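To see how far the Kafka engine's consumer group has read, you can compare its committed offsets with the topic's high watermarks. A sketch, assuming the group name and topic from the Kafka engine DDL above:
# consumer_lag.py: compare committed offsets of the ClickHouse consumer group
# with the topic's high watermarks
from confluent_kafka import Consumer, TopicPartition

TOPIC = '30_million_p2'
GROUP = 'consumer-group-1'  # the group configured on the Kafka engine table

# creating the consumer does not join the group, so this does not disturb ClickHouse
consumer = Consumer({'bootstrap.servers': 'localhost:9094', 'group.id': GROUP})
metadata = consumer.list_topics(TOPIC, timeout=10)
partitions = [TopicPartition(TOPIC, p) for p in metadata.topics[TOPIC].partitions]

for tp in consumer.committed(partitions, timeout=10):
    low, high = consumer.get_watermark_offsets(tp, timeout=10)
    print(f"partition {tp.partition}: committed={tp.offset}, high watermark={high}")

consumer.close()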
Conclusion
Now you have a docker-compose file for a hassle-free ClickHouse and Kafka setup (without ZooKeeper or ClickHouse Keeper) to test your ideas. We also found out that ClickHouse is insanely fast.