Another Take on Reactive Programming

TL;DR

This post introduces the origins of reactive programming and the problems it sets out to solve, using Reactive Extensions as the guiding thread.

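The Evolution of Programming Paradigms

In the last few years, interest in the term "reactive programming" has grown rapidly, as the Google Trends chart below shows.

Since high-level programming languages were invented, languages of one paradigm after another have emerged. Imperative languages (e.g. C), object-oriented languages (e.g. Java, Ruby) and functional languages (e.g. Clojure, Scala, Haskell) have all held, or still hold, a place in software development.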

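Object-Oriented Programming

Before the 1990s, imperative programming still dominated software development. As software kept growing in size, developers and organizations came to favor object-oriented programming for its encapsulation and reusability.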

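Entering the Multi-Core Era

Moore's law stopped translating into ever-faster single cores, so the computing power of a single CPU core approached its practical limit. CPUs entered the multi-core era, and programmers turned to concurrent programming and distributed systems to cope with increasingly complex computing tasks.

However, concurrent programming is no silver bullet. Multithreading is a form of concurrency based on shared memory, and it suffers from well-known problems such as deadlock, thread starvation and race conditions. Multithreading bugs are also notorious for being hard to reproduce and locate.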

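The Rise of Functional Programming

Functional programming has become increasingly popular in recent years. It advocates:

  • functions as first-class citizens
  • immutable variables
  • side-effect-free functions (referential transparency)
  • composable functions

Data that is never mutated can be shared freely. As a result, these principles eliminate the class of multithreading bugs caused by mutable variables being shared and modified across threads.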
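To make this concrete, here is a small Java 8 sketch. The class and method names are illustrative and not taken from any library. An unmodifiable list is mapped with a pure function on a parallel stream. No thread writes to shared state, so no locks are needed and the result is the same on every run.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class PureParallel {
    // Immutable input: any attempt to modify it throws UnsupportedOperationException.
    static final List<Integer> NUMBERS =
        Collections.unmodifiableList(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));

    // Pure function: the result depends only on the argument, and no shared state is touched.
    static int square(int x) {
        return x * x;
    }

    // Safe on many threads: each element is mapped independently, and the partial
    // sums are combined by the stream framework rather than through shared variables.
    static int sumOfSquares(List<Integer> xs) {
        return xs.parallelStream().mapToInt(PureParallel::square).sum();
    }
}
```

PureParallel.sumOfSquares(PureParallel.NUMBERS) yields 385 regardless of how the work is split across threads.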

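The Pain of Concurrency Remains

So is functional programming the perfect modern paradigm? Far from it.

Even with functional programming, programmers still have to deal with asynchronous tasks and events. There are always IO-bound or compute-intensive tasks that may block other active threads. Handling exceptions and failures, and synchronizing between threads, also remain hard and error-prone. Programmers have to keep checking whether the result of a thread's computation is available yet. In Java, such a result is represented as Future<T>, where T is the type of the result. Consider the following two examples: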

Three threads t1, t2 and t3 produce the results f1, f2 and f3. A fourth thread t4 depends on all three results, and any of the threads may fail. How should we write the code for t4?
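Even without Rx, Java 8's CompletableFuture lets us sketch t4 without blocking on Future.get(). In this illustrative sketch, CombineResults and t4 are hypothetical names. The stage runs once f1, f2 and f3 have all completed and sums them. If any of them failed, it falls back to an error message. Summing is only a stand-in for whatever t4 really computes.

```java
import java.util.concurrent.CompletableFuture;

class CombineResults {
    // t4 as a non-blocking composition: it runs only after f1, f2 and f3 complete,
    // and a failure in any of them skips straight to the exceptionally() handler.
    static CompletableFuture<String> t4(CompletableFuture<Integer> f1,
                                        CompletableFuture<Integer> f2,
                                        CompletableFuture<Integer> f3) {
        return f1.thenCombine(f2, Integer::sum)
                 .thenCombine(f3, Integer::sum)
                 .thenApply(total -> "total=" + total)
                 // Called if f1, f2, f3 or any step above completed exceptionally.
                 .exceptionally(err -> "failed: " + rootCause(err).getMessage());
    }

    // Dependent stages wrap upstream failures (e.g. in CompletionException); unwrap them.
    private static Throwable rootCause(Throwable t) {
        while (t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }
}
```

The caller can attach further stages, or call join() only at the very edge of the program. The composition itself never ties up a thread while it waits.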

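In a GUI program, the cursor positions during a single drag operation could be represented as Future<List<Position>>. A Future is used because these Position values are produced at points in the future.

Suppose we want to draw a point at the first Position as soon as it is available, which is the start of the drag event. We do not want to wait until every Position is available and then draw the cursor's whole trail at once. In other words, we want the program to respond to input as soon as possible.

That is, the program should respond to input promptly and without blocking.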

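The two examples above are exactly the problems reactive programming tries to solve, and Reactive Extensions emerged as an answer to them.

Reactive Extensions

The idea of Reactive Extensions first appeared in the .NET community as Rx.NET, a library for handling asynchronous events. Its core concept is the Observable, which represents a finite or infinite sequence of events arriving now or in the future. A subscriber supplies three callbacks: onNext for when a new element arrives, onError for when an error occurs, and onCompleted for when the stream ends. Observable also offers operations similar to those on List, such as map, filter and reduce, which greatly reduce the complexity of programming with asynchronous events.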
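The following deliberately tiny Java sketch shows how a push-based Observable differs from a Future. MiniObservable and MiniObserver are hypothetical names invented for illustration. This is not the Rx.NET or RxJava API, which also deals with unsubscription, schedulers and strict error-handling rules omitted here. The subscriber supplies onNext, onError and onCompleted, and map and filter act on each element as it is pushed. In the drag example, this means a point could be drawn as soon as each Position arrives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// The callbacks a subscriber supplies, named after Rx's Observer methods.
interface MiniObserver<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onCompleted();
}

// A toy push-based stream (hypothetical, for illustration only).
interface MiniObservable<T> {
    void subscribe(MiniObserver<T> observer);

    // Like List#map, but applied to each element at the moment it is pushed.
    // (Exceptions thrown by f are not routed to onError in this sketch.)
    default <R> MiniObservable<R> map(Function<T, R> f) {
        return downstream -> subscribe(new MiniObserver<T>() {
            public void onNext(T value) { downstream.onNext(f.apply(value)); }
            public void onError(Throwable e) { downstream.onError(e); }
            public void onCompleted() { downstream.onCompleted(); }
        });
    }

    // Like List#filter: forwards only the elements that satisfy p.
    default MiniObservable<T> filter(Predicate<T> p) {
        return downstream -> subscribe(new MiniObserver<T>() {
            public void onNext(T value) { if (p.test(value)) downstream.onNext(value); }
            public void onError(Throwable e) { downstream.onError(e); }
            public void onCompleted() { downstream.onCompleted(); }
        });
    }

    // Pushes the given values one at a time, then signals completion.
    @SafeVarargs
    static <T> MiniObservable<T> of(T... values) {
        return observer -> {
            for (T v : values) observer.onNext(v);
            observer.onCompleted();
        };
    }
}
```

Every element reaches the subscriber as soon as it is emitted. Nothing waits for the whole sequence to exist, which is exactly what the drag example needs.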

These ideas proved so powerful that many languages, such as Java, Ruby and JavaScript, soon had their own Reactive Extensions.

Technical details of Reactive Extensions can be found in this post on my blog. This video explains in detail why Reactive Extensions are needed and how they were invented.

Reactive Manifesto

Wikipedia explains reactive programming as follows:

reactive programming is a programming paradigm oriented around data flows and the propagation of change.

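Consider imperative programming first. Once the expression a = b + c has executed, the value of a is fixed, and even if b or c changes afterwards, a does not. In reactive programming, however, a is bound to b and c. The expression actually establishes a dependency of a on b and c, so a changes whenever b or c changes.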
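Here is a toy Java sketch of the difference. With plain ints, int b = 1, c = 2; int a = b + c; b = 10; leaves a at 3. The hypothetical Cell class below is not from any library. It records a dependency instead, so that Cell.bind(b, c, Integer::sum) recomputes a whenever b or c is set.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntBinaryOperator;

// A hypothetical observable integer "cell" (illustration only; no cycle or glitch handling).
class Cell {
    private int value;
    private final List<Runnable> dependents = new ArrayList<>();

    Cell(int initial) {
        value = initial;
    }

    int get() {
        return value;
    }

    // Setting a value propagates the change to every cell that depends on this one.
    void set(int newValue) {
        value = newValue;
        for (Runnable recompute : dependents) {
            recompute.run();
        }
    }

    // Builds a cell whose value is always op(x, y), recomputed whenever x or y changes.
    static Cell bind(Cell x, Cell y, IntBinaryOperator op) {
        Cell result = new Cell(op.applyAsInt(x.get(), y.get()));
        Runnable recompute = () -> result.set(op.applyAsInt(x.get(), y.get()));
        x.dependents.add(recompute);
        y.dependents.add(recompute);
        return result;
    }
}
```

With b = new Cell(1), c = new Cell(2) and a = Cell.bind(b, c, Integer::sum), a.get() is 3. After b.set(10), it becomes 12. Real reactive libraries add what this sketch leaves out, such as update ordering, cycle detection and unsubscription.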

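The input changes that the program reacts to are what we call events.

Today, however, that definition no longer captures everything reactive programming means. The non-functional requirements placed on software systems keep growing. As a result, "reactive" is no longer limited to responding to the propagation of events. It also means the program can respond to load, and to failures that occur while the system is running.

The Reactive Manifesto, published in September 2014, lays out guiding principles for designing software architectures that meet these requirements.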

Reactive Architecture

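In my view, reactive programming can be understood at two levels: the language and the architecture. The Reactive Extensions that have appeared for many languages in recent years represent the language level. At the architecture level, libraries that follow the Reactive Manifesto, such as Akka, have also appeared; for now I'll call this reactive architecture. In a later post, I'll walk you through how a reactive architecture manages to be reactive.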

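We Act Because, Deep Down, We Care

Looking back at the posts I've written over the last few years, almost all of them are about technology. A few years earlier, though, my blog was mostly reflections on life and hopes for the future, and these feel especially warm to read now. So today, a few thoughts that have nothing to do with technology.

Ever since moving back to Xi'an, I have complained a lot about its air and environment. But I am no longer as free of ties as I once was, able to move around at will, so running away is not so easy anymore. Year after year, I came together with my beloved, and we welcomed little Yezi. Our life went from a world of two to a family of five, and it took some time to adapt and accept. Every year, except in summer, the smog comes back every few days. All I can do is buy 3M masks that claim to block PM2.5 for my family and myself, and try to convince them that PM2.5 is seriously harmful to health. Yet whenever the smog comes, the sky and the ground turn gray, and so does my mood. Every time I thought of escaping, reality pulled me back. Running away does not solve the problem.

Then today I watched Chai Jing's documentary Under the Dome. I realized what a strong heart this seemingly gentle woman has. She does not complain or run away; she faces the problem head-on and takes action.

My favorite line of hers: "We act because, deep down, we care."

I take it as encouragement for myself.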

Java 8 collectors

Java 8 has been out for quite a while, but I haven't worked with Java since I moved to Ruby in 2011. I had heard that Java 8 shipped a few cool features, such as lambdas, the Stream API, functional interfaces and the new date/time API. None of them attracted me, given that Ruby has shipped comparable features from the start.

Recently, though, I tried to solve a problem in Java and found that the language has changed a lot compared with my impression of it. In this post, I'm going to talk about the collectors shipped with Java 8, and I'll also try to give a collector example in Scala.

The Problem

Reduction is a common problem in any programming language. It means iterating over a collection, applying a reduction computation to each element, and producing a single result or a smaller collection.

Let's look at a specific example:

Given a collection of employees, group them by age, producing a map from each age to the list of employees of that age.

Here is the class definition of Employee:

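class Employee {
    private final int age;
    private final String name;

    Employee(String name, int age) {
        this.age = age;
        this.name = name;
    }

    // getters used by the grouping code below
    int getAge() { return age; }

    String getName() { return name; }
}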

The Solution

A simple implementation could be:

List<Employee> employees = Arrays.asList(new Employee("A", 18), new Employee("B", 20), new Employee("C", 18));
Map<Integer, List<Employee>> byAge = new HashMap<>();
for (Employee employee : employees) {
  List<Employee> employeeByAge = byAge.get(employee.getAge());
  if (employeeByAge == null) {
    // first employee of this age: start a new list
    employeeByAge = new ArrayList<>();
  }
  employeeByAge.add(employee);
  byAge.put(employee.getAge(), employeeByAge);
}

If you have been working with Java for a long time, you are probably sick of writing code like this, because you have surely written this structure many times. To show how repetitive it is, let's rewrite the code above in a generic form:

static <T, K> Map<K, List<T>> groupBy(Collection<T> collection, Function<T, K> keyOf) {
  Map<K, List<T>> results = new HashMap<>();   // the result container
  for (T t : collection) {
    K key = keyOf.apply(t);
    List<T> r = results.get(key);
    if (r == null) {                           // first element for this key
      r = new ArrayList<>();
      results.put(key, r);
    }
    r.add(t);                                  // fold the element into the container
  }
  return results;
}

All this code does is walk over the given collection, apply a reduction to its items, and produce a result container.

With Java 8's Collector interface, you can simply write:

List<Employee> employees = Arrays.asList(new Employee("A", 18), new Employee("B", 20), new Employee("C", 18));
Map<Integer, List<Employee>> byAge = employees.stream().collect(Collectors.groupingBy(Employee::getAge));

So what is the magic behind it?

The magic lies in the Collector<T, A, R> interface.

Collectors.groupingBy is a built-in collector factory. It accepts a classifier function of type T -> K that gives the key to group by (in this case, Employee::getAge). The collector it returns produces a result of type Map<K, List<T>>, which is the result type R.

Here is the official definition of Collector from its API documentation:

A mutable reduction operation that accumulates input elements into a mutable result container, optionally transforming the accumulated result into a final representation after all input elements have been processed. Reduction operations can be performed either sequentially or in parallel.

Collector takes three type parameters T, A and R. T is the type of the input elements, A is the intermediate (mutable) accumulation type used to perform the reduction, and R is the type of the result.

The interface declares four functions that work together to accumulate entries into a mutable result container:

  • supplier(), of type () -> A: creates a new result container.
  • accumulator(), of type (A, T) -> void: incorporates a new element into the result container by mutating it in place.
  • combiner(), of type (A, A) -> A: combines two result containers into one, which is needed when the reduction runs in parallel.
  • finisher(), of type A -> R: an optional final transformation of the result container into the result. "Optional" means that when A and R are the same type, the finisher can simply be the identity function, and the collector then reports the IDENTITY_FINISH characteristic. When A and R differ, the finisher is needed to produce the final result.
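The static factory Collector.of in java.util.stream.Collector shows the four functions side by side. As an illustrative sketch, AverageCollector is a hypothetical name for a collector that averages integers. A is an int[] holding {sum, count} and R is Double, so a finisher is needed to turn the container into the result. In practice Collectors.averagingInt already does this; the point is only to show where each function fits.

```java
import java.util.Arrays;
import java.util.stream.Collector;

class AverageCollector {
    // T = Integer (input element), A = int[] {sum, count}, R = Double (result).
    static Collector<Integer, int[], Double> averaging() {
        return Collector.of(
            () -> new int[2],                                     // supplier:    () -> A
            (acc, x) -> { acc[0] += x; acc[1]++; },               // accumulator: (A, T) -> void
            (left, right) -> {                                    // combiner:    (A, A) -> A
                left[0] += right[0];
                left[1] += right[1];
                return left;
            },
            acc -> acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1]   // finisher:    A -> R
        );
    }
}
```

Running it on a parallel stream exercises the combiner. For example, Arrays.asList(18, 20, 18, 24).parallelStream().collect(AverageCollector.averaging()) gives the same 20.0 as a sequential run.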

In the previous example, Collectors.groupingBy(Employee::getAge) has the type Collector<Employee, ?, Map<Integer, List<Employee>>>.

Let's extend the problem a little: what about grouping employees by age range, e.g. 20-29 as one group and 30-39 as another? Strictly speaking, groupingBy with a computed key such as e.getAge() / 10 * 10 can already do this. It is still a good occasion to write a customized collector that performs the reduction itself.

This blog post (http://www.nurkiewicz.com/2014/07/introduction-to-writing-custom.html) is a fairly good guide to creating your own Collector implementation.
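For the age-range problem, a custom collector could look like the sketch below. It is one possible implementation, not the one from the linked guide. The nested Employee class is a minimal stand-in for the post's class, and decadeOf and byAgeRange are hypothetical names. The result maps the lower bound of each decade to the names of the employees in it. The container is also the result (A and R are the same type), so the three-argument Collector.of is used, which supplies an identity finisher.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collector;

class AgeRanges {
    // Minimal stand-in for the post's Employee class.
    static class Employee {
        final String name;
        final int age;
        Employee(String name, int age) { this.name = name; this.age = age; }
        int getAge() { return age; }
    }

    // Maps an age to the lower bound of its decade, e.g. 23 -> 20, 38 -> 30.
    static int decadeOf(int age) {
        return age / 10 * 10;
    }

    // Custom collector: Employee -> {decade -> names}; the TreeMap keeps decades sorted.
    static Collector<Employee, Map<Integer, List<String>>, Map<Integer, List<String>>> byAgeRange() {
        return Collector.of(
            TreeMap::new,                                              // supplier
            (acc, e) -> acc.computeIfAbsent(decadeOf(e.getAge()), k -> new ArrayList<>())
                           .add(e.name),                               // accumulator
            (left, right) -> {                                         // combiner
                right.forEach((k, v) -> left.computeIfAbsent(k, x -> new ArrayList<>()).addAll(v));
                return left;
            });                                                        // A == R: identity finisher
    }
}
```

The combiner appends the right-hand partial result to the left, so the names keep their encounter order even on a parallel stream.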

Collector in Scala

After finding this useful pattern, I wondered whether Scala's powerful collection library supports this kind of computation. Unfortunately, I could not find a similar API on any collection type. But I did find that we can easily build our own version of a collector on top of scala.collection.mutable.Builder.

scala.collection.mutable.Builder plays roughly the same role as the mutable container (the A) in Java's Collector. Its += acts as the accumulator and result() as the finisher. The following example shows how we can implement a collect method in Scala and use it to solve the word-count problem:

import scala.collection.mutable.Builder

// `T` is the element type of the collection, `Builder[T, R]` is the intermediate
// (mutable) result container, and `R` is the type of the reduction result.
def collect[T, R](ts: List[T], a: Builder[T, R]): R = {
  ts.foreach(a += _) // invoke the accumulator function
  a.result()         // invoke the finisher function
}

// Word-counting builder: from Seq[String] to Map[String, Int].
// Written against the Scala 2.11/2.12 Builder API (in 2.13, override addOne instead of +=).
class CounterBuilder extends Builder[String, Map[String, Int]] {

  private var counter = scala.collection.mutable.Map[String, Int]()

  def +=(el: String): this.type = {
    counter(el) = counter.getOrElse(el, 0) + 1
    this
  }

  def clear(): Unit = counter = scala.collection.mutable.Map[String, Int]()

  def result(): Map[String, Int] = counter.toMap
}

And here is the code that uses CounterBuilder:

//use case
val xs = List("a", "a", "a", "c", "d")

//the supplier function: create a fresh result container
val builder = new CounterBuilder
val res = collect[String, Map[String, Int]](xs, builder)
//res == Map("a" -> 3, "c" -> 1, "d" -> 1) (printed entry order may vary)

Conclusion

  • Java 8's Collector API provides a better way to encapsulate reduction computations. This covers built-in reductions (e.g. max, min, sum, average) as well as customized ones written as custom collectors. Collectors are designed to be composed, which makes reduction logic much easier to reuse.
  • Scala does not have native support for customized mutable reduction, but thanks to Scala's powerful collection library we can build our own version.
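The composability mentioned above shows up in groupingBy's downstream collector. The following is a small sketch that assumes a minimal Employee stand-in with getName and getAge. The same grouping step is combined with Collectors.counting() in one method and with Collectors.mapping(..., Collectors.toList()) in the other. TreeMap::new is passed as the map factory only to make the key order predictable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class ComposedCollectors {
    // Minimal stand-in for the post's Employee class.
    static class Employee {
        private final String name;
        private final int age;
        Employee(String name, int age) { this.name = name; this.age = age; }
        String getName() { return name; }
        int getAge() { return age; }
    }

    // groupingBy decides the keys; the downstream collector decides what each group becomes.
    static Map<Integer, Long> countByAge(List<Employee> staff) {
        return staff.stream()
            .collect(Collectors.groupingBy(Employee::getAge, TreeMap::new, Collectors.counting()));
    }

    // Same grouping, composed with mapping + toList to keep only the names.
    static Map<Integer, List<String>> namesByAge(List<Employee> staff) {
        return staff.stream()
            .collect(Collectors.groupingBy(Employee::getAge, TreeMap::new,
                     Collectors.mapping(Employee::getName, Collectors.toList())));
    }
}
```

For the post's employees A (18), B (20) and C (18), the two methods return {18=2, 20=1} and {18=[A, C], 20=[B]}.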


build a REST API in Scala

Microservices have been attracting more and more attention over the past few years, and more and more organizations are moving in this direction. Many frameworks have been created to make building microservices easier: in the Java world we have Dropwizard, and in the Ruby world we have Rails-API, Grape and Lotus. But how do Scala developers solve this problem?

In my previous post, I demonstrated how to build an API with Spray. In this post, I'll use a different pair of libraries, Unfiltered and json4s, to build a similar API.

The problem:

You want to build an API that accepts HTTP requests and sends responses in JSON format.

To achieve this goal, we need:

  • a server that listens on a local port, wraps each received HTTP request and passes it to the application.
  • a request-mapping mechanism where you can define how HTTP requests should be handled.
  • a tool that can convert between plain Scala objects and JSON.

Unfiltered has its own answer to the first two:

Request mapping

In Unfiltered, the request-mapping mechanism is called plans and intents. An accurate name, right?

From Unfiltered's documentation:

  • An intent is a partial function for matching requests.
  • A plan binds an intent to a particular server interface.

Here is the code:

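import unfiltered.request._
import unfiltered.filter.Plan // a servlet-filter plan, as used with Jetty

object PushApi extends Plan {
  def intent = {
    case req @ POST(Path(Seg("devices" :: Nil))) => {
      //code omitted
    }
    case req @ POST(Path(Seg("notifications" :: Nil))) => {
      //code omitted
    }
  }
}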

Server

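Unfiltered can run on top of Jetty or Netty. To do so, run your plan with the corresponding server. The plan type has to match the server: Jetty runs servlet-filter plans such as unfiltered.filter.Plan, while Netty expects a Netty plan such as unfiltered.netty.cycle.Plan.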

unfiltered.netty.Server.http(8080).plan(PushApi).run()
//or
unfiltered.jetty.Server.http(8080).plan(PushApi).run()

JSON serialization/deserialization

The biggest difference between spray-json and json4s is whether serialization/deserialization happens implicitly or explicitly.

In spray-json, once you define your own JsonFormat, you get serialization/deserialization (a.k.a. marshalling) implicitly. The marshalling mechanism does its job behind the scenes while extracting information from the request and sending the response. That's great when everything works, but when something goes wrong it's really hard to debug. This one, for example, took me ages to figure out.

With json4s, you have to serialize/deserialize your objects explicitly, but the API is very neat and easy to use.

Personally, I really like json4s's approach.

Conclusion

Compared with Spray, Unfiltered focuses on request mapping and dispatching, and json4s focuses on JSON serialization/deserialization. Both do a very good job, and I highly recommend trying them in your next Scala API project.

The full example can be found here.

Spray-based REST API

Spray is an open-source toolkit for building REST/HTTP-based integration layers on top of Scala and Akka. Being asynchronous, actor-based, fast, lightweight, modular and testable it’s a great way to connect your Scala applications to the world.

TL;DR

This post gives an example of how to use Spray to build a REST API.

Problem

You want to build a REST API that supports the following operations:

GET     /users/:id
POST    /users

routing

spray-routing gives you an elegant DSL for building a routing system that accepts HTTP requests and responds correctly. Let's look at an example:

//File: src/scala/com/example/MyService.scala
package com.example

import scala.concurrent.ExecutionContext
import spray.routing.HttpService

trait MyService extends HttpService {
  implicit val ec: ExecutionContext = actorRefFactory.dispatcher
  val userRepository: UserRepository

  val myRoute =
    path("users" / Segment) { userId =>
      get {
        complete {
          //must import SprayJsonSupport to get a JSON marshaller
          import spray.httpx.SprayJsonSupport._
          userRepository fetch userId
        }
      }
    }
}

The MyService trait extends spray.routing.HttpService, which provides a number of convenient methods for building the routing DSL, such as path, get and complete. The myRoute value defines a set of rules:

  • When an HTTP request matches GET /users/:id, call userRepository.fetch with the extracted userId and respond with the result.
  • When an HTTP request matches POST /users, call the repository's save method with the extracted user data and respond with the corresponding status code. This route is not shown in the snippet above.

Note that the field val userRepository: UserRepository is declared but not initialized. It is implemented in a concrete actor (MyServiceActor in this example), and the actual business logic is delegated to this object.

JSON support

I haven't found a JSON library in the Java/Scala world whose API is as good as Ruby's. If you know one, please let me know.

spray-json is the most beautiful one I have found so far!

spray-json allows you to convert between:

  • String JSON documents
  • JSON Abstract Syntax Trees (ASTs) with base type JsValue
  • instances of arbitrary Scala types

In this post, we only want to convert between JSON documents and Scala case classes. To enable JSON serialization/deserialization for your case class, just define an implicit member that returns an instance of RootJsonFormat[YourCaseClass].

//File: src/scala/com/example/User.scala

package com.example

import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat


case class User(name: String, age: Int)
object User {
  implicit def userJsonFormat: RootJsonFormat[User] = jsonFormat2(User.apply)
}

Put them together

As mentioned before, Spray is built on top of Scala and Akka. To let MyService handle HTTP requests, we need to create an actor:

import akka.actor.Actor

class MyServiceActor(userRepo: UserRepository) extends Actor with MyService {

  // HttpService needs an ActorRefFactory; inside an actor, that is its context
  def actorRefFactory = context

  // every HTTP request this actor receives is handled by myRoute
  def receive = runRoute(myRoute)

  override val userRepository: UserRepository = userRepo
}

This actor mixes in MyService, and its receive method calls runRoute to handle HTTP requests with the predefined route.

A tip:

Put your business logic in plain Scala objects instead of actors.

I found that it is really hard to test logic inside an actor. I prefer to implement business logic in a plain Scala class, which is much easier to test, and then inject an instance of that class into the actor.

The full example can be found here.