When joining streams in Kafka Stream application, one critical prerequisite is that topics need to be co-partitioned.

In this post, I’ll share my experience of tracking down an issue of topics not fulfilling co-partition guarantee, when using DefaultPartitioner and confluent’s KafkaAvroSerializer in topic key.

Background

  • Service A producing events to topic A, with key K.
  • Service B producing events to topic B, with the same key K.
  • Service C joins events from both topics A and topic B and produce a calculated result to topic C, with the same key K.
  • All topics have the same number of partitions 10.
  • These topics key schema registration strategy is TopicNameStrategy.
  • key serializer is KafkaAvroSerializer.
  • All three services use DefaultParitioner in the producer.

The Issue

Service A and B produces records with key k1 to corresponded topics, When service C create two KafkaStream from topic A and B, and join them together; it complains that no matching record for key k1.

Let’s revisit the definition of Copartition Requirements.

One additional item I would like to add to this requirements is:

For the same key, the record should be on the same partition across these co-partitioned topics.

Service C complains that it can not find a matching record for key k1 as it considers only records from the same partition on both topics, while in reality,k1 was sent to partition 2 of topic A but partition 0 of topic B.

Even the configuration of topic A and B, service A, B and C meet all the requirements defined above, why is that?

Uncover the root cause

The following diagram demonstrates how the partition number calculated. partition process

Firstly KafkaProducer delegate to KafkaAvroSerializer to serialize an Avro object to a byte array, the serialized result includes schema id.

//org.apache.kafka.clients.producer.KafkaProducer#doSend

//In current context, keySerializer is an instance of KafkaAvroSerializer
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());

Secondly, the keySerializer talk to schema registry to resvole the schema Id for topic key and append it to serialized bytes.

//io.confluent.kafka.serializers.AbstractKafkaAvroSerializer#serializeImpl

id = schemaRegistry.getId(subject, new AvroSchema(schema)); //as we're using TopicNameStrategy, the subject is "A-key" or "B-key"
ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(MAGIC_BYTE);
out.write(ByteBuffer.allocate(idSize).putInt(id).array());

Thirdly, KafkaProducer hand the serialzation result to DefaultPartitioner to calculate partition number.

//org.apache.kafka.clients.producer.KafkaProducer#partition

partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster;

Lastly, DefaultPartitioner calculate parition number from serialized key bytes.

// org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition

// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

Root Cause

  • Key schema registration strategy is TopicNameStrategy, The same schema of K is registered with topic A and topic B separately; hence different schema Id is generated for topic A topic B.

  • When service A and service B serializing k1 , they append the different schema Id in the serialized bytes; Even though the Avro serialized value of k1 in two services are identical, the serialized key bytes are different.

  • DefaultParitioner calculates partition from serialized bytes, and yields different partition number in two services.

Solution

There’re two ways to address this problem; both have pros and cons.

  • Use other primitive type and Serde in topic key, e.g. StringSerializer, LongSerializer, etc.

The upside of this approach is co-partition is guaranteed as long as these requirements are met. The downside is losing the ability to evolve key schema. (who want to do this anyway?)

  • Use a customized partitioner

The upside of this approach is the ability to evolve key schema. The downside is additional complexity to services.

Conclusion

Using DefaultPartitioner and KafkaAvroSerializer in the topic key will make the topic fail to meet co-partition requirements.

Caveats

RecordNameStrategy won’t help in this case, as when there’s a need to upgrade the key schema, new schema id will be generated, which in turn generate different serialized bytes and yield different partition number. Even for the same key.

In this post, I’ll provdes some tips to create readable angular test.

General Tips

Use describe or nested describe to structure specs

  • Use top level describe to describe a public function or a feature of the component to be tested.
  • Use nested describe to group specs which share the same context.Check the following example:
//top level describe
describe('#landing page', function() {
    //nested describe
    describe('logged in as admin', function() {
        beforeEach(function(){
            //context setup code for an admin user
        });
        it('should be able to see admin link', function() {});
        it('should be able to see greeting message', function() {});
    });
	describe('login as member', function() {
        beforeEach(function () {
            //context setup code for an memebr user
        });
        it('should not be able to see admin link', function() {});
        it('should be able to see greeting message', function() {});
	});
});

Put variable declaration close to usage.

It is quite common to see a bunch of local variables defined at the top of a spec then initialize them somewhere else in the code. It makes it super hard to understand the value and usage of those variables when going through the code.

Service Specs Tips

Put more business logic into Service.

Some services we created are just a very thin wrap of $http, Eg. :

angular
    .module('MyApp')
    .service('MyService', ['$http', 
    function ($http) {
        this.get = function (projectId, myId) {return $http.get('/projects/' + projectId + '/something/' + myId);};
    }]);

dead simple but is it worth to test it?

Encapsulating business logic into Service instead of Controller or Directive will lead to a higher test coverage of the code.

Directive Specs Tips

Directive is hard to test, here’re some tips:

Make directive as small as possible

One common mistake I observed is that directives have too much responsiblity.
Only have view/rendering related code should live in directive, any other business logic should be extracted into a service. If the “extra” code is related another view extract into another directive.

Limit the usage of $scope variables.

In most cases, a private variable is good enough. Compared to private variable, scope variables are public accessiable, which violate the encapsulation princaple. Also, scope variables are bound to directive lifecycle. check the following example.

angular.module('MyApp')
.controller('MyController', ['$scope', '$stateParams', 'MyService',
function ($scope, $stateParams, MyService) {
	var self = this;
	self.isAdmin = false;
	self.canCreateUser = false;
	self.myDomain = null;
	//...
	MyService.get($scope.myId).then(function(data){
	    self.myDomain = data;
	});
	//...
}
Simple template

Avoid bussiness logic in templates. Replace them with directive methods that express the intent of the logic, Eg. :

  //Before Refactor
  //directive
  $scope.isAdmin = ...;
  $scope.canCreateUser  = ...;
  $scope.isFriday = ...;

//template
<a ng-if="isOwner && canEditDocument && isFriday">click me</a>
//After refactor
//directive
$scope.isEditable = function(){
    return self.isAdmin && self.canCreateUser && self.isFriday
}

//template
<a ng-if="isEditable()">click me</a>
//Three scope variables were replaced by one scope function
Do not test directive template

Given we have made the template simple, we don’t need to test them. Testing directive methods(e.g. isEditable) is much easier to deal with doms in template, but provides enough confidence of quality.

背景

随着微服务概念的流行,微服务得到到越来越多的企业的青睐,开发团队开始构建自己的微服务,笔者了解到的企业的微服务之路大致如下:

始于单体架构(Monolith)

大多巨型应用一开始都是从一个很小的应用逐渐成长起来的,随着业务的增长,更多的功能被加入进来,几年下来,这个应用可能已经变成了庞大的代码库,单块架构的缺点已经严重拖累了团队的效率。

拥抱微服务(Mirocservices)

基于以上的问题,团队开始了解微服务实践,然而,拥抱微服务不可能一蹴而就,一个常见的方法就是分而治之:

  • 当一个新需求需要改动Monolith中的一个模块时,构建一个新的微服务,集成到Monolith。

  • 重复此步骤直至所有Monolith中的模块都被拆分到各个新的微服务中。

image

微服务的问题

在上面的过程中,首先要解决的问题就是如何把微服务与现有Monolith系统的耦合

在大多数情况,微服务都需要与Monolith中的各个模块交换数据来实现业务流程,一种最常见的方式是在Monolith中增加REST api,微服务通过同步调用这些REST api与Monolith集成,然而这也意味着这个微服务的交付周期与Monolith耦合起来了,而与Monothlith的交付周期解耦合本是创建微服务的一大驱动力。

因此,把微服务与Monolith应用的的同步依赖解耦就成为了最关键的步骤。

本文将通过这个实际案例讲述通过Streaming platform解耦微服务之间的同步依赖,在此过程中遇到的问题以及解决方案。

架构

笔者现在服务的公司的主要业务是面向建筑行业提供项目协作平台,其中核心的Monolith应用已经有20多年的历史,在过去几年时间里,我们围绕这个Monolith创建了大约十几个微服务,每个微服务都需要通过REST api与Monolith应用进行协作。

然而这种Monolith与微服务之间的同步依赖也存在着其他问题,整个系统的扩展性并没有得到提升,微服务所承担的系统压力仍然会被传递到Monolith上,我们选择的方式,移除同步依赖

我们把Monolith应用的数据以Event的形式发布到Kafka topic上, 微服务订阅这个topic获取历史数据和实时数据,在本地数据库中构建适合自身业务场景的projection,以前需要同步调用Monolith应用API的地方,都可以被替换为使用本地的projection,这样就消除了同步调用依赖。

image

如图所示,把Monolith应用数据发布为event stream需要以下几个步骤:

  • 对Monolith应用中的业务流程进行建模,得到一组表述业务事件的数据模型
  • 在各个业务流程中植入生成业务事件的代码
    • 生成业务事件每个事件被分配一个唯一的,线性增长的序列号(参考数据库中的sequence)
    • 作为在业务流程Transaction的一部分,持久化到domain_events表中
  • Source connector负责:
    • 监视domain_events表中的数据,一旦有新数据产生就读取,发布到Kafka集群中。
    • 维护当前已经发布的数据的序列号,并及时持久化序列号,尽量减少重复发布的数据。

Event是如何生成的?

从上图可以看出,前两步把Monolith系统中数据的变化以event的形式持久化下来,做为其他系统的输入,这个其实跟数据库领域中CDC的概念非常相似,不同的是,这里我们对捕捉领域模型的变化,而CDC是捕捉对数据库表的变化。Confluent开源了一个Postgres的CDC实现

Event是如何发布的?

Kafka社区已经有了很多的Kafka connector,(包括Source Connector和Sink Connector)。笔者强烈建议首先在这里寻找适合项目场景的connector, 如果找不到合适的,再去尝试实现自己的connector。

在笔者的项目里,Monolith使用的是Microsoft SQLServer, 数据库表的设计并不能很好的与现有的领域模型对应,上述的任何Source Connector都难以胜任这种场景,于是我们团队自己实现了一个 bridge 组件用于把事件数据从ms sqlserver中发送到kafka集群中。

Event是如何被消费的?

在实现event stream的消费者时,我们使用了Kafka Stream,这个库提供event stream的抽象,开发者无需关心offset管理这种底层逻辑。

值得一提的是,由于一个消息可能会出现在event stream的多个位置(kafka的at-least-once),消费者的实现必须是幂等的(idempotent)。

挑战

在实施上述架构的过程中,我们遇到了以下挑战:

数据的高可靠性 (dualbilty)

笔者所在的项目,任何数据的丢失意味着event stream上的数据与源数据永久地不一致,这对event stream的消费者来说是不可接受的。

为了保证发布到Kafka集群上的数据不会丢失,在N个broker的kafka集群中, 关于log replication的配置如下:

default.replication.factor = N #同步replica的数量 + 异步replica的数量
min.insync.replicas = N-1 #同步replica的数量

此配置的目的在保证数据一致性的前提下,Kafka集群在失去ceil(N-1/2)个节点后仍然能够接受数据读写操作。

数据的顺序保证 (order guarentee)

Kafka仅在单个partition内保证顺序,因此,挑选一个合适的partition key 极为关键。

数据的低冗余 (low duplication)

在实现bridge时,及时计算已发送成功(Kafka broker发回了成功回执)消息的最小序列号极为关键,当bridge因为某种原因停止工作,重新启动bridge后,个最小序列号就是bridge失败重试的起点。

我们通过以下手段减少数据冗余:

  • 限制in-flight(正在向Kafka broker发送并且等待回执)的message数量
  • 及时计算被已发送成功的消息的最小序列号并保存到持久化设备。

为什么用Kafka?

业务事件被保存到domain_events表之后,就需要发布到event stream了, 在选择event stream的载体时,数据的高可靠性,顺序保证,系统的可伸缩性被做为重要的衡量标准。

可靠性

Kafka采用了log based storage, 发送到broker的数据被添加在日志文件的尾部,由于避免了数据存储中昂贵的”查询修改“操作,使得其有及高的存储性能。

并且Kafka的replication保证了所有保存到Kafka Broker数据日志的数据都会有多个备份,从而降低了数据丢失的概率。

顺序保证

Kafka的数据日志支持数据切片(partition),在同一个partition内部,数据被存储的数码就是broker收到该数据的顺序,因此,选择合适 的partition key可以保证在数据的顺序。

可伸缩性

Kafka broker集群支持增加或者减少节点,我们可以根据系统容量调节集群的节点数量,Kafka可以自动地把partition在broker节点之间重新分配

对流处理平台的友好性

Confluent的Streaming data platform中介绍了Kafka作为系统间数据集成的应用场景:

The streaming platform captures streams of events or data changes and 
feeds these to other data systems such as relational databases, 
key-value stores, Hadoop, or the data warehouse. 
This is a streaming version of existing data movement technologies such as ETL systems.

而Kafka把过去和未来的数据统一在同一种api Stream<Key, Message>下,从而使得使用Kafka做流处理变得非常自然。

消息的持久性

在笔者的项目中,Kafka topic中的消息的有效期是Integer.MAX_VALUE小时(约等于245146年),event stream的消费者可以自由的设置其起始位置,处理event stream上的所有数据。

总结

Monolith持续不断地把业务事件发布到Kafka承载的event stream上, 微服务通过订阅event stream,在本地构建应用所需要的projection,再与应用自己的数据库聚合对外提供服务, 这样,我们就消灭了微服务与Monolith之间的同步依赖,转而通过event stream在微服务与Monolith传递数据,更进一步,微服务与微服务之间数据传输也可以通过Kafka承载的event stream实现。

TLDR;

IT industry never run out of buzz words, big data, SOA, Cloud, IoT, reactive. The hostest buzz word of 2015 have to be microservice and docker. In this posts, I’ll share my experinence of few projects I worked on over last year, which are dockerised microservice. I won’t talk about what is microservice, why you should adopt microservice.

Outlines

To build a microservice, you’d better to think about provision and deployment at the beginning of your project.

Provision

People might get confused about provision and deployment.

In my opinon, provision is all about set up required infrastructures for your service, which including:

  • Virtual Machine
  • Loadbanlacner
  • Reverse Proxy
  • Database
  • Message Queue
  • Network Access Controll
  • etc.

the goal of provision is a envrionment which a artifact can be deployed to.

While deployment is deploying a specific version artifact to an target environment(which has been provisioned before).

the goal of deployment is:

  • a updated service up and running in health state. (deploment succeed)
  • a non-updated service up and running in health state.(deploment failed)

independent lifecycle is the most important feature of microservices, independent lifecycle means you need to manage your service by yourselves.

To achieve the goal of provision, you need to answer the following questions:

  • What is the target machine of your service? a public cloud service provider(e.g. AWS, Asure) or a managed private cloud or just VMs in your own datacenter?

  • What is the network requirement? is it a public facing service or an internal service which sits behind a firewall?

  • Is your service depends on any other infrastructrure? e.g. a message queue, database, do you want a provsion these infrastructures by yourself, or just use a service provided by cloud service provider(e.g. RDS, SQS from AWS)? This may affect your decision.

  • How to provision the environment? Once you’ve chosen target environment, you need to decide how to rovision the target environment? Options like ansible, cloudformation, chef, puppet, all of there have pros and cons, choose one fits your requirements.

Deployment

As I mentioned in previous section, deployment is all about deploying a specific version artifact to an target environment. to have your service deploy to target environment, you need to answer the following questions:

  • what is the deployable artifact of your service?
  • what is the automation tool of deployment?
  • can I roll back to previous version if I found a critical bug in the version just deployed?
what is the deployable artifact of your service?

in a Continous Delivery(aka. CD) world, an artifact is an result of of packaging job, which is part of a CD pipeline. artifact can be deployed to a environment and providing service.

There’re many options for artifacts, such as

  • source code
  • amazon machine image(aka, AMI)
  • rpm/deb package
  • docker image all of them have pros and cons, I’ll give a brief overview on these options.
source code

in ruby world, there’re some tools like mina, capistrano that allow you to deploy source code into target environment.

  • The Pros:

    • The advantage of this approach is that simplicity. All these tool does is provide a nicer DSL which allow you to copy source code into target machine and start them, also, they maintained few old versions on target machine, so that you can easily roll back to an old version.
  • The Cons:
    • The disadvantage of this approachs is also simplicity, if your application have any native dependencies(e.g. postgres), these tools can not help, you have to update your provision script to ensure the target environment has required native dependencies.

    • in another word, it is hard to define a clear seperation line between provison and deployment script if you’re using tool.

  • Tooling:
    • mina/capistrano in ruby
    • shell scripts
  • Conclusion: if you favor simplicity over maintaince, use these tools, otherwise, you should consider other options.
Amazon Machine Image

This is a quite intesresting approach, the main idea behind it is, Given a base amazon image, spin up a EC2 instance, install your service on it as an auto started service, create a new AMI from the EC2 instance, The newly created image is artifact.

  • The Pros:
    • By baking your service into a machine image, spin up a new machine, roll back to a old version can be finished automaticly within few minutes.
    • By baking your service and its dependence into a machine images, Consistency across difference environment can be achieved much easier.
  • The Cons:
    • Bakcing a new image could be time consuming.
    • Make your service tightly coupled to cloud vendor’s approach. in my last projects, we’d encountered a lot of issues while Amazon start introduce a new virtual technique - HVM(hardware assisted virtual), while our projects sitll use a lot paravirtual VMs.
  • Tooling:
Platform specific distribution package(e.g. rpm/deb)

The idea is creating a platform-specific package, which contains your service, also a manifest file which including dependence declaration.

  • The Pros:
    • Leverage platform’s package management capability
  • The Cons:
    • TODO
Docker Image as artifact

The idea is baking your service into docker image and distribute it into target environment. The time to build a ship docker images it quite fast, and start a dockerized application is even faster.

Disaster Recovery

  I am prepared for the worst, but hope for the best. 

                              -Benjamin Disraeli

Bad things can happened at any time, to mitigate risks of your service, Disaster Recovery should be considered as early as possible.

Disaster includes Database Corruption, Data Center outage, etc

Each service should have their own disaster recovery strategy, but there’re some principles to follow.

in my prevous project, we as a team, come out with the follow principles.

For asynchrous backend job service:
  • jobs should be idempotent, replayable
  • execution metric & monitor should be used to check system healthy.
For web application
  • applicaton should be stateless, which will make scale easier.
  • re-creatable environment.
  • automated deployment process.

Log Collection

Image your company have hundreds of services in production environment, how would you trace a specific request across these systems? login to each box and grep in logs?

A comon approach is creating a central log management system. hundreds of log collection agents running along side with services on each host, collecting logs generated by services and send back to central log management system.

Another tips is, in a large system which composed of many small services, use a generated gloabl unique transaction id across all services for each user request, this will made tracking asynchronous message much easier.

This is also a good invesment from the company’s perspective. having a central log management system, the company have a gold mine of user behaviour.

Monitoring & Alert

As Werner Vogels, CTO of Amazon, says: “you build it, you run it.” microservice developers shoud have more insights of the systems they built, as it is their own responsibility to operate it.

Metric

Many cloud service provider have already provided some built in metrics, e.g. CPU, Memory, Bandwidth usage. You can also create your own custom metrics. But you’re not tied to your cloud service, there’re many 3rd-party tooling support in this area, just add a client libraray and api key in your service, you’re able to send metrics to their server, then you’ll get a nice dashboard for your service.

With these metrices, you can even define auto scalling policy

Alert

You want to be notified when bad things happened on your service. again, there’re many 3rd-party tooling support in this area, integrate with these service are pretty straightforward. All you need to do is to define threshold and how you want to receive alert.

Conclusion

Again, microservice isn’t silver bullet, you need to pay significant cost to adopt microservice, be aware of these things when you start consider building your next system in a microservice style.

自从8月14日离开公司后,卖车,出租房屋,打包托运行李,和朋友聚会,几乎每天的事情都安排得特别满,相当疲惫。

一开始得到offer后,当然特别兴奋,也算是又达成了自己的一个目标。记得之前跟媳妇提去国外工作这件事情时,觉得还是比较遥远的事情,然而我相当豪气地说了一句“我自从工作以来,只要是认定了的职业目标,哪个是没有达成的”。这次,又实现了。

然而,刚开始的兴奋很快就归于平静,冷静下来之后考虑的事情就越来越多,毕竟已经在国内有了比较安稳的生活,真得要抛下这一切来个二次奋斗了吗?到后来快要离开时,简直已经开始怀疑自己的这个决定了。

旅程

来墨尔本的机票是澳洲这边中介公司帮忙订的,路线并不是很好,当时也没注意,快要出发的前几天才发现,西安经吉隆坡转机到达墨尔本,晚上11点起飞,到第二天晚上11点到墨尔本,在吉隆坡机场停留了八个小时。登机后才知道原来亚航的飞机默认是不供餐的,要吃还得自费购买。

抵达墨尔本机场后,也是轻车熟路了,迅速办理入关手续,出来办了张电话卡,拖着近四十公斤的行李直奔先前预定好的Melbourne Airport Holiday Inn,然而大半夜的,出来后不辩东西南北,幸好有热心的澳洲大叔相助,才得以顺利入住。

第二天到10点才起来,退房后,坐机场大巴转火车再步行才算抵达住处。

来墨尔本之前已经在网上申请了一家银行帐号,到墨尔本的第二天就带护照去Swanston St的网点去激活帐号,接待我的是位华人,开始用英文聊着,后来她出去办了个事回来我们就很自然地切换成中文模式了~。

接下来就去REA的办公室见了下Erica,Rupi,Luke。Trent去黄金海岸,我们的可爱的麦叔回家生孩子去了。 到那边才得知Erica和我们的麦叔也马上离开REA了,加上西安那边的变动,这个让我引以为豪的Team变化得还是蛮大的,不胜唏嘘 T_T。

后来跟志超,刀哥还有在这边出差的莹莹一起吃了个饭。

虽然之前来过墨尔本好几次,这次是头一次来REA的新办公室,果然气派。

环境

来之前心情并不那么轻松,然而这边环境确实还是相当能够给人以安慰。之前几次来都住在城里,这次来了之后住在郊区,才发现,这边的房子确实都特别漂亮,下面的那一大片草地就是在我住的这个房子的后院,推开院门就是这么一番景象,确实令人震撼。

回家路上偶摄一张

心态

在来澳洲之前,四火就专门跟我聊过一次,他现在在西雅图,从一个出国定居两年的老移民的角度给了一些建议,从一开始的新鲜,到后来的孤单,再到后来的适应。不太相同的可能是我之前已经来过几次,因此那种新鲜感并不多,现在最重要的就是调整好心态,安排好这边的一切,等家人们都过来了,就再次为生活奋斗吧。折腾吧,追随自己这个不安分的心。