我曾经看到过很多玩具项目,都是通过一个系统支撑所有的业务需求。 然而真正在产品环境运行的系统,一般都由多个子系统组成,各个子系统承担不同的职责,子系统之间以某种通信方式进行合作。 究其原因,不外乎单块系统架构带来的复杂性,单机系统的不可靠性。 本文笔者会将一个实际项目中系统设计部分的思考方式分享给大家。

背景介绍

我们的客户希望构建一个信息发布系统,其能够提供以下功能:

  • 通过多种渠道(e.g. email/sms/mobile push notification)向他们的用户发送消息。
  • 尽可能地收集用户收到这些信息后的行为。
  • 基于这些行为分析用户对某类信息的接受度,以支撑后续业务决策。

模块划分

基于上面的需求,我们做的第一件事请就是划分子系统,确定系统之间的职责边界。

  • api 向外暴露一组api,接受即将发送的消息并持久化起来。
  • dispatcher 负责将持久化后的消息分发给一组sender,并且保证一个消息只会被dispatch一次。
  • sender 负责尽可能快地将接受到的消息发送出去。
  • event-collector 负责收集用户对消息的反馈。
  • analyser 负责对收集到的用户行为数据进行分析并生成报表。

问题驱动

对于每个子系统,要能够正常地完成期望的工作,可以从以下几个角度考虑:

  • 性能
  • 可靠性
  • 监控与告警
性能

对一个web系统而言,最直观的两个性能指标就是TPS(transaction per second),response time。 一个系统能否在性能上满足需求,做为架构师,我们需要能够回答下面几个问题:

  • 这个系统即将面对的请求模型是什么?比如每个endpoint接受的请求占整个请求的百分比,比如可遇见的peek time(如天猫双十一)。
  • 确定一个合理的期望的TPS,response time值(不要拍脑袋定,不要拍脑袋定,不要拍脑袋定,否则开发人员会被搞死的)。
  • 考虑你的系统如何应对设计容量之外的请求,系统是否需要支持自动扩展?
  • 如果需要,自动扩展的策略是什么?(如,响应时间大于2秒,CPU占用率大于90%并持续10分钟以上)
  • 系统是否需要有流控能力(throttling & back-pressure)?例如,在上游系统消息发送过快时,告知对方降低发送速率,当没有需要处理的消息时,告知上游系统发送消息。
  • 消息的时效性是什么?过了多久就可以任为这个消息是过期的?对过期消息的处理策略又是什么?

可靠性

我们最常听到人们谈到系统可靠性指标时,会提到‘两个九,三个九’,指的是系统不可用时间占比,以一年为例,99.99%的可靠性就是指一年内服务中断时间不能超过52分钟。

为了保证系统的高可靠性,我们需要考虑以下几个问题:

  • 是否支持零宕机部署?即在不中断服务的情况下部署系统
  • 系统是否有fail-over策略?即在某个机器宕机的情况下, 如何启动新服务继续提供服务。在我们的系统中,采用了AutoscalingGroup的能力保证提供服务的最小机器数量。
  • 系统是否有fault-tolerance能力?在我们的系统中,对于一些关键数据,除了正常在数据流中流转之外,还会在S3上存放一份备份,以便在出错时还可以从备份中恢复数据。
  • 系统是否有灾难恢复能力? 这是一种比较极端的场景,然而一旦出现,其带来的影响可能是毁灭性的。试想一下某个公司的服务器所在的机房突发火灾,所有资料全部丢失,那么这个公司也基本完蛋了。 我们的系统是部署在Amazon的云服务上,为了能够具备这一能力,我们创建了一些可重用的部署模板和部署脚本。那怕Amazon的某个区全部不可用,我们也有能力在另外一个区很快地把整个系统重建出来。
  • 系统之间的依赖是否真得需要实时依赖? 例如,子系统A在完成某个功能时,调用系统B的某个接口并等待其返回,待结果返回后再执行后续操作,这就是实时依赖。 实时依赖有一个很大的弊端就是cascade failure - 当被依赖的系统不可用时,所有实时依赖于它的系统都变得不可用。 因此,要尽可能地避免这种实时依赖。

    有些同事对如何避免实时依赖比较感兴趣,在这里就多说一点。假定有两个系统A和B,A系统有个功能需要从B系统获取一些信息,如果是实时依赖的实现方式,那么两个系统之间的交互方式就如下图:

在我们的项目中采用了异步任务的方式来尽量减少实时依赖的影响,方式如下: 把pre-workapi调用after-work划分为三个独立的任务,这些异步任务有如下特性:

  • 前一个任务完成后会触发下一个任务。
  • 是幂等的,即多次执行和一次执行的结果是一致的。
  • 是可重试的。

异步化之后的交互方式如下图:

总结一下这个方案背后的思路: 通过把一个包含实时依赖的任务划分成多个有依赖关系的子任务,把实时依赖失败可能导致的影响限制在一个很小的范围内,并且,提供了良好的错误恢复机制,这也是design-for-failure思想的体现。

监控与告警

系统上线之后,我们可不希望直到客服找到我们的时候才执行我们的系统出现问题。 因此,在上线之前,我们需要回答下面这些问题:

  • 系统的一些关键路径的性能如何,我们能时刻知道吗?
  • 当系统主要功能出现问题时,系统能够主动报警吗?

我们系统里使用了不少工具来解决此类问题,这里跟大家分享一下

Newrelic

newrelic是一个SaaS的付费服务,主要提供了性能监控,报警等能力。 newrelic提供了多种语言的api,开发人员可以通过这些api监控代码中的关键路径。在系统运行时,newrelic agent会随之启动,并监控这些关键路径的执行时间,错误信息并上报给newrelic server。

Pagerduty

提供告警能力,在告警发生时,可通过电话,短信,邮件,push notification多种渠道通知运维人员。

CloudWatch

CloudWatch是Amazon提供的一种度量服务,这里我主要说下customised metric在系统关键路径收集数据并通过CloudWatch api上报,Amazon会存储这些数据并通过API暴露出来,开发人员可以选择用合适的方式将其可视化出来。 我们系统的dashboad就是把这些度量数据展示出来。

Splunk

splunk主要有两种角色:splunk-forwarder和splunk-indexer 在每个子系统上运行一个splunk-forwarder,收集指定的日志文件并转发给splunk-indexer。 splunk-indexer提供日志的索引,搜索能力。

TLDR;

本文是rx-java的后续, 将简要介绍Rx的起源,其多线程实现机制,另外会对其实现中的一个重要函数lift函数原理进行介绍。

Rx

Rx实际上是一种高级版本的Observer模式,把被观察者封装成Observable(可理解为一个异步地生产元素的集合), 然后通过 onNext, onError, onCompleted注册Observer在相应场景下需要执行的 回调。 值得一提的是,Rx为Observable提供了的filter,map/flatMap, merge, reduce等操作,使得对被观察者的操作就像同步集合那么便利。

起源

注: 此部分参考了这个slides

Netflix做为RxJava的主要贡献者,把Rx.Net移植到了JVM上,下面我们就通过一个架构演进来了解一下RxJava的起源。

Netflix做为知名的流媒体提供商,坐拥3300万用户,其峰值下行流量可占北美地区互联网流量的33%,API请求量达到20亿次/天,面对众多的接入设备和巨量的API请求量, Netflix的工程师发现是时候重新架构他们的API了,之前他们的API架构如下:

为了减少API调用的延迟,工程师们决定转向粗粒度的API设计:

粗粒度API设计意味着业务逻辑向服务器端迁移,必将在服务器端出现并行执行,嵌套调用。

与此同时,我们希望API的提供者能够对外部隐藏其底层并发的实现,什么意思呢?

我们来看一个反面教材:

public Bookmark getBookmark(Long userId)

如果某天,我们的程序员说,这个接口执行时间太长了,我不想阻塞调用此方法的线程,做为API提供者,你应该会立刻想到采用如下方案:

public Future<Bookmark> getBookmark(Long userId)

把返回值类型修改为Future<Bookmark>,这样就不会阻塞API调用者的线程,同时在内部使用一个新的线程或者线程池执行业务逻辑。

又过了一段时间,我们的产品经理说,需求变了,我们希望能给一个用户关联多个Bookmark,你很开心地把接口修改成如下形式:

public Future<List<Bookmark>> getBookmark(Long userId)

然后和你们对接的客户端程序员实在忍受不了三番五次的接口改动,提着刀过来找你了。

在被砍得奄奄一息之后,你终于问出了这个问题:

有没有一种数据结构能够表示一个或多个在未来可用的结果,并且仅在结果可用时把结果传递给客户程序,客户程序无需了解其内部的并发实现?

有!Observable来救场

我曾在上篇博客里提到Observable的定义,我们这里再回顾一下:

Observable用于表示一个可被消费的数据集合(data provider),它的消费者无需知道数据的产生机制同步的还是异步的,它会在数据可用,出错,以及数据流结束时通知它的消费者

实际上,ObservableIterable是一组__对偶的(dual)__的概念,它们都被用来表示多个元素的集合,不同之处在于:

  • Observable中的元素可以是异步产生的,而Iterable中的所有元素在其被消费前必须可用。
  • Observablepush- based,其可以在元素可用,出错,所有元素都被消费完成时通知他的消费者;而Iterablepull-based,其消费者必须主动地轮询新元素,主动地捕获异常,主动地处理所有元素都被消费完成。

如果用Observable来改造上述API,那么我们的API就是下面这个样子:

public Observable<Bookmark> getBookmark(Long userId)

那么无论是需要同步返回,异步返回,或者异步返回多个Bookmark,这个API都无需做任何变更, API内部可以:

  • 使用调用线程进行业务逻辑计算,在运算完成后通过onNext将结果传递给客户端(原生的阻塞式调用)。

  • 使用一个新线程,或者一个新的线程池进行业务逻辑计算,在运算完成后通过onNext将结果传递给客户端(满足了异步返回的需求)。

  • 使用多个新线程,或者一个新的线程池分别获取多个bookmark,在获得每个bookmark之后多次通过onNext将结果传递给客户端(满足了异步返回多值的需求)。

如此大的变化,客户无需做任何改动,这就是Observable高超的抽象能力带来的好处。 这就是Rx带来的巨大好处,也是Netflix把它移植到JVM上的最大动力。

原理

既然Observable这么强大,那么我们不禁会问:

  • 为什么Observable能够做到push-based
  • Observable是如何做到使用多种并发实现的?

对于第一个问题,Observable提供了subscribe方法供客户程序注册回调函数,之后Observable会自己进行运算并调用相应的回调函数, 这样看起来就像是Observable在向自己的客户程序push其运算结果。

对于第二个问题,Rx中有个非常重要的概念—— Scheduler,它是Rx提供的一种并发模型抽象,你可以在创建你的Observable时指定采用哪种并发模型, 下面我们来看下Scheduler是如何对并发模型进行抽象的。

Scheduler

Rx的默认行为是单线程的,它是一个free-threaded t1模型,意味着你可以自由选择一个线程来执行你指定的任务。 如果你创建Observable时没有引入scheduler,那么你注册的onNext, onError, onCompleted回调将被当前线程(即,创建Observable代码所在线程)上执行。 Scheduler提供一种机制,用于指定将会执行回调的线程。

Scheduler的多种实现:

  • EventLoopsScheduler

    维护一组workers,当有新的任务加入#schedule时,通过round-robin的方式选取一个worker,将任务分配给其执行Worker#scheduleActual

  • CachedThreadScheduler

    使用一个ConcurrentLinkedQueue缓存创建出来的线程以备后续任务使用,创建出来的线程有一定的有效期,超过有效期的线程会被自动清除。

  • ExecutorScheduler

    包装了一个Executor的实例并,并基于它实现了Scheduler的接口。 注意,在这个实现里,thread-hopping(线程跳跃)问题是不可避免的,因为Scheduler并不知晓其包装的Executor的线程行为。

  • ImmediateScheduler

    立即在当前线程上执行任务。

  • TrampolineScheduler

    把任务分配给当前线程,但并不立即执行,任务会被放到一个队列中等待当前任务执行完毕。

Worker

回调的实际执行者,底层由java.util.concurrent.ExecutorService执行实际的任务,同时扮演了Subscription的角色。

总结

笔者在学习Coursea上学习Principal of Reactive Programming时,注意到Erik Miller曾打趣地说,不要自己尝试去实现Observable,使用现有的库就好了。 本着强大的好奇心,笔者还是试着阅读了Rx.java的源代码,才意识到这个模型是多么的精巧,它给多线程编程带来了革命性的体验。如果你对Rx.java有兴趣,强烈推荐阅读其源码。


Note: 这本应该重开一篇博客,然而,原谅笔者的懒惰吧

lift函数

在Observable的实现里,有个函数必须得提一下,lift

Rx中巧妙提出一个Operator的这个函数类型,表述从一个Subscriber 到另一个 Subscriber的映射。

有大量对Observable的操作是通过定义OperatorliftObservable上实现的,如Observable#all, Observable#filter, Observable#finallyDo等等。

Observable#lift签名如下:

//inside Observable[T]
def lift[T, R](Operator[R, T]): Observable[R]

#####lift函数简介 有一定函数式编程基础的人相信对lift这个名字都不会太陌生。 lift顾名思义,把一个对简单类型操作的函数提升(lift)到复杂类型/容器类型上去。

我们来看一个对lift的定义:

有个两个类型 A, B
和一个函数 f: A => B
和一个容器 M[_]
lift 就是把f转换成一个新的函数 M[A] => M[B]

那么lift的定义如下:

 def lift[A, B, M[_]](f: A => B): M[A] => M[B]

跟上面看到的Observable#lift唯一不同的地方在于,这个lift函数的返回值是一个函数, 不过再仔细观察一下,这个M[A] => M[B]应用到一个M[A]实例后的效果和上面的Observable#lift是一样的。

听起来比较抽象,其实只要我们把上面的符号逐个替换成我们所熟知的类型,这个函数一点都不陌生。 我们来逐步应用如下替换法则:

  • A: String
  • B: Int
  • M: List[_]
  • f: String => Int

因此,lift的类型签名如下:

(String => Int) => (List[String => List[Int])

我们就暂且用 (s: String) => s.length做为我们的f吧,

假如有个字符串列表 xs: List[String]

那么 lift(f)(xs) 就会得到xs的中每个字符串的长度。

什么?

这不就是 xs.map(f)的结果吗?

是的,map函数就是一种常见的lift函数。

Observable#lift

我们再来看看liftObservable中发挥了什么样的作用?

在开始之前大家需要记住这几个类型等式(:= 表示其左右两边的类型相等):

  • Observable[_] := Subscriber[_] => Unit
  • Operator[T, R] := Subscriber[R] => Subscriber[T]

现在我们来做类型替换:

  • A: Subscriber[R]
  • B: Subscriber[T]
  • M: Observable[_] (即 Subscriber[_] => Unit)
  • f: Subscriber[R] => Subscriber[T]

因此lift就是

(Subscriber[R] => Subscriber[T]) => (Subscriber[T] => Unit) => (Subscriber[R] => Unit)

亦即

(Subscriber[R] => Subscriber[T]) => (Observable[T] => Observable[R])

假如有个ts: Observable[T] 和一个函数f: Subscriber[R] => Subscriber[T],通过lift函数,我们就能得到一个类型为 Observable[R]的结果。


t1:free-threaded模型相对的是single-threaded apartment,意味着你必须通过一个指定的线程与系统交互。

太长不读;

本文将会围绕reactive extension介绍reactive programming的起源,其要解决的问题。

编程范式的演进

最近几年,reactive programming这个词语的热度迅速提升,下面的 google trends的这个图表很能说明问题。

自从高级编程语言被发明以来,各种编程范式的编程语言层出不穷,命令式编程(如C) 面向对象编程(如Java,Ruby),函数式编程(如Clojure, Scala,Haskell)都曾经或者正在软件开发领域占有一席之地。

面向对象编程

上世纪九十年代前,命令式编程仍然在软件开发领域占有主导地位。随着软件规模的不断增大,面向对象编程以其封装性,可重用性受到开发者和组织的青睐。

进入多核时代

随着摩尔定律的失效,单核CPU的计算能力几乎达到了极限,CPU进入了多核时代,程序员转而通过并发编程,分布式系统来应对越来越复杂的计算任务。

然而并发编程并不是银弹,做为一种基于共享内存的并发编程,多线程编程有常见的死锁线程饥饿race condition等问题,而且多线程bug的以其难以重现定位臭名昭著。

函数式编程的兴起

近年来逐渐火爆的functional programming以其提倡的:

  • 函数是编程语言的一等公民(function as first-class citizen)
  • 不可变量(immutable variable)
  • 无副作用的函数(no side-effect/reference transparency)
  • 可组合的函数(composable functions)

顺利地解决了因可变量mutabble variable被多个线程共享,修改等而导致可能的多线程的bug。

并发编程的痛点仍然存在

然而,functional programming就是现代的完美编程范式了么?远远不是。

即使使用了functional programming, 程序员总会需要处理异步任务或者事件,并且总有一些IO或者计算密集型的任务,这些任务可能还会阻塞其他活动线程,而且,处理异常,失败,线程任务之间的同步都比较困难而且容易出错。程序员需要不断地询问一个线程的运算结果(在Java中以Future<T>表示,T表示运算结果的类型)是否可用。我们来考虑一下下面两个例子:

有三个线程t1, t2, t3,他们的运算结果分别为f1, f2, f3。 有一个线程t4依赖于这三个线程的运行结果,而且每个线程都有有可能执行失败。 我们该如何编写线程t4的代码?

GUI程序中一次拖动操作中光标的位置就可被表示为Future<List<Position>>, (使用Future是因为这些Position的值是在未来的时间点生成的)。

如果我们希望在第一个Position可用时(拖动时间的开始位置)就能够在这Position所对应的位置画点,而不是等所有的Position都可用是一次性把光标的运行轨迹画出来。即我们希望程序能够尽快对输入进行响应。

即程序要及时,非阻塞地对输入响应。

上面的两个例子就是reactive programming尝试解决的问题,而Reactive Extension做为这个问题的答案,应运而生了。

Reactive Extension

Reactive Extension 这个概念最早出现在.net社区的Rx.net,一个提供处理异步事件的程序库,其核心概念是Observable,表示有限或者无限多个现在或者将来到达的事件。Observable提供了onNextonErroronCompleted供开发者定制新元素到达,出现错误,或者流结束时的程序的行为。 并提供了List上类似的操作,如mapfilterreduce,大大降低了异步事件编程的复杂度。

因为这些概念是如此的强大,以至于很多编程语言,如javarubyjavascript很快就有了各自的reactvie extension

关于reactive extension的技术细节可以在我的这篇博客里找到。这个视频详细地介绍了为什么需要reactive extension,以及reactive extension的是如何被发明出来的。

Reactive Manifesto

Wikipedia上对reactive programming解释如下:

reactive programming is a programming paradigm oriented around data flows and the propagation of change.

举个例子,在命令式编程下,表达式a = b + c,a的值在这个表达式执行完毕之后就是确定的,即使bc的值发生变化,a的值也不会改变。然而在响应式编程的语境下,a的值与bc的值是绑定的,上述表达式其实建立的是abc之间的一种依赖,a的值会随bc的变化而变化。

我们称之为能够响应输入变化的 事件(event)

然而现在来看,上述定义已经不能囊括reactive programming的含义了。随着软件系统的非功能需求要求越来越高,reactive已不仅局限于响应 事件(event)__的传递,也表示程序能够响应 __负载(load),系统运行时出现的 错误(failure)

发布于2014年9月份的Reactive Manifesto以宣言的形式提供了能够满足这些需求的软件系统架构设计的指导原则。

Reactive Architecture

在笔者看来,reactive programming可以从语言和架构两种层面上来理解, 近年来层出不穷的各种语言的 reactive extention 就是语言层面的代表,而在架构层面上,也有遵循了reactive manifesto的类库(如akka)出现,笔者暂且称之为 reactive architecture。 在后续的文章中,笔者将会带着大家理解一个 reactive architecture 是如何做到reactive的。

回头看看最近几年写的博客,绝大部分都是技术相关的内容,然而再往前几年, 博客里更多的是对生活的回顾,憧憬,现在读来格外温暖。今天就扯一段与技术无关的小感想。

自从搬回西安,就一直对西安的空气,环境多有抱怨,然而如今已不似当年无牵无挂,可以四处奔波,想要逃避已不太容易。 年复一年,和亲爱的她走到了一起,也迎来了小叶子,生活状态又原来的二人世界变成五口之家,确实耗费了些许时间去适应,接受。 每年,除了夏天,雾霾几乎隔个几年就来,所能做得,就是给家人和自己购买号称能够防止PM2.5的3M口罩,同时劝说他们接受PM2.5对人体危害甚大的事实。 然而,每当此时,天空,地面都是灰白的颜色,心情也是。多少次想过逃离,就会有多少次被现实拉回。逃避,不会解决问题。

直到今天看了柴静的这个穹顶之下专辑, 才发现,这个看似柔弱的女子有着怎样一颗强大的内心。不去抱怨,逃避,去直面它,行动。

最喜欢她的那句话,“人去做什么,是因为心底有爱惜”。

以此勉励自己。

Java 8 has been released quite a long time, since I come to ruby in 2011, I haven’t work with java. Even I heard that there’re few cool features come out with Java 8, like lambda, stream collection, functional interface, new date api. none of them can attract me, given ruby ship all these features since the date of birth.

While recently I do try to solve a problem with Java, and I found that Java has changed a lot compared with my impression on it. in this post, I’m going to talk about the collectors shipped with Java 8. also I’ll try to give collector example in scala.

The Problem

Reduction, (aka. iterating on a collection, apply reduction computation on each elements, and produce a single result or a smaller collection) is a common problem in any programming language.

Let’s look at a specific example:

Given an collection of employees, grouping these employees by age produce a map between age and list of employees.

Here is the class definition of Employee:

class Employee{
    private int age;
    private String name;

    Employee(String name, int age) {
        this.age = age;
        this.name = name;
    }
}

The Solution

a simple implementation could be:

List<Employee> employees = Arrays.asList(new Employee("A", 18), new Employee("B", 20), new Employee("C", 18));
Map<Integer, List<Employee>> byAge = new HashMap<>();
for(Employee employee: employees) {
  List<Employee> employeeByAge = byAge.get(employee.getAge())
  if(employeeByAge = null) {
    employeeByAge = new ArrayList<>();
  }
  employeeByAge.add(employee);
  byAge.put(employee.getAge(), employeeByAge);
}

if you have been working with Java for quite a long time, you may be sick to write these code. you must have write code in this structure for quite a long time. to demonstrate the duplication of this structure, let’s rewrite the above code to this format:

Collection<T> collections = ...
Collection<R> results = new SomeCollection;

for(T t: in collections) {
  R r = results.get(t)
  if (r == null) {
    r = new R
  }
  r.add(t)
}

all these code did is to collect some information for give collection and apply reduction on the items in this collection and produce a result container.

with Java 8s collector interface, you can simply do

List<Employee> employees = Arrays.asList(new Employee("A", 18), new Employee("B", 20), new Employee("C", 18));
Map<Integer, List<Employee>> byAge = employees.stream().collect(Collectors.groupingBy((e) -> e.age));

so what is the magic behind it:

The magic is behind the Collector<T, A, R> interface:

Collectors.groupingBy is a built in collector which acceptting a function with type T -> K which can be group against(in this case, employee.age()). it will produce a result with type Map<K, List<T>>(aka, the result type R).

Here is the official definition of Collector from its api document:

A mutable reduction operation that accumulates input elements into a mutable result container, optionally transforming the accumulated result into a final representation after all input elements have been processed. Reduction operations can be performed either sequentially or in parallel.

You see from the document, Collector take three type parameters T, A and R, where T is the type of element inside the collection, A is an intermediate type which could be used to do the mutable reduction, R is the type of result.

There four functions in this interface which work together to accumulate entries into a mutable result container.

  • supplier(), with type () -> A - creation of a new result container.
  • accumulator(), with type(A, T) -> A - incorprating a new element into the result container.
  • combiner(), with type (A, A) -> A - combing two result container into one.
  • finisher(), with type A -> R - a optional final transformation on the result container to get the result. the optional means that in some scenarios, A and R could be same, so the finisher function is not required. but in some other cases, when A and R are different, this function is required to get the final result.

In the previous example, the type of result of Collector.groupingBy is Collector<Employee, ?, Map<Integer, List<Employee>>.

let’s extend this problem a little bit: how about grouping employees by age range(e.g. 20-29 as a group, 30-39 as a group) this time, you can not find any buitin collector which is suitable to solve this problem, now, you will need a customised collector to do the reduction.

(this blog post)[http://www.nurkiewicz.com/2014/07/introduction-to-writing-custom.html] is a fairly good guide for how to create you own Collector implementation.

Collector in Scala

After found this useful pattern, I wonder if scala’s powerful collection system support this computation. Unfortunately, I can not found a similar api from any collection type. But I do found that we can easily build our own version of collector based on scala.collection.mutable.Builder.

scala.collection.mutable.Builder play the same role with accumulator (the A) in java Collector. Let’s see the following example of how we implement the collect method in scala and how we use it to solve the word count problem:

import scala.collection.mutable.Builder

//`T` is the type of element in collection, `Builder` is the type of intermediate result container, `R` is the type of reduction result.
def collect[T, R] (ts: List[T], a: Builder[T, R]):R = {
  ts.foreach (a += _) //invoke the accumulator function
  a.result //invoke the finisher function
}

//word counting builder
//from Seq[String] to Map[String, Int]
class CounterBuilder[String, Map[String, Int]] extends Builder[String, Map[String, Int]] {

  var counter = scala.collection.mutable.Map[String, Int]()

  def += (el: String) = {
    counter.get(el)  match {
      case None => counter += el -> 1
      case Some(count) => counter += el -> (count + 1)
    }
    this
  }

  def clear = counter = scala.collection.mutable.Map[String, Int]()

  def result: Map[String, Int] = counter.toMap[String, Int].asInstanceOf[Map[String, Int]]
}

and here is the code to use the CounterBuilder

//use case
val xs = List("a", "a", "a", "c", "d")

//the supplier function
val builder = new CounterBuilder
val res = collect[String, Map[String, Int]] (xs, new CounterBuilder)
//output: Map(d -> 1, a -> 3, c -> 1)

Conclusion

  • Java 8s Collector Api provide a better way to encapsulate reduction computation - not only some built in reduction(e.g. max, min, sum, average), but also customized reduction(via customised collector), Collector is designed to be composed, which means these reduction logic are much easier to be reused.
  • Scala dose not have native support for customised mutable reduction, but based on scala’s powerfull collection system, we can create our own version.

References