Rx revisit

TLDR;

本文是rx-java的后续, 将简要介绍Rx的起源,其多线程实现机制,另外会对其实现中的一个重要函数lift函数原理进行介绍。

Rx

Rx实际上是一种高级版本的Observer模式,把被观察者封装成Observable(可理解为一个异步地生产元素的集合), 然后通过 onNext, onError, onCompleted注册Observer在相应场景下需要执行的 回调。 值得一提的是,Rx为Observable提供了的filter,map/flatMap, merge, reduce等操作,使得对被观察者的操作就像同步集合那么便利。

起源

注: 此部分参考了这个slides

Netflix做为RxJava的主要贡献者,把Rx.Net移植到了JVM上,下面我们就通过一个架构演进来了解一下RxJava的起源。

Netflix做为知名的流媒体提供商,坐拥3300万用户,其峰值下行流量可占北美地区互联网流量的33%,API请求量达到20亿次/天,面对众多的接入设备和巨量的API请求量, Netflix的工程师发现是时候重新架构他们的API了,之前他们的API架构如下:

为了减少API调用的延迟,工程师们决定转向粗粒度的API设计:

粗粒度API设计意味着业务逻辑向服务器端迁移,必将在服务器端出现并行执行,嵌套调用。

与此同时,我们希望API的提供者能够对外部隐藏其底层并发的实现,什么意思呢?

我们来看一个反面教材:

public Bookmark getBookmark(Long userId)

如果某天,我们的程序员说,这个接口执行时间太长了,我不想阻塞调用此方法的线程,做为API提供者,你应该会立刻想到采用如下方案:

public Future<Bookmark> getBookmark(Long userId)

把返回值类型修改为Future<Bookmark>,这样就不会阻塞API调用者的线程,同时在内部使用一个新的线程或者线程池执行业务逻辑。

又过了一段时间,我们的产品经理说,需求变了,我们希望能给一个用户关联多个Bookmark,你很开心地把接口修改成如下形式:

public Future<List<Bookmark>> getBookmark(Long userId)

然后和你们对接的客户端程序员实在忍受不了三番五次的接口改动,提着刀过来找你了。

在被砍得奄奄一息之后,你终于问出了这个问题:

有没有一种数据结构能够表示一个或多个在未来可用的结果,并且仅在结果可用时把结果传递给客户程序,客户程序无需了解其内部的并发实现?

有!Observable来救场

我曾在上篇博客里提到Observable的定义,我们这里再回顾一下:

Observable用于表示一个可被消费的数据集合(data provider),它的消费者无需知道数据的产生机制同步的还是异步的,它会在数据可用,出错,以及数据流结束时通知它的消费者

实际上,ObservableIterable是一组__对偶的(dual)__的概念,它们都被用来表示多个元素的集合,不同之处在于:

  • Observable中的元素可以是异步产生的,而Iterable中的所有元素在其被消费前必须可用。
  • Observablepush- based,其可以在元素可用,出错,所有元素都被消费完成时通知他的消费者;而Iterablepull-based,其消费者必须主动地轮询新元素,主动地捕获异常,主动地处理所有元素都被消费完成。

如果用Observable来改造上述API,那么我们的API就是下面这个样子:

public Observable<Bookmark> getBookmark(Long userId)

那么无论是需要同步返回,异步返回,或者异步返回多个Bookmark,这个API都无需做任何变更, API内部可以:

  • 使用调用线程进行业务逻辑计算,在运算完成后通过onNext将结果传递给客户端(原生的阻塞式调用)。

  • 使用一个新线程,或者一个新的线程池进行业务逻辑计算,在运算完成后通过onNext将结果传递给客户端(满足了异步返回的需求)。

  • 使用多个新线程,或者一个新的线程池分别获取多个bookmark,在获得每个bookmark之后多次通过onNext将结果传递给客户端(满足了异步返回多值的需求)。

如此大的变化,客户无需做任何改动,这就是Observable高超的抽象能力带来的好处。 这就是Rx带来的巨大好处,也是Netflix把它移植到JVM上的最大动力。

原理

既然Observable这么强大,那么我们不禁会问:

  • 为什么Observable能够做到push-based
  • Observable是如何做到使用多种并发实现的?

对于第一个问题,Observable提供了subscribe方法供客户程序注册回调函数,之后Observable会自己进行运算并调用相应的回调函数, 这样看起来就像是Observable在向自己的客户程序push其运算结果。

对于第二个问题,Rx中有个非常重要的概念—— Scheduler,它是Rx提供的一种并发模型抽象,你可以在创建你的Observable时指定采用哪种并发模型, 下面我们来看下Scheduler是如何对并发模型进行抽象的。

Scheduler

Rx的默认行为是单线程的,它是一个free-threaded t1模型,意味着你可以自由选择一个线程来执行你指定的任务。 如果你创建Observable时没有引入scheduler,那么你注册的onNext, onError, onCompleted回调将被当前线程(即,创建Observable代码所在线程)上执行。 Scheduler提供一种机制,用于指定将会执行回调的线程。

Scheduler的多种实现:

  • EventLoopsScheduler

    维护一组workers,当有新的任务加入#schedule时,通过round-robin的方式选取一个worker,将任务分配给其执行Worker#scheduleActual

  • CachedThreadScheduler

    使用一个ConcurrentLinkedQueue缓存创建出来的线程以备后续任务使用,创建出来的线程有一定的有效期,超过有效期的线程会被自动清除。

  • ExecutorScheduler

    包装了一个Executor的实例并,并基于它实现了Scheduler的接口。 注意,在这个实现里,thread-hopping(线程跳跃)问题是不可避免的,因为Scheduler并不知晓其包装的Executor的线程行为。

  • ImmediateScheduler

    立即在当前线程上执行任务。

  • TrampolineScheduler

    把任务分配给当前线程,但并不立即执行,任务会被放到一个队列中等待当前任务执行完毕。

Worker

回调的实际执行者,底层由java.util.concurrent.ExecutorService执行实际的任务,同时扮演了Subscription的角色。

总结

笔者在学习Coursea上学习Principal of Reactive Programming时,注意到Erik Miller曾打趣地说,不要自己尝试去实现Observable,使用现有的库就好了。 本着强大的好奇心,笔者还是试着阅读了Rx.java的源代码,才意识到这个模型是多么的精巧,它给多线程编程带来了革命性的体验。如果你对Rx.java有兴趣,强烈推荐阅读其源码。


Note: 这本应该重开一篇博客,然而,原谅笔者的懒惰吧

lift函数

在Observable的实现里,有个函数必须得提一下,lift

Rx中巧妙提出一个Operator的这个函数类型,表述从一个Subscriber 到另一个 Subscriber的映射。

有大量对Observable的操作是通过定义OperatorliftObservable上实现的,如Observable#all, Observable#filter, Observable#finallyDo等等。

Observable#lift签名如下:

//inside Observable[T]
def lift[T, R](Operator[R, T]): Observable[R]
lift函数简介

有一定函数式编程基础的人相信对lift这个名字都不会太陌生。 lift顾名思义,把一个对简单类型操作的函数提升(lift)到复杂类型/容器类型上去。

我们来看一个对lift的定义:

有个两个类型 A, B
和一个函数 f: A => B
和一个容器 M[_]
lift 就是把f转换成一个新的函数 M[A] => M[B]

那么lift的定义如下:

 def lift[A, B, M[_]](f: A => B): M[A] => M[B]

跟上面看到的Observable#lift唯一不同的地方在于,这个lift函数的返回值是一个函数, 不过再仔细观察一下,这个M[A] => M[B]应用到一个M[A]实例后的效果和上面的Observable#lift是一样的。

听起来比较抽象,其实只要我们把上面的符号逐个替换成我们所熟知的类型,这个函数一点都不陌生。 我们来逐步应用如下替换法则:

  • A: String
  • B: Int
  • M: List[_]
  • f: String => Int

因此,lift的类型签名如下:

(String => Int) => (List[String => List[Int])

我们就暂且用 (s: String) => s.length做为我们的f吧,

假如有个字符串列表 xs: List[String]

那么 lift(f)(xs) 就会得到xs的中每个字符串的长度。

什么?

这不就是 xs.map(f)的结果吗?

是的,map函数就是一种常见的lift函数。

Observable#lift

我们再来看看liftObservable中发挥了什么样的作用?

在开始之前大家需要记住这几个类型等式(:= 表示其左右两边的类型相等):

  • Observable[_] := Subscriber[_] => Unit
  • Operator[T, R] := Subscriber[R] => Subscriber[T]

现在我们来做类型替换:

  • A: Subscriber[R]
  • B: Subscriber[T]
  • M: Observable[_] (即 Subscriber[_] => Unit)
  • f: Subscriber[R] => Subscriber[T]

因此lift就是

(Subscriber[R] => Subscriber[T]) => (Subscriber[T] => Unit) => (Subscriber[R] => Unit)

亦即

(Subscriber[R] => Subscriber[T]) => (Observable[T] => Observable[R])

假如有个ts: Observable[T] 和一个函数f: Subscriber[R] => Subscriber[T],通过lift函数,我们就能得到一个类型为 Observable[R]的结果。


t1:free-threaded模型相对的是single-threaded apartment,意味着你必须通过一个指定的线程与系统交互。

也谈响应式编程

太长不读;

本文将会围绕reactive extension介绍reactive programming的起源,其要解决的问题。

编程范式的演进

最近几年,reactive programming这个词语的热度迅速提升,下面的 google trends的这个图表很能说明问题。

自从高级编程语言被发明以来,各种编程范式的编程语言层出不穷,命令式编程(如C) 面向对象编程(如Java,Ruby),函数式编程(如Clojure, Scala,Haskell)都曾经或者正在软件开发领域占有一席之地。

面向对象编程

上世纪九十年代前,命令式编程仍然在软件开发领域占有主导地位。随着软件规模的不断增大,面向对象编程以其封装性,可重用性受到开发者和组织的青睐。

进入多核时代

随着摩尔定律的失效,单核CPU的计算能力几乎达到了极限,CPU进入了多核时代,程序员转而通过并发编程,分布式系统来应对越来越复杂的计算任务。

然而并发编程并不是银弹,做为一种基于共享内存的并发编程,多线程编程有常见的死锁线程饥饿race condition等问题,而且多线程bug的以其难以重现定位臭名昭著。

函数式编程的兴起

近年来逐渐火爆的functional programming以其提倡的:

  • 函数是编程语言的一等公民(function as first-class citizen)
  • 不可变量(immutable variable)
  • 无副作用的函数(no side-effect/reference transparency)
  • 可组合的函数(composable functions)

顺利地解决了因可变量mutabble variable被多个线程共享,修改等而导致可能的多线程的bug。

并发编程的痛点仍然存在

然而,functional programming就是现代的完美编程范式了么?远远不是。

即使使用了functional programming, 程序员总会需要处理异步任务或者事件,并且总有一些IO或者计算密集型的任务,这些任务可能还会阻塞其他活动线程,而且,处理异常,失败,线程任务之间的同步都比较困难而且容易出错。程序员需要不断地询问一个线程的运算结果(在Java中以Future<T>表示,T表示运算结果的类型)是否可用。我们来考虑一下下面两个例子:

有三个线程t1, t2, t3,他们的运算结果分别为f1, f2, f3。 有一个线程t4依赖于这三个线程的运行结果,而且每个线程都有有可能执行失败。 我们该如何编写线程t4的代码?

GUI程序中一次拖动操作中光标的位置就可被表示为Future<List<Position>>, (使用Future是因为这些Position的值是在未来的时间点生成的)。

如果我们希望在第一个Position可用时(拖动时间的开始位置)就能够在这Position所对应的位置画点,而不是等所有的Position都可用是一次性把光标的运行轨迹画出来。即我们希望程序能够尽快对输入进行响应。

即程序要及时,非阻塞地对输入响应。

上面的两个例子就是reactive programming尝试解决的问题,而Reactive Extension做为这个问题的答案,应运而生了。

Reactive Extension

Reactive Extension 这个概念最早出现在.net社区的Rx.net,一个提供处理异步事件的程序库,其核心概念是Observable,表示有限或者无限多个现在或者将来到达的事件。Observable提供了onNextonErroronCompleted供开发者定制新元素到达,出现错误,或者流结束时的程序的行为。 并提供了List上类似的操作,如mapfilterreduce,大大降低了异步事件编程的复杂度。

因为这些概念是如此的强大,以至于很多编程语言,如javarubyjavascript很快就有了各自的reactvie extension

关于reactive extension的技术细节可以在我的这篇博客里找到。这个视频详细地介绍了为什么需要reactive extension,以及reactive extension的是如何被发明出来的。

Reactive Manifesto

Wikipedia上对reactive programming解释如下:

reactive programming is a programming paradigm oriented around data flows and the propagation of change.

举个例子,在命令式编程下,表达式a = b + c,a的值在这个表达式执行完毕之后就是确定的,即使bc的值发生变化,a的值也不会改变。然而在响应式编程的语境下,a的值与bc的值是绑定的,上述表达式其实建立的是abc之间的一种依赖,a的值会随bc的变化而变化。

我们称之为能够响应输入变化的 事件(event)

然而现在来看,上述定义已经不能囊括reactive programming的含义了。随着软件系统的非功能需求要求越来越高,reactive已不仅局限于响应 事件(event)__的传递,也表示程序能够响应 __负载(load),系统运行时出现的 错误(failure)

发布于2014年9月份的Reactive Manifesto以宣言的形式提供了能够满足这些需求的软件系统架构设计的指导原则。

Reactive Architecture

在笔者看来,reactive programming可以从语言和架构两种层面上来理解, 近年来层出不穷的各种语言的 reactive extention 就是语言层面的代表,而在架构层面上,也有遵循了reactive manifesto的类库(如akka)出现,笔者暂且称之为 reactive architecture。 在后续的文章中,笔者将会带着大家理解一个 reactive architecture 是如何做到reactive的。

人去做什么,是因为心底有爱惜

回头看看最近几年写的博客,绝大部分都是技术相关的内容,然而再往前几年, 博客里更多的是对生活的回顾,憧憬,现在读来格外温暖。今天就扯一段与技术无关的小感想。

自从搬回西安,就一直对西安的空气,环境多有抱怨,然而如今已不似当年无牵无挂,可以四处奔波,想要逃避已不太容易。 年复一年,和亲爱的她走到了一起,也迎来了小叶子,生活状态又原来的二人世界变成五口之家,确实耗费了些许时间去适应,接受。 每年,除了夏天,雾霾几乎隔个几年就来,所能做得,就是给家人和自己购买号称能够防止PM2.5的3M口罩,同时劝说他们接受PM2.5对人体危害甚大的事实。 然而,每当此时,天空,地面都是灰白的颜色,心情也是。多少次想过逃离,就会有多少次被现实拉回。逃避,不会解决问题。

直到今天看了柴静的这个穹顶之下专辑, 才发现,这个看似柔弱的女子有着怎样一颗强大的内心。不去抱怨,逃避,去直面它,行动。

最喜欢她的那句话,“人去做什么,是因为心底有爱惜”。

以此勉励自己。

Java 8 collectors

Java 8 has been released quite a long time, since I come to ruby in 2011, I haven’t work with java. Even I heard that there’re few cool features come out with Java 8, like lambda, stream collection, functional interface, new date api. none of them can attract me, given ruby ship all these features since the date of birth.

While recently I do try to solve a problem with Java, and I found that Java has changed a lot compared with my impression on it. in this post, I’m going to talk about the collectors shipped with Java 8. also I’ll try to give collector example in scala.

The Problem

Reduction, (aka. iterating on a collection, apply reduction computation on each elements, and produce a single result or a smaller collection) is a common problem in any programming language.

Let’s look at a specific example:

Given an collection of employees, grouping these employees by age produce a map between age and list of employees.

Here is the class definition of Employee:

class Employee{
    private int age;
    private String name;

    Employee(String name, int age) {
        this.age = age;
        this.name = name;
    }
}

The Solution

a simple implementation could be:

List<Employee> employees = Arrays.asList(new Employee("A", 18), new Employee("B", 20), new Employee("C", 18));
Map<Integer, List<Employee>> byAge = new HashMap<>();
for(Employee employee: employees) {
  List<Employee> employeeByAge = byAge.get(employee.getAge())
  if(employeeByAge = null) {
    employeeByAge = new ArrayList<>();
  }
  employeeByAge.add(employee);
  byAge.put(employee.getAge(), employeeByAge);
}

if you have been working with Java for quite a long time, you may be sick to write these code. you must have write code in this structure for quite a long time. to demonstrate the duplication of this structure, let’s rewrite the above code to this format:

Collection<T> collections = ...
Collection<R> results = new SomeCollection;

for(T t: in collections) {
  R r = results.get(t)
  if (r == null) {
    r = new R
  }
  r.add(t)
}

all these code did is to collect some information for give collection and apply reduction on the items in this collection and produce a result container.

with Java 8s collector interface, you can simply do

List<Employee> employees = Arrays.asList(new Employee("A", 18), new Employee("B", 20), new Employee("C", 18));
Map<Integer, List<Employee>> byAge = employees.stream().collect(Collectors.groupingBy((e) -> e.age));

so what is the magic behind it:

The magic is behind the Collector<T, A, R> interface:

Collectors.groupingBy is a built in collector which acceptting a function with type T -> K which can be group against(in this case, employee.age()). it will produce a result with type Map<K, List<T>>(aka, the result type R).

Here is the official definition of Collector from its api document:

A mutable reduction operation that accumulates input elements into a mutable result container, optionally transforming the accumulated result into a final representation after all input elements have been processed. Reduction operations can be performed either sequentially or in parallel.

You see from the document, Collector take three type parameters T, A and R, where T is the type of element inside the collection, A is an intermediate type which could be used to do the mutable reduction, R is the type of result.

There four functions in this interface which work together to accumulate entries into a mutable result container.

  • supplier(), with type () -> A - creation of a new result container.
  • accumulator(), with type(A, T) -> A - incorprating a new element into the result container.
  • combiner(), with type (A, A) -> A - combing two result container into one.
  • finisher(), with type A -> R - a optional final transformation on the result container to get the result. the optional means that in some scenarios, A and R could be same, so the finisher function is not required. but in some other cases, when A and R are different, this function is required to get the final result.

In the previous example, the type of result of Collector.groupingBy is Collector<Employee, ?, Map<Integer, List<Employee>>.

let’s extend this problem a little bit: how about grouping employees by age range(e.g. 20-29 as a group, 30-39 as a group) this time, you can not find any buitin collector which is suitable to solve this problem, now, you will need a customised collector to do the reduction.

(this blog post)[http://www.nurkiewicz.com/2014/07/introduction-to-writing-custom.html] is a fairly good guide for how to create you own Collector implementation.

Collector in Scala

After found this useful pattern, I wonder if scala’s powerful collection system support this computation. Unfortunately, I can not found a similar api from any collection type. But I do found that we can easily build our own version of collector based on scala.collection.mutable.Builder.

scala.collection.mutable.Builder play the same role with accumulator (the A) in java Collector. Let’s see the following example of how we implement the collect method in scala and how we use it to solve the word count problem:

import scala.collection.mutable.Builder

//`T` is the type of element in collection, `Builder` is the type of intermediate result container, `R` is the type of reduction result.
def collect[T, R] (ts: List[T], a: Builder[T, R]):R = {
  ts.foreach (a += _) //invoke the accumulator function
  a.result //invoke the finisher function
}

//word counting builder
//from Seq[String] to Map[String, Int]
class CounterBuilder[String, Map[String, Int]] extends Builder[String, Map[String, Int]] {

  var counter = scala.collection.mutable.Map[String, Int]()

  def += (el: String) = {
    counter.get(el)  match {
      case None => counter += el -> 1
      case Some(count) => counter += el -> (count + 1)
    }
    this
  }

  def clear = counter = scala.collection.mutable.Map[String, Int]()

  def result: Map[String, Int] = counter.toMap[String, Int].asInstanceOf[Map[String, Int]]
}

and here is the code to use the CounterBuilder

//use case
val xs = List("a", "a", "a", "c", "d")

//the supplier function
val builder = new CounterBuilder
val res = collect[String, Map[String, Int]] (xs, new CounterBuilder)
//output: Map(d -> 1, a -> 3, c -> 1)

Conclusion

  • Java 8s Collector Api provide a better way to encapsulate reduction computation - not only some built in reduction(e.g. max, min, sum, average), but also customized reduction(via customised collector), Collector is designed to be composed, which means these reduction logic are much easier to be reused.
  • Scala dose not have native support for customised mutable reduction, but based on scala’s powerfull collection system, we can create our own version.

References

build rest api in scala

micro service getting more attractions in the past few years, more and more orgnizations is moving to this area. many framework were created to make building micro service easier, in java world, we have DropWizard, in ruby world, we have Rails-API, Grape and Lotus. but how the scala guys solve this problem?

in my previous post, I demostrated how to build api with spray, in this post I’ll try to use another framework - unfilter and json4s to build a similar api.

The problems:

you want to build a api which can accept http request and send response in json format.

to achieve this goal, we need:

  • a server listening on a local port, wrap received http request and pass it to the application.
  • a request mapping mechanism where you can define how http request should be handled.
  • a tool which can covert between plan scala object and json.

Unfilter have it’s answer for the first two questions:

Request mapping

in unfilter, the request mapping mechanism was called plan and intent, a accurat name, right?

from unfilter’s document:

  • An intent is a partial function for matching requests.
  • A plan binds an intent to a particular server interface.

Here is the code:

object PushApi extends Plan {
  def intent = {
    case req @ POST(Path(Seg("devices" :: Nil))) => {
      //code omitted
    }
    case req @ POST(Path(Seg("notifications" :: Nil))) => {
      //code omitted
    }
  }
}

Server

Unfilter can be run on top of jetty / netty, to do so, just run your plan with correponded server:

unfiltered.netty.Server.http(8080).plan(PushApi).run()
//or
unfiltered.jetty.Server.http(8080).plan(PushApi).run()

Json serialization/deserialization

The biggest difference between spray-json and json4s is serialization/deserialization done implicitly or explicitly.

in spray-json, you can get serialization/deserialization(aka. marshalling) implicitly if you defined your own JsonFormat, the marshalling mechanism will do their job while extracting information from request and send response implicitly. it’s cool when everything works fine, but if something went wrong it’s really hard to debug. e.g. this one took me years to find out.

with json4s, you have to serialize/deserialize your object explicitly, but the api is very neat and easy to use.

peronally, I really like json4s’s solution.

Conclusion

Compared with spray, unfilter focused on request mapping and dispatching, json4s focused on json serialization/deserialization, they both did a very good job. I highly recommand you to try it in your next scala api project.

The full example can be found here