Terraform Tips: Layered Infrastructure

Terraform has been a significant player in the infrastructure-as-code field. Since its first release in 2014, it has been widely used in the industry, and it finally reached 1.0 on 8 June 2021.

It is dead simple to provision and manage resources via Terraform’s human-readable, declarative configuration language. However, you might only see the challenges once you use it in anger on real-life projects. In this post, we’ll talk about the idea behind layered infrastructure: the problem it tries to solve, and how to adopt it in your project.

Technically, we could provision a whole environment, including networks, subnets, security groups, data stores and EC2 instances, in one single Terraform file. See the example below.

└── infra
    ├── prod
    │   └── main.tf
    ├── qa
    │   └── main.tf
    ├── dev
    │   └── main.tf
    └── stage
        └── main.tf

However, this would lead to a slow deployment process. To apply any resource change, Terraform has to query and compare the state of every resource defined in main.tf.

We know that the frequency of changes to different types of resources varies drastically; for example, the chance of changing the number of EC2 instances is significantly higher than that of changing the VPC CIDR. It is a massive waste for Terraform to compare hundreds of nearly unchanged resources just to increase the instance count of an Auto Scaling Group.

On top of that, if we use a remote state store, we can only apply one infrastructure change to the environment at a time, since the whole environment shares a single state file.

There’s room for improvement. In a standard application deployment, we can classify resources into layers such as application, compute and networking; a higher layer can depend on resources in lower layers.

Resources such as Docker containers, data stores, SNS topics, SQS queues and Lambda functions are usually owned by a single application. Resources such as EC2 instances and ECS or EKS clusters, which provide computing capability, are usually shared across different applications.

Resources such as the VPC, subnets, internet gateway, NAT (Network Address Translation) gateway and network peering are essential for provisioning the resources mentioned above. With these layers in place, we can provision resources in different layers independently.

This is the idea of “layered infrastructure”. Here is the layout of a project adopting it.

├ components       # components for an environment
│  ├ 00-iam           # bootstrap roles which will be used in higher layers
│  ├ 01-networks
│  ├ 02-computing
│  ├ 03-application
├ modules          # in-house terraform modules

As you can see from the layout, prepending a number to each component name makes the dependency order easy to understand.

Now let’s have a closer look at this layout. Layered infrastructure has three key concepts: module, component and environment.

Module

A Terraform module is a set of Terraform configuration files in a single directory, intended to organise, encapsulate and reuse configuration, providing consistency and encouraging best practices. A Terraform module usually has the following structure:

.
├── LICENSE
├── README.md
├── main.tf
├── variables.tf
└── outputs.tf    

For example, terraform-aws-vpc is a community module that can be used to provision a VPC with subnets.
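
As an illustration, a module like this is typically consumed from a component. The sketch below is hypothetical; the version constraint, names and CIDR ranges are assumptions, not taken from the post:

module "vpc" {
  source  = "terraform-aws-modules/vpc/aws"  # registry source of terraform-aws-vpc
  version = "~> 3.0"                         # pin a version you have tested

  name = "qa-vpc"
  cidr = "10.0.0.0/16"

  azs             = ["ap-southeast-2a", "ap-southeast-2b"]
  public_subnets  = ["10.0.101.0/24", "10.0.102.0/24"]
  private_subnets = ["10.0.1.0/24", "10.0.2.0/24"]
}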

You can also maintain in-house Terraform modules to share code within your organisation.

Component

A component groups multiple closely related modules or resources together and can be provisioned independently within an environment. A component might depend on other components; cyclic dependencies between components must be avoided. A component usually has the following structure:

.
├── terraform.tf // backend configuration
├── provider.tf
├── main.tf
├── variables.tf
├── outputs.tf
└── go           // entry point for `terraform plan`, `terraform apply` and `terraform destroy`

For example, a networks component manages the VPC and subnets and exposes their ids as outputs for higher layers.
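
Here is a minimal sketch of how a component wires up its own state and consumes the outputs of a lower layer; the backend type, bucket, key and output names are assumptions for illustration:

# 01-networks/terraform.tf: each component keeps its own state file
terraform {
  backend "s3" {
    bucket = "my-terraform-state"
    key    = "qa/01-networks.tfstate"
    region = "ap-southeast-2"
  }
}

# 02-computing/main.tf: read the outputs exposed by the networks layer
data "terraform_remote_state" "networks" {
  backend = "s3"
  config = {
    bucket = "my-terraform-state"
    key    = "qa/01-networks.tfstate"
    region = "ap-southeast-2"
  }
}

resource "aws_security_group" "app" {
  name   = "app-sg"
  vpc_id = data.terraform_remote_state.networks.outputs.vpc_id
}

Because each component has its own state file, running terraform plan in 02-computing only compares the resources in that component.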

Environment

In the context of infrastructure as code, an environment is an isolated deployed instance of one or more components configured for a specific purpose, e.g. “dev”, “test”, “staging”, “production”.

All environments should have the same layout, with knobs that can be adjusted for each environment. The only differences between environments should be captured in an environment-specific tfvars file (see the sketch after the layout below). Let’s revisit the example project layout for an environment.

├ components          # components for an environment
│  ├ 00-iam           # bootstrap roles which will be used in higher layers
│  ├ 01-networks      # manage VPC, subnets, common security groups. output vpc/subnets/security group id.
│  ├ 02-computing     # manage computing resources into the vpc/subnets.
│  ├ 03-application   # manage application-specific resources 
├ modules             # in-house terraform modules
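
To make the environment-specific tfvars idea concrete, here is a hypothetical example; the variable names and values are assumptions:

# qa.tfvars
aws_region        = "ap-southeast-2"
vpc_cidr          = "10.1.0.0/16"
instance_type     = "t3.medium"
asg_desired_count = 2

Each component’s go script can then pass the matching tfvars file to terraform plan and terraform apply, so the same component code is reused across dev, qa, stage and prod.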


There are many benefits to adopting this approach, such as:

  • Each component can be provisioned independently, as long as the outputs it exposes to other components do not change.
  • Faster deployments, since Terraform compares far less state on each run.

Conclusion

We explored the problem layered infrastructure tries to solve, the benefits of this approach, and how to adopt it in your project.
This idea was inspired by Terraform Best Practices.

A journey of performance tuning a KafkaStream application

In this post, we’ll discuss our journey of tuning the performance of a few Kafka and KafkaStream applications.

Principles

Divide and conquer

Breaking the overall performance target down into targets for individual components has proven to be very effective. In a distributed system with a dozen services, there can be many bottlenecks that interfere with each other, and it is extremely challenging to test hypotheses in such a dynamic environment.

By defining performance targets for individual components and strategically tackling the low-hanging fruit, we were able to achieve significant improvements in a short period.

Measurement

It is essential to measure the system before tuning it. Having measurements in place helps the team understand the current and historical performance of the system.

Usually, the performance tuning requirements are described in one of the following formats:

  • The 99th percentile of API response time must not be larger than x milliseconds (latency).

  • The system should completely process messages within n seconds (throughput).

By creating customised metrics that measure latency and throughput, and building a dashboard from the collected metric data, we were able to test hypotheses at a swift pace.
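
For example, a custom latency metric might be recorded like the sketch below. We collect metrics with AppDynamics; the snippet uses Micrometer purely as an illustration, and the metric name and process() call are assumptions:

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class LatencyMetricExample {
    public static void main(String[] args) {
        MeterRegistry registry = new SimpleMeterRegistry();
        // publish the 99th percentile so it can be charted on a dashboard
        Timer timer = Timer.builder("order.processing.latency")
                .publishPercentiles(0.99)
                .register(registry);

        timer.record(() -> process());   // wrap the code path being measured
    }

    private static void process() {
        // placeholder for the real message-processing logic
    }
}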

Observability

With this measurement capability built in, we did observe significant latency during performance testing. However, without comprehensive insight into the system, it was challenging to locate the bottlenecks.

There are two types of metrics that are essential for building meaningful insight into an application:

  • infrastructure metrics, e.g. CPU utilisation, memory utilisation, network IO, disk IO

  • application metrics, e.g. JVM metrics, Kafka producer, consumer and Kafka Streams metrics, etc.

Collecting these metrics and creating a comprehensive dashboard for an application gives us a single place to identify potential bottlenecks.


There are many options for metric collection and aggregation. We use AppDynamics to collect and visualise these metrics; being able to look at them retrospectively after changing a few parameters has been very beneficial.

Also, Kafka exports a load of metrics via JMX, which can be overwhelming at first. We found this blog post from Datadog to be the best at explaining the meaning and significance of the key metrics.

KafkaStream App Tuning

Off-heap memory usage

Compared to an application that only uses a plain old Kafka producer and consumer, a KafkaStream application requires more resources.

We noticed that our KafkaStream application’s memory usage grew constantly and eventually used up all available memory.

It turns out that this is not a memory leak: RocksDB uses a large portion of off-heap (non-heap) memory. Even though JVM heap usage stabilised at around 400MB, the RSS (resident set size) of the application process kept growing.

KafkaStream creates a local state store, optionally backed by a changelog topic, for each stateful operation (e.g. join, groupBy, etc.).

The following formula illustrates the memory requirements of a stream application with default RocksDB settings.

one_rocks_db_memory_footprint = write_buffer_size_mb  * write_buffer_count + block_cache_size_mb
 
# default one_rocks_db_memory_footprint is 16MB * 3 + 50MB = 98MB

over_all_footprint_of_one_windowed_state_store  =  partition_number * segment_count * one_rocks_db_memory_footprint 

# default over_all_footprint_of_one_windowed_state_store is 12 * 3 * 98MB = 3528MB

There are eight windowed joins in our application, so the off-heap memory adds up to 8 * 3528MB = 28224MB.

By providing a customised RocksDB config, we limited the off-heap memory to less than 4GB, and so far we have not observed any performance degradation.
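
A minimal sketch of such a customised config, using the RocksDBConfigSetter hook that Kafka Streams exposes; the cache and buffer sizes below are assumptions and need to be tuned against your own workload:

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

public class BoundedRocksDBConfig implements RocksDBConfigSetter {

    // a small per-store block cache instead of the 50MB default
    private final Cache cache = new LRUCache(8 * 1024 * 1024L);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(cache);
        options.setTableFormatConfig(tableConfig);
        options.setMaxWriteBufferNumber(2);               // fewer memtables per store
        options.setWriteBufferSize(4 * 1024 * 1024L);     // smaller memtables
    }

    @Override
    public void close(final String storeName, final Options options) {
        cache.close();
    }
}

The class is registered via the rocksdb.config.setter property (StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG).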

Minimise overall latency

There are multiple consumers and producers in a KafkaStream application, and any misconfigured consumer/producer property can contribute to the overall delay.

  1. Consumer commit.interval.ms

    This value allows us to make a trade-off between throughput and latency. A larger value increases system throughput but adds “artificial delays”. A smaller value will lead to more frequent consumer offset commits.

  2. Producer batch.size and linger.ms: the default value of batch.size is 16KB, and our average record size is 6KB, which means the producer has to perform a send operation for every 2~3 messages, plus there is inevitable network latency between our application and the Kafka broker. By increasing batch.size to 1MB and setting linger.ms to 50, we reduced the per-message network overhead and observed improvements in both throughput and latency (see the sketch after this list).
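
The sketch below shows where these knobs live in a Kafka Streams configuration; the application id, bootstrap servers and the commit interval value are assumptions, not values prescribed by the post:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class TuningConfigExample {
    public static Properties tunedStreamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // trade-off between throughput and latency (example value)
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

        // batch more records per request to amortise the network round trip
        props.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 1024 * 1024);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 50);
        return props;
    }
}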

Stateful operations and the changelog restore process

One of my favourite features of KafkaStream is stateful operations without losing the ability to scale horizontally.

However, we can be caught out by unexpected behaviour if we don’t use these operations cautiously.

Kafka Streams creates a local state store for each partition to perform the stateful operation, with the option of backing the data up to a changelog topic.

If retention.ms is not specified, the broker config log.retention.ms (default 7 days) is used as the retention period of the changelog topic.

Choosing the right window size and changelog topic retention period is essential to avoid a lengthy changelog restore process.
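
As a hedged illustration of those knobs, a windowed store’s retention can be set explicitly when the store is materialised; the store name, window size, grace period and retention below are assumptions:

import java.time.Duration;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

public class WindowRetentionExample {
    public static void configure(KStream<String, String> events) {
        events.groupByKey()
              .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
              .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("event-counts")
                                 // retention must cover window size + grace period; it also
                                 // bounds how much changelog data a restore has to replay
                                 .withRetention(Duration.ofHours(6)));
    }
}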

Further Readings

Tracking the root cause of a topic co-partition issue

When joining streams in a Kafka Streams application, one critical prerequisite is that the topics need to be co-partitioned.

In this post, I’ll share my experience of tracking down an issue where topics did not fulfil the co-partitioning guarantee when using DefaultPartitioner and Confluent’s KafkaAvroSerializer for the topic key.

Background

  • Service A produces events to topic A, with key K.
  • Service B produces events to topic B, with the same key K.
  • Service C joins events from topics A and B and produces a calculated result to topic C, with the same key K.
  • All topics have the same number of partitions (10).
  • The key schema registration strategy for these topics is TopicNameStrategy.
  • The key serializer is KafkaAvroSerializer.
  • All three services use DefaultPartitioner in the producer.

The Issue

Services A and B produce records with key k1 to their corresponding topics. When service C creates two KafkaStreams from topics A and B and joins them together, it complains that there is no matching record for key k1.

Let’s revisit the definition of Copartition Requirements.

One additional item I would like to add to these requirements is:

For the same key, the record should be on the same partition across these co-partitioned topics.

Service C complains that it cannot find a matching record for key k1 because it only considers records from the same partition of both topics, while in reality k1 was sent to partition 2 of topic A but partition 0 of topic B.

Even though the configuration of topics A and B and services A, B and C meets all the requirements defined above, why does this happen?

Uncovering the root cause

The following walkthrough demonstrates how the partition number is calculated.

Firstly, KafkaProducer delegates to KafkaAvroSerializer to serialize the Avro object to a byte array; the serialized result includes the schema id.

//org.apache.kafka.clients.producer.KafkaProducer#doSend

//In current context, keySerializer is an instance of KafkaAvroSerializer
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());

Secondly, the keySerializer talks to the schema registry to resolve the schema id for the topic key and appends it to the serialized bytes.

//io.confluent.kafka.serializers.AbstractKafkaAvroSerializer#serializeImpl

id = schemaRegistry.getId(subject, new AvroSchema(schema)); //as we're using TopicNameStrategy, the subject is "A-key" or "B-key"
ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(MAGIC_BYTE);
out.write(ByteBuffer.allocate(idSize).putInt(id).array());

Thirdly, KafkaProducer hands the serialization result to DefaultPartitioner to calculate the partition number.

//org.apache.kafka.clients.producer.KafkaProducer#partition

partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);

Lastly, DefaultPartitioner calculates the partition number from the serialized key bytes.

// org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition

// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

Root Cause

  • The key schema registration strategy is TopicNameStrategy, so the same schema of K is registered with topic A and topic B separately; hence different schema ids are generated for topic A and topic B.

  • When services A and B serialize k1, they append different schema ids to the serialized bytes; even though the Avro-serialized value of k1 is identical in the two services, the serialized key bytes are different.

  • DefaultPartitioner calculates the partition from the serialized bytes and therefore yields different partition numbers in the two services.

Solution

There are two ways to address this problem; both have pros and cons.

  • Use a primitive type and its Serde for the topic key, e.g. StringSerializer, LongSerializer, etc.

The upside of this approach is that co-partitioning is guaranteed as long as these requirements are met. The downside is losing the ability to evolve the key schema (who wants to do that anyway?).

  • Use a customized partitioner

The upside of this approach is the ability to evolve the key schema. The downside is the additional complexity added to the services.
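
One possible shape for such a partitioner (not the one from the post) is to skip Confluent’s wire-format prefix, one magic byte plus a 4-byte schema id, and hash only the Avro payload, so the schema id no longer influences the partition:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class SchemaIdAgnosticPartitioner implements Partitioner {

    private static final int PREFIX_SIZE = 5; // magic byte + 4-byte schema id

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // a real implementation would also handle null keys and missing topic metadata
        int numPartitions = cluster.partitionCountForTopic(topic);
        // hash only the Avro payload, ignoring the schema id prefix
        byte[] payload = new byte[keyBytes.length - PREFIX_SIZE];
        System.arraycopy(keyBytes, PREFIX_SIZE, payload, 0, payload.length);
        return Utils.toPositive(Utils.murmur2(payload)) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

Every producer writing to these topics must use the same partitioner, which is exactly the additional complexity mentioned above.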

Conclusion

Using DefaultPartitioner together with KafkaAvroSerializer for the topic key can make topics fail to meet the co-partition requirements.

Caveats

RecordNameStrategy won’t help in this case: when the key schema needs to be upgraded, a new schema id is generated, which in turn produces different serialized bytes and yields a different partition number, even for the same key.

Readable Angular tests

In this post, I’ll provide some tips for creating readable Angular tests.

General Tips

Use describe or nested describe to structure specs

  • Use a top-level describe to describe a public function or a feature of the component under test.
  • Use nested describes to group specs which share the same context. Check the following example:
//top level describe
describe('#landing page', function() {
    //nested describe
    describe('logged in as admin', function() {
        beforeEach(function() {
            //context setup code for an admin user
        });
        it('should be able to see admin link', function() {});
        it('should be able to see greeting message', function() {});
    });
    describe('logged in as member', function() {
        beforeEach(function() {
            //context setup code for a member user
        });
        it('should not be able to see admin link', function() {});
        it('should be able to see greeting message', function() {});
    });
});

Put variable declaration close to usage.

It is quite common to see a bunch of local variables declared at the top of a spec and then initialised somewhere else in the code. This makes it very hard to understand the value and usage of those variables when reading through the spec.
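
A tiny illustrative sketch (buildAdminUser is a hypothetical helper):

//harder to follow: declared at the top, initialised far away
describe('#landing page', function() {
    var user;
    //... many lines later ...
    beforeEach(function() {
        user = buildAdminUser();
    });
    it('should greet the user', function() {
        //uses user, but its value is defined far from here
    });
});

//easier to follow: declare and initialise right where it is needed
describe('#landing page', function() {
    it('should greet the user', function() {
        var user = buildAdminUser();
        //assertions on user
    });
});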

Service Specs Tips

Put more business logic into Service.

Some services we created are just a very thin wrapper around $http, e.g.:

angular
    .module('MyApp')
    .service('MyService', ['$http',
        function ($http) {
            this.get = function (projectId, myId) {
                return $http.get('/projects/' + projectId + '/something/' + myId);
            };
        }]);

Dead simple, but is it worth testing?

Encapsulating business logic in services instead of controllers or directives leads to higher test coverage of the code.

Directive Specs Tips

Directives are hard to test; here are some tips:

Make directives as small as possible

One common mistake I have observed is directives having too much responsibility.
Only view/rendering-related code should live in a directive; any other business logic should be extracted into a service. If the “extra” code is related to another view, extract it into another directive.

Limit the usage of $scope variables.

In most cases, a private variable is good enough. Compared to private variables, scope variables are publicly accessible, which violates the encapsulation principle. Also, scope variables are bound to the directive lifecycle. Check the following example:

angular.module('MyApp')
    .controller('MyController', ['$scope', '$stateParams', 'MyService',
        function ($scope, $stateParams, MyService) {
            var self = this;
            self.isAdmin = false;
            self.canCreateUser = false;
            self.myDomain = null;
            //...
            MyService.get($scope.myId).then(function (data) {
                self.myDomain = data;
            });
            //...
        }]);

Simple template

Avoid business logic in templates. Replace it with directive methods that express the intent of the logic, e.g.:

//Before refactor
//directive
$scope.isAdmin = ...;
$scope.canCreateUser = ...;
$scope.isFriday = ...;

//template
<a ng-if="isAdmin && canCreateUser && isFriday">click me</a>

//After refactor
//directive
$scope.isEditable = function () {
    return $scope.isAdmin && $scope.canCreateUser && $scope.isFriday;
};

//template
<a ng-if="isEditable()">click me</a>
//Three scope variables were replaced by one scope function

Do not test directive template

Given we have kept the template simple, we don’t need to test it. Testing directive methods (e.g. isEditable) is much easier than dealing with the DOM in templates, and provides enough confidence in quality.

Exposing event stream from monolith

Background

As the concept of microservices has gained popularity, more and more companies have embraced it, and development teams have started building their own microservices. The microservice journeys of the companies I know of look roughly like this:

Starting from a monolith

Most giant applications start out as a very small application and grow gradually. As the business grows, more and more features are added; a few years later the application may have become a huge codebase, and the drawbacks of the monolithic architecture seriously drag down the team's efficiency.

Embracing microservices

Faced with these problems, teams start looking into microservice practices. However, embracing microservices cannot happen overnight; a common approach is divide and conquer:

  • When a new requirement needs a change to a module in the monolith, build a new microservice and integrate it with the monolith.

  • Repeat this step until all the modules in the monolith have been split out into new microservices.

The problem with microservices

In the process above, the first problem to solve is the coupling between the microservices and the existing monolith.

In most cases, a microservice needs to exchange data with various modules in the monolith to implement a business process. The most common way is to add REST APIs to the monolith and have the microservice integrate with it by calling those REST APIs synchronously. However, this means that the delivery cycle of the microservice is now coupled to that of the monolith, while decoupling from the monolith's delivery cycle was one of the main motivations for creating microservices in the first place.

Therefore, removing the synchronous dependency between the microservices and the monolith becomes the most critical step.

This post uses a real-world case to describe how we removed these synchronous dependencies with a streaming platform, the problems we ran into along the way, and how we solved them.

Architecture

The company I currently work for provides a project collaboration platform for the construction industry. Its core monolith application is more than 20 years old. Over the past few years we have built roughly a dozen microservices around this monolith, and every microservice needs to collaborate with the monolith through REST APIs.

However, this synchronous dependency between the monolith and the microservices has other problems as well: the scalability of the overall system does not improve, because the load taken on by the microservices is still passed through to the monolith. The approach we chose was to remove the synchronous dependency.

We publish the monolith's data as events to a Kafka topic. Microservices subscribe to this topic to obtain both historical and real-time data, and build projections suited to their own business scenarios in their local databases. Anywhere that previously required a synchronous call to the monolith's API can now be replaced by a lookup of the local projection, which eliminates the synchronous call dependency.

Publishing the monolith's data as an event stream takes the following steps:

  • Model the business processes in the monolith to derive a set of data models that describe business events.
  • Embed code that generates these business events into each business process:
    • Each generated event is assigned a unique, monotonically increasing sequence number (similar to a database sequence).
    • Events are persisted to the domain_events table as part of the business process's transaction.
  • A source connector is responsible for:
    • Watching the domain_events table, reading new rows as soon as they appear, and publishing them to the Kafka cluster.
    • Keeping track of the sequence number of the data that has already been published, and persisting that sequence number promptly to minimise duplicate publishing.

How are events generated?

The first two steps persist changes to the monolith's data in the form of events, which become the input of other systems. This is very similar to the concept of CDC (change data capture) in the database world; the difference is that we capture changes to the domain model, whereas CDC captures changes to database tables. Confluent has open-sourced a CDC implementation for Postgres.

How are events published?

The Kafka community already offers many Kafka connectors (both source connectors and sink connectors). I strongly recommend looking there first for a connector that fits your scenario, and only trying to implement your own if you cannot find a suitable one.

In my project the monolith uses Microsoft SQL Server, and the design of its database tables does not map well to the current domain model, so none of the source connectors above could handle this scenario. Our team therefore implemented its own bridge component to ship event data from SQL Server to the Kafka cluster.

How are events consumed?

When implementing consumers of the event stream, we use Kafka Streams. This library provides an event stream abstraction, so developers do not have to worry about low-level concerns such as offset management.

It is worth mentioning that, since a message may appear at multiple positions in the event stream (Kafka's at-least-once delivery), consumer implementations must be idempotent.

Challenges

While implementing the architecture above, we ran into the following challenges:

High data durability

In our project, losing any data means the data on the event stream becomes permanently inconsistent with the source data, which is unacceptable for consumers of the event stream.

To make sure that data published to the Kafka cluster is not lost, in a Kafka cluster of N brokers we configure log replication as follows:

default.replication.factor = N  # number of sync replicas + number of async replicas
min.insync.replicas = N-1       # number of sync replicas

The goal of this configuration is that, while preserving data consistency, the Kafka cluster can still accept reads and writes after losing up to ceil((N-1)/2) nodes.

Ordering guarantee

Kafka only guarantees ordering within a single partition, so choosing an appropriate partition key is critical.

Low duplication

When implementing the bridge, it is critical to promptly compute the smallest sequence number of the messages that have been sent successfully (i.e. acknowledged by the Kafka broker). When the bridge stops working for whatever reason and is restarted, this smallest sequence number is the point from which the bridge resumes.

We reduce duplication with the following measures (see the sketch below):

  • Limit the number of in-flight messages (messages that have been sent to the Kafka broker and are awaiting acknowledgement).
  • Promptly compute the smallest sequence number of the successfully sent messages and save it to persistent storage.
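
The following Java sketch illustrates that bookkeeping. It is purely illustrative, not our actual bridge code; DomainEvent and CheckpointStore are assumed abstractions:

import java.util.TreeSet;
import java.util.concurrent.Semaphore;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DomainEventBridge {

    interface DomainEvent { long sequence(); String key(); byte[] payload(); }
    interface CheckpointStore { void save(long resumeFromSequence); } // should only move forward

    private final KafkaProducer<String, byte[]> producer;
    private final CheckpointStore checkpoint;
    private final Semaphore inFlight = new Semaphore(1000);   // bound the number of in-flight messages
    private final TreeSet<Long> pending = new TreeSet<>();    // sequences sent but not yet acknowledged

    public DomainEventBridge(KafkaProducer<String, byte[]> producer, CheckpointStore checkpoint) {
        this.producer = producer;
        this.checkpoint = checkpoint;
    }

    public void publish(String topic, DomainEvent event) throws InterruptedException {
        inFlight.acquire();
        synchronized (pending) { pending.add(event.sequence()); }
        producer.send(new ProducerRecord<>(topic, event.key(), event.payload()), (metadata, error) -> {
            inFlight.release();
            if (error == null) {
                long resumeFrom;
                synchronized (pending) {
                    pending.remove(event.sequence());
                    // everything below the smallest still-pending sequence has been acknowledged;
                    // a checkpoint that is too low only causes duplicates (at-least-once), never loss
                    resumeFrom = pending.isEmpty() ? event.sequence() + 1 : pending.first();
                }
                checkpoint.save(resumeFrom);
            }
            // on failure the sequence stays pending, so it will be re-sent after a restart
        });
    }
}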

Why Kafka?

Once business events are saved to the domain_events table, they need to be published to the event stream. When choosing the carrier for the event stream, data durability, ordering guarantees and scalability were our key criteria.

Durability

Kafka uses log-based storage: data sent to a broker is appended to the tail of a log file. Because this avoids the expensive "query then update" operations of typical data stores, it achieves very high storage performance.

In addition, Kafka's replication ensures that all data saved to a broker's log has multiple copies, which lowers the probability of data loss.

Ordering guarantee

Kafka's log supports partitioning. Within a single partition, data is stored in the order in which the broker received it, so choosing an appropriate partition key preserves the ordering of the data.

Scalability

A Kafka broker cluster supports adding and removing nodes, so we can adjust the number of nodes according to the system's capacity, and Kafka can redistribute partitions across the broker nodes.

Friendliness to stream processing

Confluent's streaming data platform describes Kafka's role in data integration between systems:

The streaming platform captures streams of events or data changes and 
feeds these to other data systems such as relational databases, 
key-value stores, Hadoop, or the data warehouse. 
This is a streaming version of existing data movement technologies such as ETL systems.

Kafka unifies past and future data under the same abstraction, Stream<Key, Message>, which makes stream processing with Kafka feel very natural.

Message retention

In our project, messages in the Kafka topics are retained for Integer.MAX_VALUE hours (approximately 245,146 years), so consumers of the event stream are free to choose their starting position and process all the data on the stream.

Summary

The monolith continuously publishes business events to the event stream hosted on Kafka. Microservices subscribe to the event stream, build the projections they need locally, and combine them with their own databases to serve requests. In this way, we eliminated the synchronous dependency between the microservices and the monolith; data now flows between them through the event stream. Going one step further, data transfer between microservices can also be implemented through Kafka-hosted event streams.