A journey of performance tuning Kafka Streams applications

In this post, we’ll discuss our journey of tuning the performance of a few Kafka and Kafka Streams applications.

Principles

Divide and conquer

Breaking the overall performance target down into targets for individual components has proved very effective. In a distributed system with a dozen services there can be many bottlenecks, and they may interfere with each other, so it is extremely challenging to test a hypothesis in such a dynamic environment.

By defining performance targets for individual components and strategically tackling the “low-hanging fruit” first, we were able to achieve significant improvements in a short period.

Measurement

It is essential to measure the system before even tuning it. Having measurement in place helps the team understand the current and historical performance of the system.

Usually, the performance tuning requirements are described in one of the following formats:

  • The 99th percentile of API response time must not be larger than x milliseconds (latency).

  • The system should completely process messages in n seconds (throughput).

By creating customised metrics that measure latency and throughput, and building a dashboard from the collected metric data, we are able to test hypotheses at a swift pace.
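To make the latency target concrete, here is a minimal sketch of how a 99th percentile could be computed from raw response-time samples. It uses the nearest-rank definition of a percentile, which is only one of several common definitions; the class and method names are hypothetical and not taken from any metrics library.

```java
import java.util.Arrays;

// Hypothetical helper: nearest-rank percentile over raw latency samples.
final class LatencyPercentile {

    /** Returns the smallest sample such that at least p percent of all samples are <= it. */
    static long percentile(long[] samplesMs, double p) {
        if (samplesMs.length == 0 || p <= 0 || p > 100) {
            throw new IllegalArgumentException("need at least one sample and 0 < p <= 100");
        }
        long[] sorted = samplesMs.clone();
        Arrays.sort(sorted);
        // 1-based nearest rank: ceil(p * n / 100)
        int rank = (int) Math.ceil(p * sorted.length / 100.0);
        return sorted[rank - 1];
    }

    /** Checks a target of the form "p99 must not be larger than targetMs". */
    static boolean meetsTarget(long[] samplesMs, long targetMs) {
        return percentile(samplesMs, 99) <= targetMs;
    }
}
```

In practice a metrics library would usually maintain a histogram instead of sorting raw samples, but the check against the target is the same.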

Observability

With the measurement capability built in, we did observe significant latency during performance testing. However, without comprehensive insight into the system, it is challenging to locate the bottlenecks.

Two types of metrics are essential for building meaningful insight into an application:

  • infrastructure metrics, e.g. CPU utilisation, memory utilisation, network IO, disk IO

  • application metrics, e.g. JVM metrics, Kafka producer, consumer and Kafka Streams metrics, etc.

Collecting these metrics and creating a comprehensive dashboard for an application gives us a single place to identify potential bottlenecks.

App Dashboard

There’re many options for metric collection and aggregation. We use AppDynamics to collect and visualise these metrics. Being able to look at them retrospectively after changing a few parameters has been very useful.

Also, Kafka exports a large number of metrics via JMX, which can be overwhelming at first sight. We found this blog post from Datadog to be the best at explaining the meaning and implications of some key metrics.

Kafka Streams App Tuning

Off-heap memory usage

Compared to an application that only uses the plain old Kafka producer and consumer, a Kafka Streams application requires more resources.

We noticed that our Kafka Streams application’s memory usage kept growing until it eventually used up all available memory.

It turns out that this is not a memory leak. RocksDB uses a large portion of off-heap (non-heap) memory: even though JVM heap usage had stabilised at around 400MB, the RSS (resident set size) of the application process kept growing.

Kafka Streams creates local state stores for stateful operations (e.g. join, groupBy, etc.), and these stores are optionally backed up to changelog topics.

The following formula illustrates the memory requirements of a stream application with the default RocksDB settings.

one_rocks_db_memory_footprint = write_buffer_size_mb  * write_buffer_count + block_cache_size_mb
 
# default one_rocks_db_memory_footprint is 16MB * 3 + 50MB = 98MB

over_all_footprint_of_one_windowed_state_store  =  partition_number * segment_count * one_rocks_db_memory_footprint 

# default over_all_footprint_of_one_windowed_state_store is 12 * 3 * 98MB = 3528MB

There are eight windowed joins in our application, so the off-heap memory can reach 8 * 3528MB = 28224MB.
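The arithmetic above can be replayed with a small calculator. This is only a sketch of the formula as written in this post; the class and method names are made up for illustration, and the inputs are the default values and counts quoted above.

```java
// Hypothetical calculator for the footprint formula quoted in this post.
final class RocksDbFootprint {

    /** Footprint of one RocksDB instance: write buffers plus block cache, in MB. */
    static long perInstanceMb(long writeBufferSizeMb, int writeBufferCount, long blockCacheSizeMb) {
        return writeBufferSizeMb * writeBufferCount + blockCacheSizeMb;
    }

    /** Footprint of one windowed state store: one RocksDB instance per partition and segment. */
    static long perWindowedStoreMb(int partitions, int segments, long perInstanceMb) {
        return (long) partitions * segments * perInstanceMb;
    }

    public static void main(String[] args) {
        long instance = perInstanceMb(16, 3, 50);         // 16MB * 3 + 50MB
        long store = perWindowedStoreMb(12, 3, instance); // 12 partitions * 3 segments
        long total = 8 * store;                           // eight windowed joins, as counted above
        System.out.println(instance + "MB per instance, " + store + "MB per store, " + total + "MB in total");
    }
}
```

Plugging in smaller write buffer and block cache sizes shows quickly how much each setting contributes to the total.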

By providing a customised RocksDB configuration, we were able to limit the off-heap memory to less than 4GB, and so far we have not observed any performance degradation.

Minimise overall latency

There are multiple consumers and producers in a Kafka Streams application. Any misconfigured consumer or producer property can contribute to the overall delay.

  1. Consumer commit.interval.ms

    This value allows us to make a trade-off between throughput and latency. A larger value increases system throughput but adds “artificial delays”. A smaller value will lead to more frequent consumer offset commits.

  2. Producer batch.size and linger.ms

    The default value of batch.size is 16KB and our average record size is 6KB, which means the producer needs to perform a send operation for every 2~3 messages, and each send pays the unavoidable network latency between our application and the Kafka broker. By increasing batch.size to 1MB and setting linger.ms to 50, we reduced the network latency overhead per message and observed improvements in both throughput and latency.
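The producer settings described above can be sketched as plain properties. The keys batch.size and linger.ms are the standard producer config names; the helper class and the rough records-per-batch estimate are hypothetical, and the estimate ignores per-record and per-batch overhead.

```java
import java.util.Properties;

// Hypothetical helper that collects the producer settings discussed above.
final class ProducerTuning {

    /** Rough estimate of how many average-sized records fit into one batch. */
    static int recordsPerBatch(int batchSizeBytes, int avgRecordBytes) {
        return Math.max(1, batchSizeBytes / avgRecordBytes);
    }

    /** Producer properties with a 1MB batch size and a 50ms linger time. */
    static Properties tunedProducerProps() {
        Properties props = new Properties();
        props.setProperty("batch.size", String.valueOf(1024 * 1024)); // bytes
        props.setProperty("linger.ms", "50");                         // wait up to 50ms to fill a batch
        return props;
    }
}
```

With 6KB records, the default 16KB batch holds about two records by this estimate, while a 1MB batch holds about 170, which is where the reduction in per-message network overhead comes from. In a Kafka Streams application these values would typically be passed through the streams configuration with the producer prefix.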

Stateful operations and the changelog restore process

One of my favourite features of Kafka Streams is that it supports stateful operations without losing the ability to scale horizontally.

However, we can be caught out by some unexpected behaviours if we do not use these operations carefully.

Kafka Streams creates a local state store for each partition to perform the stateful operation, with the option of backing the data up to a changelog topic.

If retention.ms is not specified, the broker config log.retention.ms (default 7 days) is used as the retention period of the changelog topic.

Choosing the right window size and changelog topic retention period is essential for avoiding a lengthy changelog restore process.

Further Readings

Tracking the root cause of a topic co-partition issue

When joining streams in a Kafka Streams application, one critical prerequisite is that the topics need to be co-partitioned.

In this post, I’ll share my experience of tracking down an issue where topics did not fulfil the co-partition guarantee when using DefaultPartitioner and Confluent’s KafkaAvroSerializer for the topic key.

Background

  • Service A produces events to topic A, with key K.
  • Service B produces events to topic B, with the same key K.
  • Service C joins events from topic A and topic B and produces a calculated result to topic C, with the same key K.
  • All topics have the same number of partitions (10).
  • The key schema registration strategy for these topics is TopicNameStrategy.
  • The key serializer is KafkaAvroSerializer.
  • All three services use DefaultPartitioner in the producer.

The Issue

Services A and B produce records with key k1 to their corresponding topics. When service C creates two streams from topics A and B and joins them together, it complains that there is no matching record for key k1.

Let’s revisit the definition of Copartition Requirements.

One additional item I would like to add to these requirements is:

For the same key, the record should be on the same partition across these co-partitioned topics.

Service C complains that it cannot find a matching record for key k1 because it only considers records from the same partition on both topics, while in reality k1 was sent to partition 2 of topic A but partition 0 of topic B.

The configuration of topics A and B and of services A, B and C meets all the requirements defined above, so why does this happen?

Uncover the root cause

The following diagram demonstrates how the partition number is calculated.

partition process

Firstly, KafkaProducer delegates to KafkaAvroSerializer to serialize an Avro object to a byte array. The serialized result includes the schema id.

//org.apache.kafka.clients.producer.KafkaProducer#doSend

//In current context, keySerializer is an instance of KafkaAvroSerializer
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());

Secondly, the keySerializer talks to the schema registry to resolve the schema id for the topic key and writes it into the serialized bytes, ahead of the Avro payload.

//io.confluent.kafka.serializers.AbstractKafkaAvroSerializer#serializeImpl

id = schemaRegistry.getId(subject, new AvroSchema(schema)); //as we're using TopicNameStrategy, the subject is "A-key" or "B-key"
ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(MAGIC_BYTE);
out.write(ByteBuffer.allocate(idSize).putInt(id).array());

Thirdly, KafkaProducer hands the serialization result to DefaultPartitioner to calculate the partition number.

//org.apache.kafka.clients.producer.KafkaProducer#partition

partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);

Lastly, DefaultPartitioner calculates the partition number from the serialized key bytes.

// org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition

// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

Root Cause

  • The key schema registration strategy is TopicNameStrategy, so the same schema of K is registered for topic A and topic B separately; hence different schema ids are generated for topic A and topic B.

  • When service A and service B serialize k1, they write different schema ids into the serialized bytes. Even though the Avro-serialized value of k1 is identical in the two services, the serialized key bytes are different.

  • DefaultPartitioner calculates the partition from the serialized bytes, and yields different partition numbers in the two services.
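The effect of the schema id on the key bytes can be reproduced without Kafka at all. The sketch below mimics the framing shown earlier (one magic byte, a 4-byte schema id, then the payload). The class name is hypothetical, the payload bytes stand in for the Avro encoding of k1, and Arrays.hashCode is used as a stand-in for murmur2, so the partition numbers it produces are not the ones Kafka would compute.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration of how the schema id changes the serialized key bytes.
final class SchemaIdFraming {
    static final byte MAGIC_BYTE = 0x0;

    /** Frames a payload the way the serializer snippet above does: magic byte, schema id, payload. */
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(1 + 4 + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(avroPayload)
                .array();
    }

    /** Stand-in for the default partitioner: hash ALL key bytes, then take the modulo. */
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }
}
```

Framing the same payload with two different schema ids gives two different byte arrays, and any hash computed over the whole array is then free to land on different partitions.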

Solution

There’re two ways to address this problem; both have pros and cons.

  • Use a primitive type and its serializer for the topic key, e.g. StringSerializer, LongSerializer, etc.

The upside of this approach is that co-partitioning is guaranteed as long as the requirements above are met. The downside is losing the ability to evolve the key schema. (Who wants to do this anyway?)

  • Use a customized partitioner

The upside of this approach is the ability to evolve the key schema. The downside is the additional complexity it adds to the services.
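One possible shape for such a partitioner is to skip the 5-byte header (magic byte plus schema id) and hash only the payload. The sketch below shows just that core idea under stated assumptions: the class is hypothetical, it does not implement the Kafka producer’s Partitioner interface, and Arrays.hashCode stands in for murmur2. It also only removes the schema id difference; if a schema change alters the Avro payload itself, the bytes still differ.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical core of a partitioner that ignores the schema id header.
final class SchemaAgnosticPartitioning {
    static final int HEADER_BYTES = 5; // 1 magic byte + 4-byte schema id

    /** Returns the key bytes without the magic byte and schema id. */
    static byte[] payloadOf(byte[] framedKey) {
        if (framedKey.length < HEADER_BYTES) {
            throw new IllegalArgumentException("key is shorter than the 5-byte header");
        }
        return Arrays.copyOfRange(framedKey, HEADER_BYTES, framedKey.length);
    }

    /** Hashes only the payload, so the schema id no longer influences the partition. */
    static int partitionFor(byte[] framedKey, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(payloadOf(framedKey)), numPartitions);
    }

    /** Test helper that frames a payload as magic byte, schema id, payload. */
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(HEADER_BYTES + payload.length)
                .put((byte) 0x0)
                .putInt(schemaId)
                .put(payload)
                .array();
    }
}
```

Every producer writing to the co-partitioned topics would need to use the same partitioner, which is part of the extra complexity mentioned above.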

Conclusion

Using DefaultPartitioner together with KafkaAvroSerializer (under TopicNameStrategy) for the topic key can make topics fail to meet the co-partition requirements.

Caveats

RecordNameStrategy won’t help in this case: when the key schema needs to be upgraded, a new schema id is generated, which in turn produces different serialized bytes and yields a different partition number, even for the same key.

Readable Angular tests

In this post, I’ll provide some tips for writing readable Angular tests.

General Tips

Use describe or nested describe to structure specs

  • Use a top-level describe to describe a public function or a feature of the component under test.
  • Use nested describes to group specs which share the same context. Check the following example:
//top level describe
describe('#landing page', function() {
    //nested describe
    describe('logged in as admin', function() {
        beforeEach(function() {
            //context setup code for an admin user
        });
        it('should be able to see admin link', function() {});
        it('should be able to see greeting message', function() {});
    });
    describe('logged in as member', function() {
        beforeEach(function() {
            //context setup code for a member user
        });
        it('should not be able to see admin link', function() {});
        it('should be able to see greeting message', function() {});
    });
});

Put variable declaration close to usage.

It is quite common to see a bunch of local variables declared at the top of a spec and then initialized somewhere else in the code. This makes it very hard to understand the value and usage of those variables when reading through the code.

Service Specs Tips

Put more business logic into Service.

Some services we created are just a very thin wrapper around $http, e.g.:

angular
    .module('MyApp')
    .service('MyService', ['$http',
    function ($http) {
        this.get = function (projectId, myId) {
            return $http.get('/projects/' + projectId + '/something/' + myId);
        };
    }]);

Dead simple, but is it worth testing?

Encapsulating business logic into Service instead of Controller or Directive will lead to a higher test coverage of the code.

Directive Specs Tips

Directives are hard to test. Here are some tips:

Make directives as small as possible

One common mistake I have observed is that directives have too much responsibility.
Only view/rendering related code should live in a directive; any other business logic should be extracted into a service. If the “extra” code is related to another view, extract it into another directive.

Limit the usage of $scope variables.

In most cases, a private variable is good enough. Compared to private variables, scope variables are publicly accessible, which violates the encapsulation principle. Also, scope variables are bound to the directive lifecycle. Check the following example.

angular.module('MyApp')
    .controller('MyController', ['$scope', '$stateParams', 'MyService',
    function ($scope, $stateParams, MyService) {
        var self = this;
        self.isAdmin = false;
        self.canCreateUser = false;
        self.myDomain = null;
        //...
        MyService.get($scope.myId).then(function (data) {
            self.myDomain = data;
        });
        //...
    }]);

Simple template

Avoid business logic in templates. Replace it with directive methods that express the intent of the logic, e.g.:

//Before refactor
//directive
$scope.isAdmin = ...;
$scope.canCreateUser = ...;
$scope.isFriday = ...;

//template
<a ng-if="isAdmin && canCreateUser && isFriday">click me</a>

//After refactor
//directive
$scope.isEditable = function () {
    return self.isAdmin && self.canCreateUser && self.isFriday;
};

//template
<a ng-if="isEditable()">click me</a>
//Three scope variables were replaced by one scope function

Do not test directive template

Given that we have made the template simple, we don’t need to test it. Testing directive methods (e.g. isEditable) is much easier than dealing with the DOM in the template, and still provides enough confidence in quality.

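Exposing an event stream from a monolith

Background

As microservices have grown in popularity, more and more companies are embracing them and development teams have started building their own microservices. From what I have seen, a company’s road to microservices typically looks like this:

It starts with a monolith

Most giant applications started out small and grew over time. As the business grew, more features were added, and after a few years the application has become a huge codebase whose monolithic drawbacks seriously drag down the team’s efficiency.

Embracing microservices

Faced with these problems, the team starts to look into microservice practices. However, microservices cannot be adopted overnight. A common approach is divide and conquer:

  • When a new requirement touches a module in the monolith, build a new microservice and integrate it with the monolith.

  • Repeat this step until every module in the monolith has been split out into a new microservice.

image

The problem with microservices

In the process above, the first problem to solve is the coupling between the microservices and the existing monolith.

In most cases a microservice needs to exchange data with modules in the monolith to implement a business process. The most common way is to add REST APIs to the monolith and have the microservice integrate by calling them synchronously. This, however, couples the microservice’s delivery cycle to the monolith’s, whereas decoupling from the monolith’s delivery cycle was one of the main drivers for creating microservices in the first place.

Removing the synchronous dependency between the microservices and the monolith therefore becomes the most critical step.

This post uses a real-world case to describe how we decoupled synchronous dependencies between services with a streaming platform, along with the problems we ran into and how we solved them.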

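Architecture

The company I currently work for provides a project collaboration platform for the construction industry. Its core monolith is more than 20 years old. Over the past few years we have built a dozen or so microservices around this monolith, and each of them needs to collaborate with the monolith through REST APIs.

However, this synchronous dependency between the monolith and the microservices brings other problems as well: the scalability of the overall system does not improve, because load on the microservices is still passed on to the monolith. The approach we chose was to remove the synchronous dependency.

We publish the monolith’s data as events to a Kafka topic. Microservices subscribe to the topic to receive both historical and real-time data, and build projections suited to their own business scenarios in a local database. Wherever a microservice used to call the monolith’s API synchronously, it can now use the local projection instead, which removes the synchronous call dependency.

image

As the diagram shows, publishing the monolith’s data as an event stream takes the following steps:

  • Model the business processes in the monolith to obtain a set of data models that describe business events.
  • Add code to each business process that generates business events:
    • Each generated event is assigned a unique, monotonically increasing sequence number (similar to a database sequence).
    • The event is persisted to the domain_events table as part of the business process’s transaction.
  • The source connector is responsible for:
    • Watching the domain_events table, reading new rows as soon as they appear and publishing them to the Kafka cluster.
    • Tracking the sequence number of the data published so far and persisting it promptly, to keep duplicate publishing to a minimum.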

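How are events generated?

As the diagram above shows, the first two steps persist changes to the monolith’s data as events, which serve as input for other systems. This is very similar to the concept of CDC (change data capture) in the database world. The difference is that we capture changes to the domain model, whereas CDC captures changes to database tables. Confluent has open-sourced a CDC implementation for Postgres.

How are events published?

The Kafka community already offers many Kafka connectors (both source connectors and sink connectors). I strongly recommend looking among the existing connectors first for one that fits your scenario, and only trying to implement your own if none fits.

In my project the monolith uses Microsoft SQL Server, and the table design does not map well onto the current domain model, so none of the existing source connectors could handle this scenario. Our team therefore implemented its own bridge component to send event data from SQL Server to the Kafka cluster.

How are events consumed?

We used Kafka Streams to implement the consumers of the event stream. The library provides an event stream abstraction, so developers do not need to deal with low-level logic such as offset management.

It is worth noting that, because a message may appear at more than one position in the event stream (Kafka’s at-least-once delivery), consumer implementations must be idempotent.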
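One simple way to get idempotency, given that every event carries a monotonically increasing sequence number, is to remember the last sequence number applied and skip anything at or below it. The sketch below is a hypothetical in-memory projection, not our actual consumer; it assumes events arrive in sequence order, and a real implementation would persist the last applied sequence number together with the projection, and track it per partition if the topic has more than one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory projection that ignores replayed events.
final class IdempotentProjection {
    private final Map<String, String> projection = new HashMap<>();
    private long lastAppliedSequence = 0;

    /** Applies the event once; returns false if it is a duplicate or a replay. */
    boolean apply(long sequence, String key, String value) {
        if (sequence <= lastAppliedSequence) {
            return false; // already applied, applying it again must not change anything
        }
        projection.put(key, value);
        lastAppliedSequence = sequence;
        return true;
    }

    String get(String key) {
        return projection.get(key);
    }
}
```

Replaying an old event after a restart then leaves the projection untouched instead of overwriting newer data.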

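Challenges

We faced the following challenges while implementing the architecture above:

Data durability

In my project, any data loss means the data on the event stream becomes permanently inconsistent with the source data, which is unacceptable for consumers of the event stream.

To ensure data published to the Kafka cluster is not lost, in a Kafka cluster of N brokers the log replication settings are as follows:

default.replication.factor = N # number of synchronous replicas + number of asynchronous replicas
min.insync.replicas = N-1 # number of synchronous replicas

The intent of this configuration is that, without sacrificing data consistency, the Kafka cluster keeps accepting reads and writes after losing a broker: with min.insync.replicas = N-1, producers that use acks=all can still write as long as N-1 replicas are in sync, i.e. after losing one of the N nodes.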

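Ordering guarantee

Kafka guarantees ordering only within a single partition, so choosing a suitable partition key is critical.

Low duplication

When implementing the bridge, it is critical to promptly compute the minimum sequence number of the messages that have been sent successfully (acknowledged by the Kafka broker). When the bridge stops working for some reason and is restarted, this sequence number is the starting point for its retry.

We reduce duplicate data by:

  • Limiting the number of in-flight messages (sent to the Kafka broker and awaiting acknowledgement).
  • Promptly computing the minimum sequence number of successfully sent messages and saving it to persistent storage.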
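The two measures above can be sketched as a small tracker. This is a hypothetical illustration rather than the bridge’s actual code: it assumes messages are sent in increasing sequence order, and it interprets the restart point as the highest sequence number below which every message has been acknowledged.

```java
import java.util.TreeSet;

// Hypothetical tracker for in-flight messages and the restart checkpoint.
final class AckTracker {
    private final int maxInFlight;
    private final TreeSet<Long> inFlight = new TreeSet<>();
    private long highestSent;

    AckTracker(int maxInFlight, long lastCheckpoint) {
        this.maxInFlight = maxInFlight;
        this.highestSent = lastCheckpoint;
    }

    /** Registers a message as in flight; returns false when the in-flight limit is reached. */
    boolean trySend(long sequence) {
        if (sequence <= highestSent) {
            throw new IllegalArgumentException("sequence numbers must be sent in increasing order");
        }
        if (inFlight.size() >= maxInFlight) {
            return false; // caller should wait for acknowledgements first
        }
        inFlight.add(sequence);
        highestSent = sequence;
        return true;
    }

    /** Marks a message as acknowledged by the broker. */
    void ack(long sequence) {
        inFlight.remove(sequence);
    }

    /** Everything up to and including this sequence number has been acknowledged. */
    long checkpoint() {
        return inFlight.isEmpty() ? highestSent : inFlight.first() - 1;
    }
}
```

After a restart the bridge would resume from the sequence number right after the persisted checkpoint, so at most maxInFlight messages can be published twice.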

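Why Kafka?

Once business events are saved to the domain_events table, they need to be published to the event stream. When choosing what would carry the event stream, data durability, ordering guarantees and system scalability were the key criteria.

Reliability

Kafka uses log-based storage: data sent to a broker is appended to the end of a log file. Because this avoids the expensive “look up, then modify” operations of typical data stores, it achieves very high storage performance.

In addition, Kafka’s replication ensures that all data written to a broker’s log has multiple copies, which reduces the probability of data loss.

Ordering guarantee

Kafka’s log supports partitioning. Within a partition, data is stored in the order in which the broker received it, so choosing a suitable partition key guarantees the ordering of the data.

Scalability

A Kafka broker cluster supports adding and removing nodes, so we can adjust the cluster size to match system capacity, and partitions can then be reassigned across the broker nodes.

A good fit for stream processing

Confluent’s article on the streaming data platform describes using Kafka for data integration between systems: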

The streaming platform captures streams of events or data changes and 
feeds these to other data systems such as relational databases, 
key-value stores, Hadoop, or the data warehouse. 
This is a streaming version of existing data movement technologies such as ETL systems.

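Kafka unifies past and future data under a single API, Stream<Key, Message>, which makes stream processing with Kafka feel very natural.

Message retention

In my project, messages in Kafka topics are retained for Integer.MAX_VALUE hours (roughly 245,146 years), so consumers of the event stream are free to choose their starting position and process all the data on the event stream.

Summary

The monolith continuously publishes business events to an event stream carried by Kafka. Microservices subscribe to the event stream, build the projections they need locally, and combine them with their own databases to serve requests. In this way we eliminated the synchronous dependency between the microservices and the monolith, and data now flows between them through the event stream. Going one step further, data transfer between microservices can also be implemented with a Kafka-backed event stream.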

Things to consider when building microservices

TL;DR

The IT industry never runs out of buzzwords: big data, SOA, cloud, IoT, reactive. The hottest buzzwords of 2015 have to be microservices and Docker. In this post, I’ll share my experience from a few projects I worked on over the last year, all of which were dockerised microservices. I won’t talk about what a microservice is or why you should adopt microservices.

Outline

To build a microservice, you had better think about provisioning and deployment at the beginning of your project.

Provision

People might confuse provisioning with deployment.

In my opinion, provisioning is all about setting up the infrastructure your service requires, which includes:

  • Virtual Machine
  • Load Balancer
  • Reverse Proxy
  • Database
  • Message Queue
  • Network Access Control
  • etc.

The goal of provisioning is an environment that an artifact can be deployed to.

Deployment, on the other hand, is deploying a specific version of an artifact to a target environment (which has been provisioned beforehand).

The goal of deployment is:

  • an updated service up and running in a healthy state (deployment succeeded)
  • a non-updated service up and running in a healthy state (deployment failed)

An independent lifecycle is the most important feature of microservices, and an independent lifecycle means you need to manage your service yourself.

To achieve the goal of provision, you need to answer the following questions:

  • What is the target machine of your service? A public cloud service provider (e.g. AWS, Azure), a managed private cloud, or just VMs in your own datacenter?

  • What are the network requirements? Is it a public-facing service or an internal service that sits behind a firewall?

  • Does your service depend on any other infrastructure, e.g. a message queue or a database? Do you want to provision this infrastructure yourself, or just use a service provided by a cloud service provider (e.g. RDS, SQS from AWS)? This may affect your decision.

  • How do you provision the environment? Once you’ve chosen the target environment, you need to decide how to provision it. There are options like Ansible, CloudFormation, Chef and Puppet; all of them have pros and cons, so choose the one that fits your requirements.

Deployment

As I mentioned in the previous section, deployment is all about deploying a specific version of an artifact to a target environment. To get your service deployed to the target environment, you need to answer the following questions:

  • what is the deployable artifact of your service?
  • what is the automation tool of deployment?
  • can I roll back to the previous version if I find a critical bug in the version just deployed?

What is the deployable artifact of your service?

In a Continuous Delivery (CD) world, an artifact is the result of a packaging job, which is part of a CD pipeline. An artifact can be deployed to an environment to provide a service.

There’re many options for artifacts, such as

  • source code
  • Amazon Machine Image (AMI)
  • rpm/deb package
  • docker image

All of them have pros and cons. I’ll give a brief overview of these options.

Source code

In the Ruby world, there are tools like mina and capistrano that allow you to deploy source code to the target environment.

  • The Pros:

    • The advantage of this approach is its simplicity. All these tools do is provide a nicer DSL that lets you copy source code to the target machine and start it. They also keep a few old versions on the target machine, so you can easily roll back to an old version.
  • The Cons:
    • The disadvantage of this approach is also its simplicity. If your application has any native dependencies (e.g. postgres), these tools cannot help; you have to update your provisioning script to ensure the target environment has the required native dependencies.

    • In other words, it is hard to draw a clear line between provisioning and deployment scripts if you’re using these tools.

  • Tooling:
    • mina/capistrano in ruby
    • shell scripts
  • Conclusion: if you favor simplicity over maintainability, use these tools; otherwise, you should consider other options.

Amazon Machine Image

This is quite an interesting approach. The main idea is: given a base Amazon image, spin up an EC2 instance, install your service on it as an auto-started service, and create a new AMI from that EC2 instance. The newly created image is the artifact.

  • The Pros:
    • With your service baked into a machine image, spinning up a new machine or rolling back to an old version can be done automatically within a few minutes.
    • With your service and its dependencies baked into a machine image, consistency across different environments is much easier to achieve.
  • The Cons:
    • Baking a new image can be time consuming.
    • It makes your service tightly coupled to the cloud vendor’s approach. In my last project we ran into a lot of issues when Amazon started introducing a new virtualization type, HVM (hardware-assisted virtualization), while our projects still used a lot of paravirtual VMs.
  • Tooling:

Platform specific distribution package (e.g. rpm/deb)

The idea is to create a platform-specific package which contains your service and a manifest file that declares its dependencies.

  • The Pros:
    • Leverage platform’s package management capability
  • The Cons:
    • TODO

Docker Image as artifact

The idea is to bake your service into a docker image and distribute it to the target environment. Building and shipping a docker image is quite fast, and starting a dockerized application is even faster.

Disaster Recovery

  I am prepared for the worst, but hope for the best. 

                              -Benjamin Disraeli

Bad things can happen at any time. To mitigate the risks to your service, disaster recovery should be considered as early as possible.

Disasters include database corruption, data center outages, etc.

Each service should have its own disaster recovery strategy, but there are some principles to follow.

In my previous project, we as a team came up with the following principles.

For asynchronous backend job services:
  • jobs should be idempotent and replayable
  • execution metrics and monitoring should be used to check system health.
For web applications:
  • the application should be stateless, which makes scaling easier.
  • re-creatable environment.
  • automated deployment process.

Log Collection

Imagine your company has hundreds of services in the production environment. How would you trace a specific request across these systems? Log in to each box and grep through the logs?

A common approach is to create a central log management system: hundreds of log collection agents run alongside the services on each host, collect the logs generated by the services and send them back to the central log management system.

Another tip: in a large system composed of many small services, use a generated, globally unique transaction id across all services for each user request. This makes tracking asynchronous messages much easier.
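A minimal sketch of the transaction id idea, assuming the id travels with each request (for example in a header or message attribute): reuse the incoming id if there is one, otherwise generate a new one, and put it on every log line. The class, the method names and the log format are hypothetical.

```java
import java.util.UUID;

// Hypothetical helper for propagating a transaction id across services.
final class CorrelationId {

    /** Reuses the caller's transaction id when present, otherwise starts a new one. */
    static String resolve(String incomingId) {
        if (incomingId != null && !incomingId.trim().isEmpty()) {
            return incomingId;
        }
        return UUID.randomUUID().toString();
    }

    /** Formats a log line so the central log system can be searched by transaction id. */
    static String logLine(String correlationId, String service, String message) {
        return "txn=" + correlationId + " service=" + service + " " + message;
    }
}
```

Searching the central log system for a single txn value then returns the whole path of a request, including the asynchronous hops.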

This is also a good investment from the company’s perspective. With a central log management system, the company has a gold mine of user behaviour data.

Monitoring & Alert

As Werner Vogels, CTO of Amazon, says: “you build it, you run it.” Microservice developers should have more insight into the systems they build, as it is their own responsibility to operate them.

Metric

Many cloud service providers already offer some built-in metrics, e.g. CPU, memory and bandwidth usage. You can also create your own custom metrics. But you’re not tied to your cloud provider: there are many third-party tools in this area. Just add a client library and an API key to your service and you can send metrics to their servers, then you’ll get a nice dashboard for your service.

With these metrics, you can even define auto scaling policies.

Alert

You want to be notified when bad things happen to your service. Again, there are many third-party tools in this area, and integrating with these services is pretty straightforward. All you need to do is define a threshold and how you want to receive alerts.

Conclusion

Again, microservices aren’t a silver bullet. You pay a significant cost to adopt them, so be aware of these things when you start considering building your next system in a microservice style.