Exposing event stream from monolith

背景

随着微服务概念的流行,微服务得到到越来越多的企业的青睐,开发团队开始构建自己的微服务,笔者了解到的企业的微服务之路大致如下:

始于单体架构(Monolith)

大多巨型应用一开始都是从一个很小的应用逐渐成长起来的,随着业务的增长,更多的功能被加入进来,几年下来,这个应用可能已经变成了庞大的代码库,单块架构的缺点已经严重拖累了团队的效率。

拥抱微服务(Mirocservices)

基于以上的问题,团队开始了解微服务实践,然而,拥抱微服务不可能一蹴而就,一个常见的方法就是分而治之:

  • 当一个新需求需要改动Monolith中的一个模块时,构建一个新的微服务,集成到Monolith。

  • 重复此步骤直至所有Monolith中的模块都被拆分到各个新的微服务中。

image

微服务的问题

在上面的过程中,首先要解决的问题就是如何把微服务与现有Monolith系统的耦合

在大多数情况,微服务都需要与Monolith中的各个模块交换数据来实现业务流程,一种最常见的方式是在Monolith中增加REST api,微服务通过同步调用这些REST api与Monolith集成,然而这也意味着这个微服务的交付周期与Monolith耦合起来了,而与Monothlith的交付周期解耦合本是创建微服务的一大驱动力。

因此,把微服务与Monolith应用的的同步依赖解耦就成为了最关键的步骤。

本文将通过这个实际案例讲述通过Streaming platform解耦微服务之间的同步依赖,在此过程中遇到的问题以及解决方案。

架构

笔者现在服务的公司的主要业务是面向建筑行业提供项目协作平台,其中核心的Monolith应用已经有20多年的历史,在过去几年时间里,我们围绕这个Monolith创建了大约十几个微服务,每个微服务都需要通过REST api与Monolith应用进行协作。

然而这种Monolith与微服务之间的同步依赖也存在着其他问题,整个系统的扩展性并没有得到提升,微服务所承担的系统压力仍然会被传递到Monolith上,我们选择的方式,移除同步依赖

我们把Monolith应用的数据以Event的形式发布到Kafka topic上, 微服务订阅这个topic获取历史数据和实时数据,在本地数据库中构建适合自身业务场景的projection,以前需要同步调用Monolith应用API的地方,都可以被替换为使用本地的projection,这样就消除了同步调用依赖。

image

如图所示,把Monolith应用数据发布为event stream需要以下几个步骤:

  • 对Monolith应用中的业务流程进行建模,得到一组表述业务事件的数据模型
  • 在各个业务流程中植入生成业务事件的代码
    • 生成业务事件每个事件被分配一个唯一的,线性增长的序列号(参考数据库中的sequence)
    • 作为在业务流程Transaction的一部分,持久化到domain_events表中
  • Source connector负责:
    • 监视domain_events表中的数据,一旦有新数据产生就读取,发布到Kafka集群中。
    • 维护当前已经发布的数据的序列号,并及时持久化序列号,尽量减少重复发布的数据。

Event是如何生成的?

从上图可以看出,前两步把Monolith系统中数据的变化以event的形式持久化下来,做为其他系统的输入,这个其实跟数据库领域中CDC的概念非常相似,不同的是,这里我们对捕捉领域模型的变化,而CDC是捕捉对数据库表的变化。Confluent开源了一个Postgres的CDC实现

Event是如何发布的?

Kafka社区已经有了很多的Kafka connector,(包括Source Connector和Sink Connector)。笔者强烈建议首先在这里寻找适合项目场景的connector, 如果找不到合适的,再去尝试实现自己的connector。

在笔者的项目里,Monolith使用的是Microsoft SQLServer, 数据库表的设计并不能很好的与现有的领域模型对应,上述的任何Source Connector都难以胜任这种场景,于是我们团队自己实现了一个 bridge 组件用于把事件数据从ms sqlserver中发送到kafka集群中。

Event是如何被消费的?

在实现event stream的消费者时,我们使用了Kafka Stream,这个库提供event stream的抽象,开发者无需关心offset管理这种底层逻辑。

值得一提的是,由于一个消息可能会出现在event stream的多个位置(kafka的at-least-once),消费者的实现必须是幂等的(idempotent)。

挑战

在实施上述架构的过程中,我们遇到了以下挑战:

数据的高可靠性 (dualbilty)

笔者所在的项目,任何数据的丢失意味着event stream上的数据与源数据永久地不一致,这对event stream的消费者来说是不可接受的。

为了保证发布到Kafka集群上的数据不会丢失,在N个broker的kafka集群中, 关于log replication的配置如下:

default.replication.factor = N #同步replica的数量 + 异步replica的数量
min.insync.replicas = N-1 #同步replica的数量

此配置的目的在保证数据一致性的前提下,Kafka集群在失去ceil(N-1/2)个节点后仍然能够接受数据读写操作。

数据的顺序保证 (order guarentee)

Kafka仅在单个partition内保证顺序,因此,挑选一个合适的partition key 极为关键。

数据的低冗余 (low duplication)

在实现bridge时,及时计算已发送成功(Kafka broker发回了成功回执)消息的最小序列号极为关键,当bridge因为某种原因停止工作,重新启动bridge后,个最小序列号就是bridge失败重试的起点。

我们通过以下手段减少数据冗余:

  • 限制in-flight(正在向Kafka broker发送并且等待回执)的message数量
  • 及时计算被已发送成功的消息的最小序列号并保存到持久化设备。

为什么用Kafka?

业务事件被保存到domain_events表之后,就需要发布到event stream了, 在选择event stream的载体时,数据的高可靠性,顺序保证,系统的可伸缩性被做为重要的衡量标准。

可靠性

Kafka采用了log based storage, 发送到broker的数据被添加在日志文件的尾部,由于避免了数据存储中昂贵的”查询修改“操作,使得其有及高的存储性能。

并且Kafka的replication保证了所有保存到Kafka Broker数据日志的数据都会有多个备份,从而降低了数据丢失的概率。

顺序保证

Kafka的数据日志支持数据切片(partition),在同一个partition内部,数据被存储的数码就是broker收到该数据的顺序,因此,选择合适 的partition key可以保证在数据的顺序。

可伸缩性

Kafka broker集群支持增加或者减少节点,我们可以根据系统容量调节集群的节点数量,Kafka可以自动地把partition在broker节点之间重新分配

对流处理平台的友好性

Confluent的Streaming data platform中介绍了Kafka作为系统间数据集成的应用场景:

The streaming platform captures streams of events or data changes and 
feeds these to other data systems such as relational databases, 
key-value stores, Hadoop, or the data warehouse. 
This is a streaming version of existing data movement technologies such as ETL systems.

而Kafka把过去和未来的数据统一在同一种api Stream<Key, Message>下,从而使得使用Kafka做流处理变得非常自然。

消息的持久性

在笔者的项目中,Kafka topic中的消息的有效期是Integer.MAX_VALUE小时(约等于245146年),event stream的消费者可以自由的设置其起始位置,处理event stream上的所有数据。

总结

Monolith持续不断地把业务事件发布到Kafka承载的event stream上, 微服务通过订阅event stream,在本地构建应用所需要的projection,再与应用自己的数据库聚合对外提供服务, 这样,我们就消灭了微服务与Monolith之间的同步依赖,转而通过event stream在微服务与Monolith传递数据,更进一步,微服务与微服务之间数据传输也可以通过Kafka承载的event stream实现。

Things need to be consider when building microservice

TLDR;

IT industry never run out of buzz words, big data, SOA, Cloud, IoT, reactive. The hostest buzz word of 2015 have to be microservice and docker. In this posts, I’ll share my experinence of few projects I worked on over last year, which are dockerised microservice. I won’t talk about what is microservice, why you should adopt microservice.

Outlines

To build a microservice, you’d better to think about provision and deployment at the beginning of your project.

Provision

People might get confused about provision and deployment.

In my opinon, provision is all about set up required infrastructures for your service, which including:

  • Virtual Machine
  • Loadbanlacner
  • Reverse Proxy
  • Database
  • Message Queue
  • Network Access Controll
  • etc.

the goal of provision is a envrionment which a artifact can be deployed to.

While deployment is deploying a specific version artifact to an target environment(which has been provisioned before).

the goal of deployment is:

  • a updated service up and running in health state. (deploment succeed)
  • a non-updated service up and running in health state.(deploment failed)

independent lifecycle is the most important feature of microservices, independent lifecycle means you need to manage your service by yourselves.

To achieve the goal of provision, you need to answer the following questions:

  • What is the target machine of your service? a public cloud service provider(e.g. AWS, Asure) or a managed private cloud or just VMs in your own datacenter?

  • What is the network requirement? is it a public facing service or an internal service which sits behind a firewall?

  • Is your service depends on any other infrastructrure? e.g. a message queue, database, do you want a provsion these infrastructures by yourself, or just use a service provided by cloud service provider(e.g. RDS, SQS from AWS)? This may affect your decision.

  • How to provision the environment? Once you’ve chosen target environment, you need to decide how to rovision the target environment? Options like ansible, cloudformation, chef, puppet, all of there have pros and cons, choose one fits your requirements.

Deployment

As I mentioned in previous section, deployment is all about deploying a specific version artifact to an target environment. to have your service deploy to target environment, you need to answer the following questions:

  • what is the deployable artifact of your service?
  • what is the automation tool of deployment?
  • can I roll back to previous version if I found a critical bug in the version just deployed?
what is the deployable artifact of your service?

in a Continous Delivery(aka. CD) world, an artifact is an result of of packaging job, which is part of a CD pipeline. artifact can be deployed to a environment and providing service.

There’re many options for artifacts, such as

  • source code
  • amazon machine image(aka, AMI)
  • rpm/deb package
  • docker image all of them have pros and cons, I’ll give a brief overview on these options.
source code

in ruby world, there’re some tools like mina, capistrano that allow you to deploy source code into target environment.

  • The Pros:

    • The advantage of this approach is that simplicity. All these tool does is provide a nicer DSL which allow you to copy source code into target machine and start them, also, they maintained few old versions on target machine, so that you can easily roll back to an old version.
  • The Cons:
    • The disadvantage of this approachs is also simplicity, if your application have any native dependencies(e.g. postgres), these tools can not help, you have to update your provision script to ensure the target environment has required native dependencies.

    • in another word, it is hard to define a clear seperation line between provison and deployment script if you’re using tool.

  • Tooling:
    • mina/capistrano in ruby
    • shell scripts
  • Conclusion: if you favor simplicity over maintaince, use these tools, otherwise, you should consider other options.
Amazon Machine Image

This is a quite intesresting approach, the main idea behind it is, Given a base amazon image, spin up a EC2 instance, install your service on it as an auto started service, create a new AMI from the EC2 instance, The newly created image is artifact.

  • The Pros:
    • By baking your service into a machine image, spin up a new machine, roll back to a old version can be finished automaticly within few minutes.
    • By baking your service and its dependence into a machine images, Consistency across difference environment can be achieved much easier.
  • The Cons:
    • Bakcing a new image could be time consuming.
    • Make your service tightly coupled to cloud vendor’s approach. in my last projects, we’d encountered a lot of issues while Amazon start introduce a new virtual technique - HVM(hardware assisted virtual), while our projects sitll use a lot paravirtual VMs.
  • Tooling:
Platform specific distribution package(e.g. rpm/deb)

The idea is creating a platform-specific package, which contains your service, also a manifest file which including dependence declaration.

  • The Pros:
    • Leverage platform’s package management capability
  • The Cons:
    • TODO
Docker Image as artifact

The idea is baking your service into docker image and distribute it into target environment. The time to build a ship docker images it quite fast, and start a dockerized application is even faster.

Disaster Recovery

  I am prepared for the worst, but hope for the best. 

                              -Benjamin Disraeli

Bad things can happened at any time, to mitigate risks of your service, Disaster Recovery should be considered as early as possible.

Disaster includes Database Corruption, Data Center outage, etc

Each service should have their own disaster recovery strategy, but there’re some principles to follow.

in my prevous project, we as a team, come out with the follow principles.

For asynchrous backend job service:
  • jobs should be idempotent, replayable
  • execution metric & monitor should be used to check system healthy.
For web application
  • applicaton should be stateless, which will make scale easier.
  • re-creatable environment.
  • automated deployment process.

Log Collection

Image your company have hundreds of services in production environment, how would you trace a specific request across these systems? login to each box and grep in logs?

A comon approach is creating a central log management system. hundreds of log collection agents running along side with services on each host, collecting logs generated by services and send back to central log management system.

Another tips is, in a large system which composed of many small services, use a generated gloabl unique transaction id across all services for each user request, this will made tracking asynchronous message much easier.

This is also a good invesment from the company’s perspective. having a central log management system, the company have a gold mine of user behaviour.

Monitoring & Alert

As Werner Vogels, CTO of Amazon, says: “you build it, you run it.” microservice developers shoud have more insights of the systems they built, as it is their own responsibility to operate it.

Metric

Many cloud service provider have already provided some built in metrics, e.g. CPU, Memory, Bandwidth usage. You can also create your own custom metrics. But you’re not tied to your cloud service, there’re many 3rd-party tooling support in this area, just add a client libraray and api key in your service, you’re able to send metrics to their server, then you’ll get a nice dashboard for your service.

With these metrices, you can even define auto scalling policy

Alert

You want to be notified when bad things happened on your service. again, there’re many 3rd-party tooling support in this area, integrate with these service are pretty straightforward. All you need to do is to define threshold and how you want to receive alert.

Conclusion

Again, microservice isn’t silver bullet, you need to pay significant cost to adopt microservice, be aware of these things when you start consider building your next system in a microservice style.

墨尔本,新生活

自从8月14日离开公司后,卖车,出租房屋,打包托运行李,和朋友聚会,几乎每天的事情都安排得特别满,相当疲惫。

一开始得到offer后,当然特别兴奋,也算是又达成了自己的一个目标。记得之前跟媳妇提去国外工作这件事情时,觉得还是比较遥远的事情,然而我相当豪气地说了一句“我自从工作以来,只要是认定了的职业目标,哪个是没有达成的”。这次,又实现了。

然而,刚开始的兴奋很快就归于平静,冷静下来之后考虑的事情就越来越多,毕竟已经在国内有了比较安稳的生活,真得要抛下这一切来个二次奋斗了吗?到后来快要离开时,简直已经开始怀疑自己的这个决定了。

旅程

来墨尔本的机票是澳洲这边中介公司帮忙订的,路线并不是很好,当时也没注意,快要出发的前几天才发现,西安经吉隆坡转机到达墨尔本,晚上11点起飞,到第二天晚上11点到墨尔本,在吉隆坡机场停留了八个小时。登机后才知道原来亚航的飞机默认是不供餐的,要吃还得自费购买。

抵达墨尔本机场后,也是轻车熟路了,迅速办理入关手续,出来办了张电话卡,拖着近四十公斤的行李直奔先前预定好的Melbourne Airport Holiday Inn,然而大半夜的,出来后不辩东西南北,幸好有热心的澳洲大叔相助,才得以顺利入住。

第二天到10点才起来,退房后,坐机场大巴转火车再步行才算抵达住处。

来墨尔本之前已经在网上申请了一家银行帐号,到墨尔本的第二天就带护照去Swanston St的网点去激活帐号,接待我的是位华人,开始用英文聊着,后来她出去办了个事回来我们就很自然地切换成中文模式了~。

接下来就去REA的办公室见了下Erica,Rupi,Luke。Trent去黄金海岸,我们的可爱的麦叔回家生孩子去了。 到那边才得知Erica和我们的麦叔也马上离开REA了,加上西安那边的变动,这个让我引以为豪的Team变化得还是蛮大的,不胜唏嘘 T_T。

后来跟志超,刀哥还有在这边出差的莹莹一起吃了个饭。

虽然之前来过墨尔本好几次,这次是头一次来REA的新办公室,果然气派。

环境

来之前心情并不那么轻松,然而这边环境确实还是相当能够给人以安慰。之前几次来都住在城里,这次来了之后住在郊区,才发现,这边的房子确实都特别漂亮,下面的那一大片草地就是在我住的这个房子的后院,推开院门就是这么一番景象,确实令人震撼。

回家路上偶摄一张

心态

在来澳洲之前,四火就专门跟我聊过一次,他现在在西雅图,从一个出国定居两年的老移民的角度给了一些建议,从一开始的新鲜,到后来的孤单,再到后来的适应。不太相同的可能是我之前已经来过几次,因此那种新鲜感并不多,现在最重要的就是调整好心态,安排好这边的一切,等家人们都过来了,就再次为生活奋斗吧。折腾吧,追随自己这个不安分的心。

分布式系统

我曾经看到过很多玩具项目,都是通过一个系统支撑所有的业务需求。 然而真正在产品环境运行的系统,一般都由多个子系统组成,各个子系统承担不同的职责,子系统之间以某种通信方式进行合作。 究其原因,不外乎单块系统架构带来的复杂性,单机系统的不可靠性。 本文笔者会将一个实际项目中系统设计部分的思考方式分享给大家。

背景介绍

我们的客户希望构建一个信息发布系统,其能够提供以下功能:

  • 通过多种渠道(e.g. email/sms/mobile push notification)向他们的用户发送消息。
  • 尽可能地收集用户收到这些信息后的行为。
  • 基于这些行为分析用户对某类信息的接受度,以支撑后续业务决策。

模块划分

基于上面的需求,我们做的第一件事请就是划分子系统,确定系统之间的职责边界。

  • api 向外暴露一组api,接受即将发送的消息并持久化起来。
  • dispatcher 负责将持久化后的消息分发给一组sender,并且保证一个消息只会被dispatch一次。
  • sender 负责尽可能快地将接受到的消息发送出去。
  • event-collector 负责收集用户对消息的反馈。
  • analyser 负责对收集到的用户行为数据进行分析并生成报表。

问题驱动

对于每个子系统,要能够正常地完成期望的工作,可以从以下几个角度考虑:

  • 性能
  • 可靠性
  • 监控与告警
性能

对一个web系统而言,最直观的两个性能指标就是TPS(transaction per second),response time。 一个系统能否在性能上满足需求,做为架构师,我们需要能够回答下面几个问题:

  • 这个系统即将面对的请求模型是什么?比如每个endpoint接受的请求占整个请求的百分比,比如可遇见的peek time(如天猫双十一)。
  • 确定一个合理的期望的TPS,response time值(不要拍脑袋定,不要拍脑袋定,不要拍脑袋定,否则开发人员会被搞死的)。
  • 考虑你的系统如何应对设计容量之外的请求,系统是否需要支持自动扩展?
  • 如果需要,自动扩展的策略是什么?(如,响应时间大于2秒,CPU占用率大于90%并持续10分钟以上)
  • 系统是否需要有流控能力(throttling & back-pressure)?例如,在上游系统消息发送过快时,告知对方降低发送速率,当没有需要处理的消息时,告知上游系统发送消息。
  • 消息的时效性是什么?过了多久就可以任为这个消息是过期的?对过期消息的处理策略又是什么?

可靠性

我们最常听到人们谈到系统可靠性指标时,会提到‘两个九,三个九’,指的是系统不可用时间占比,以一年为例,99.99%的可靠性就是指一年内服务中断时间不能超过52分钟。

为了保证系统的高可靠性,我们需要考虑以下几个问题:

  • 是否支持零宕机部署?即在不中断服务的情况下部署系统
  • 系统是否有fail-over策略?即在某个机器宕机的情况下, 如何启动新服务继续提供服务。在我们的系统中,采用了AutoscalingGroup的能力保证提供服务的最小机器数量。
  • 系统是否有fault-tolerance能力?在我们的系统中,对于一些关键数据,除了正常在数据流中流转之外,还会在S3上存放一份备份,以便在出错时还可以从备份中恢复数据。
  • 系统是否有灾难恢复能力? 这是一种比较极端的场景,然而一旦出现,其带来的影响可能是毁灭性的。试想一下某个公司的服务器所在的机房突发火灾,所有资料全部丢失,那么这个公司也基本完蛋了。 我们的系统是部署在Amazon的云服务上,为了能够具备这一能力,我们创建了一些可重用的部署模板和部署脚本。那怕Amazon的某个区全部不可用,我们也有能力在另外一个区很快地把整个系统重建出来。
  • 系统之间的依赖是否真得需要实时依赖? 例如,子系统A在完成某个功能时,调用系统B的某个接口并等待其返回,待结果返回后再执行后续操作,这就是实时依赖。 实时依赖有一个很大的弊端就是cascade failure - 当被依赖的系统不可用时,所有实时依赖于它的系统都变得不可用。 因此,要尽可能地避免这种实时依赖。

    有些同事对如何避免实时依赖比较感兴趣,在这里就多说一点。假定有两个系统A和B,A系统有个功能需要从B系统获取一些信息,如果是实时依赖的实现方式,那么两个系统之间的交互方式就如下图:

在我们的项目中采用了异步任务的方式来尽量减少实时依赖的影响,方式如下: 把pre-workapi调用after-work划分为三个独立的任务,这些异步任务有如下特性:

  • 前一个任务完成后会触发下一个任务。
  • 是幂等的,即多次执行和一次执行的结果是一致的。
  • 是可重试的。

异步化之后的交互方式如下图:

总结一下这个方案背后的思路: 通过把一个包含实时依赖的任务划分成多个有依赖关系的子任务,把实时依赖失败可能导致的影响限制在一个很小的范围内,并且,提供了良好的错误恢复机制,这也是design-for-failure思想的体现。

监控与告警

系统上线之后,我们可不希望直到客服找到我们的时候才执行我们的系统出现问题。 因此,在上线之前,我们需要回答下面这些问题:

  • 系统的一些关键路径的性能如何,我们能时刻知道吗?
  • 当系统主要功能出现问题时,系统能够主动报警吗?

我们系统里使用了不少工具来解决此类问题,这里跟大家分享一下

Newrelic

newrelic是一个SaaS的付费服务,主要提供了性能监控,报警等能力。 newrelic提供了多种语言的api,开发人员可以通过这些api监控代码中的关键路径。在系统运行时,newrelic agent会随之启动,并监控这些关键路径的执行时间,错误信息并上报给newrelic server。

Pagerduty

提供告警能力,在告警发生时,可通过电话,短信,邮件,push notification多种渠道通知运维人员。

CloudWatch

CloudWatch是Amazon提供的一种度量服务,这里我主要说下customised metric在系统关键路径收集数据并通过CloudWatch api上报,Amazon会存储这些数据并通过API暴露出来,开发人员可以选择用合适的方式将其可视化出来。 我们系统的dashboad就是把这些度量数据展示出来。

Splunk

splunk主要有两种角色:splunk-forwarder和splunk-indexer 在每个子系统上运行一个splunk-forwarder,收集指定的日志文件并转发给splunk-indexer。 splunk-indexer提供日志的索引,搜索能力。

Rx revisit

TLDR;

本文是rx-java的后续, 将简要介绍Rx的起源,其多线程实现机制,另外会对其实现中的一个重要函数lift函数原理进行介绍。

Rx

Rx实际上是一种高级版本的Observer模式,把被观察者封装成Observable(可理解为一个异步地生产元素的集合), 然后通过 onNext, onError, onCompleted注册Observer在相应场景下需要执行的 回调。 值得一提的是,Rx为Observable提供了的filter,map/flatMap, merge, reduce等操作,使得对被观察者的操作就像同步集合那么便利。

起源

注: 此部分参考了这个slides

Netflix做为RxJava的主要贡献者,把Rx.Net移植到了JVM上,下面我们就通过一个架构演进来了解一下RxJava的起源。

Netflix做为知名的流媒体提供商,坐拥3300万用户,其峰值下行流量可占北美地区互联网流量的33%,API请求量达到20亿次/天,面对众多的接入设备和巨量的API请求量, Netflix的工程师发现是时候重新架构他们的API了,之前他们的API架构如下:

为了减少API调用的延迟,工程师们决定转向粗粒度的API设计:

粗粒度API设计意味着业务逻辑向服务器端迁移,必将在服务器端出现并行执行,嵌套调用。

与此同时,我们希望API的提供者能够对外部隐藏其底层并发的实现,什么意思呢?

我们来看一个反面教材:

public Bookmark getBookmark(Long userId)

如果某天,我们的程序员说,这个接口执行时间太长了,我不想阻塞调用此方法的线程,做为API提供者,你应该会立刻想到采用如下方案:

public Future<Bookmark> getBookmark(Long userId)

把返回值类型修改为Future<Bookmark>,这样就不会阻塞API调用者的线程,同时在内部使用一个新的线程或者线程池执行业务逻辑。

又过了一段时间,我们的产品经理说,需求变了,我们希望能给一个用户关联多个Bookmark,你很开心地把接口修改成如下形式:

public Future<List<Bookmark>> getBookmark(Long userId)

然后和你们对接的客户端程序员实在忍受不了三番五次的接口改动,提着刀过来找你了。

在被砍得奄奄一息之后,你终于问出了这个问题:

有没有一种数据结构能够表示一个或多个在未来可用的结果,并且仅在结果可用时把结果传递给客户程序,客户程序无需了解其内部的并发实现?

有!Observable来救场

我曾在上篇博客里提到Observable的定义,我们这里再回顾一下:

Observable用于表示一个可被消费的数据集合(data provider),它的消费者无需知道数据的产生机制同步的还是异步的,它会在数据可用,出错,以及数据流结束时通知它的消费者

实际上,ObservableIterable是一组__对偶的(dual)__的概念,它们都被用来表示多个元素的集合,不同之处在于:

  • Observable中的元素可以是异步产生的,而Iterable中的所有元素在其被消费前必须可用。
  • Observablepush- based,其可以在元素可用,出错,所有元素都被消费完成时通知他的消费者;而Iterablepull-based,其消费者必须主动地轮询新元素,主动地捕获异常,主动地处理所有元素都被消费完成。

如果用Observable来改造上述API,那么我们的API就是下面这个样子:

public Observable<Bookmark> getBookmark(Long userId)

那么无论是需要同步返回,异步返回,或者异步返回多个Bookmark,这个API都无需做任何变更, API内部可以:

  • 使用调用线程进行业务逻辑计算,在运算完成后通过onNext将结果传递给客户端(原生的阻塞式调用)。

  • 使用一个新线程,或者一个新的线程池进行业务逻辑计算,在运算完成后通过onNext将结果传递给客户端(满足了异步返回的需求)。

  • 使用多个新线程,或者一个新的线程池分别获取多个bookmark,在获得每个bookmark之后多次通过onNext将结果传递给客户端(满足了异步返回多值的需求)。

如此大的变化,客户无需做任何改动,这就是Observable高超的抽象能力带来的好处。 这就是Rx带来的巨大好处,也是Netflix把它移植到JVM上的最大动力。

原理

既然Observable这么强大,那么我们不禁会问:

  • 为什么Observable能够做到push-based
  • Observable是如何做到使用多种并发实现的?

对于第一个问题,Observable提供了subscribe方法供客户程序注册回调函数,之后Observable会自己进行运算并调用相应的回调函数, 这样看起来就像是Observable在向自己的客户程序push其运算结果。

对于第二个问题,Rx中有个非常重要的概念—— Scheduler,它是Rx提供的一种并发模型抽象,你可以在创建你的Observable时指定采用哪种并发模型, 下面我们来看下Scheduler是如何对并发模型进行抽象的。

Scheduler

Rx的默认行为是单线程的,它是一个free-threaded t1模型,意味着你可以自由选择一个线程来执行你指定的任务。 如果你创建Observable时没有引入scheduler,那么你注册的onNext, onError, onCompleted回调将被当前线程(即,创建Observable代码所在线程)上执行。 Scheduler提供一种机制,用于指定将会执行回调的线程。

Scheduler的多种实现:

  • EventLoopsScheduler

    维护一组workers,当有新的任务加入#schedule时,通过round-robin的方式选取一个worker,将任务分配给其执行Worker#scheduleActual

  • CachedThreadScheduler

    使用一个ConcurrentLinkedQueue缓存创建出来的线程以备后续任务使用,创建出来的线程有一定的有效期,超过有效期的线程会被自动清除。

  • ExecutorScheduler

    包装了一个Executor的实例并,并基于它实现了Scheduler的接口。 注意,在这个实现里,thread-hopping(线程跳跃)问题是不可避免的,因为Scheduler并不知晓其包装的Executor的线程行为。

  • ImmediateScheduler

    立即在当前线程上执行任务。

  • TrampolineScheduler

    把任务分配给当前线程,但并不立即执行,任务会被放到一个队列中等待当前任务执行完毕。

Worker

回调的实际执行者,底层由java.util.concurrent.ExecutorService执行实际的任务,同时扮演了Subscription的角色。

总结

笔者在学习Coursea上学习Principal of Reactive Programming时,注意到Erik Miller曾打趣地说,不要自己尝试去实现Observable,使用现有的库就好了。 本着强大的好奇心,笔者还是试着阅读了Rx.java的源代码,才意识到这个模型是多么的精巧,它给多线程编程带来了革命性的体验。如果你对Rx.java有兴趣,强烈推荐阅读其源码。


Note: 这本应该重开一篇博客,然而,原谅笔者的懒惰吧

lift函数

在Observable的实现里,有个函数必须得提一下,lift

Rx中巧妙提出一个Operator的这个函数类型,表述从一个Subscriber 到另一个 Subscriber的映射。

有大量对Observable的操作是通过定义OperatorliftObservable上实现的,如Observable#all, Observable#filter, Observable#finallyDo等等。

Observable#lift签名如下:

//inside Observable[T]
def lift[T, R](Operator[R, T]): Observable[R]
lift函数简介

有一定函数式编程基础的人相信对lift这个名字都不会太陌生。 lift顾名思义,把一个对简单类型操作的函数提升(lift)到复杂类型/容器类型上去。

我们来看一个对lift的定义:

有个两个类型 A, B
和一个函数 f: A => B
和一个容器 M[_]
lift 就是把f转换成一个新的函数 M[A] => M[B]

那么lift的定义如下:

 def lift[A, B, M[_]](f: A => B): M[A] => M[B]

跟上面看到的Observable#lift唯一不同的地方在于,这个lift函数的返回值是一个函数, 不过再仔细观察一下,这个M[A] => M[B]应用到一个M[A]实例后的效果和上面的Observable#lift是一样的。

听起来比较抽象,其实只要我们把上面的符号逐个替换成我们所熟知的类型,这个函数一点都不陌生。 我们来逐步应用如下替换法则:

  • A: String
  • B: Int
  • M: List[_]
  • f: String => Int

因此,lift的类型签名如下:

(String => Int) => (List[String => List[Int])

我们就暂且用 (s: String) => s.length做为我们的f吧,

假如有个字符串列表 xs: List[String]

那么 lift(f)(xs) 就会得到xs的中每个字符串的长度。

什么?

这不就是 xs.map(f)的结果吗?

是的,map函数就是一种常见的lift函数。

Observable#lift

我们再来看看liftObservable中发挥了什么样的作用?

在开始之前大家需要记住这几个类型等式(:= 表示其左右两边的类型相等):

  • Observable[_] := Subscriber[_] => Unit
  • Operator[T, R] := Subscriber[R] => Subscriber[T]

现在我们来做类型替换:

  • A: Subscriber[R]
  • B: Subscriber[T]
  • M: Observable[_] (即 Subscriber[_] => Unit)
  • f: Subscriber[R] => Subscriber[T]

因此lift就是

(Subscriber[R] => Subscriber[T]) => (Subscriber[T] => Unit) => (Subscriber[R] => Unit)

亦即

(Subscriber[R] => Subscriber[T]) => (Observable[T] => Observable[R])

假如有个ts: Observable[T] 和一个函数f: Subscriber[R] => Subscriber[T],通过lift函数,我们就能得到一个类型为 Observable[R]的结果。


t1:free-threaded模型相对的是single-threaded apartment,意味着你必须通过一个指定的线程与系统交互。