RxJava初探

RxJava 是来自于NetflixReactive Extension的java版实现。

Reactive Extenstion所要解决的一个问题是对多个异步任务的组合,依赖所带来的编码复杂性的问题,我们先从一个例子看起:

异步任务的依赖

假设我们的程序需要从五个micro-service获取数据,这些micro-services之间存在依赖关系,我们来看一下第一版实现:

note: 本文我使用了scala来做为RxJava的客户端代码,只是因为scala中支持lambda。关于rx-scala的更新信息,参阅这里

 val fa = callToRemoteServiceA();
 val fb = callToRemoteServiceB();

 val fc = callToRemoteServiceC(fa.get());
 val fd = callToRemoteServiceD(fb.get());
 val fe = callToRemoteServiceE(fb.get());

fa, fb, fc, fd, fe之间的依赖关系如下: Micro Services Dependencies</img>

由于这些future之前有依赖关系(fa,fb的执行结果是fc,fd,fe的输入),我们必须调用fa.get(), fb.get(), 而这会阻塞主线程的执行。

那么这种阻塞能否避免呢?当然可以,我们可以分别新起一个线程来创建fc, fd, fe。来看第二版实现:

  val fa = callToRemoteServiceA();
  val fb = callToRemoteServiceB();

  val fc = executor.submit(new Callable[String]() {
    override def call(): String = callToRemoteServiceC(fa.get).get
  })
  val fd = executor.submit(new Callable[String]() {
    override def call(): String = callToRemoteServiceD(fb.get).get
  })
  val fe = executor.submit(new Callable[String]() {
    override def call(): String = callToRemoteServiceE(fb.get).get
  })

在这个实现里,我们分别启动了一个线程来等待fa,fb的执行结果,然后再执行fc, fd, fe, 这样,主线程就不会被阻塞,然而,这却大大地增加代码的复杂度。 那么,能否不要等待future的执行结果(poll),而是等到Future执行完成的时候被通知到(push),Reactive Extenstion的Observable的出现就解决了这样的问题,我们先来看一下实现:

val oa = from(callToRemoteServiceA)
val ob = from(callToRemoteServiceB())

val oc = oa.flatMap { res => from(callToRemoteServiceC(res)) }
val od = ob.flatMap { res => from(callToRemoteServiceD(res))}
val oe = ob.flatMap { res => from(callToRemoteServiceE(res))}

在这个版本的实现中,对ServiceAServiceB的调用被包装为一个Observable对象, 然后使用flatMap来把micro services 之间的依赖串接起来:

ServiceC的调用依赖于对ServiceA的调用,因此,我们在oa上调用flatMap方法, flatMap接受一个函数,参数为Observiable的每个元素,返回值为一个新的Observable。 这里我们传入的是:

res => from(callToRemoteServiceC(res))

就是对于oa的每个元素,用其做为参数调用ServiceC,并且包装成一个Observable。对ServiceD, ServiceE的调用也是类似的。

这个方案与上面方案最大的不同是,上面的例子中,我们需要不断地询问对ServiceA的调用是否完成, 若调用完成,再进行下面的动作(发起对ServiceC的调用)。 即便启动了新的线程以便不block在主线程,这个新的线程还是会被block住。 而在这个方案中,我们只需要定义好对ServiceA的调用完成后,需要做那些事情(发起对ServiceC的调用),代码也简洁了很多。

如果有个ServiceF依赖于ServiceE的执行结果,我们也可以很容易地通过flatMap来表述他们的依赖关系:

val of = oe.flatMap { res  => from(callToRemoteServiceF(res)) }

Reactive Extension中的概念

Observable

__Observable__用于表示一个可被消费的数据集合(data provider),它后面的数据的产生机制或者是同步的,或者是异步的,这都不重要的,最重要是它提供了下面的能力:

  • Observei可以通过Observable的subscribe向其注册。

  • 当Observable中有数据产生时,调用Observer的onNext方法通知有新数据到来。

  • 当Observable数据发送完毕时,调用Observer的onComplete方法通知数据发送完毕。

  • 当Observable内部出现错误时,调用Observer的onError方法通知有错误需要处理。

Observable 之于 Iterable

Observable 做为一个数据(事件)集合的抽象,也支持类似于Iterable上的各种,转换、组合操作,如mapfiltermerge等等,我们还是先从一个例子来看: 假设有一个GUI应用,我们使用一个Observable actions 来表示用户在界面上的操作(可能的值有click, drag, drop),

//这里我用interval模拟这些操作是异步的
val actionList = List("click", "drag", "drop", "click", "click")
val actions = interval(1 seconds).map(_.toInt).take(5).map(actionList(_))

有一个收集用户点击事件并打印日志的需求,我们该怎么实现呢?

actions.filter(_ == "click").subscribe(println("clicked at " + new Date()))

是不是和Iterable的操作非常相像? 实际上Observable和Iterable在很多方面都很相似:

  • 都是数据的容器。
  • 都可以对其应用一个映射函数(map, flatMap),从而得到一个新的Iterable/Observable。
  • 都可以对其中中的元素进行过滤,从而得到一个元素数量更少的Iterable/Observable。

Observable和Iterable最大的不同点:

  Observable对其消费者push数据,而Iterable没有这种能力。
  Iterable的消费值只能通过`pull`的方式获取数据。 
  而这种`push`的能力在Reactive Programming世界中极其重要。
Observer 之于 Observer Pattern

__Observer__的概念来自于设计模式中的Observer模式,并对其行为进行了扩展。设计模式中的Observer模式定义如下:

  有一个subject对象,它维护一个observer对象列表,当它的状态发生变化时,它会逐个通知这些observer。
  这里的Observer只有对外暴露一个行为:update, 当subject的状态发生变化时,
  subject通过这个update接口通知observer。

RxJava中的Obsever的这个update接口叫做onNext, 同时在此基础之上添加了两个行为:onCompletedonError,以应对Observable的这种特殊的data providersubject的需求:

  • onCompleted, 当Observable数据发送完毕后,调用此接口通知Observer。
  • onError,当Observable产生数据过程中出现错误时,调用此借口通知Observer。

总结

最后,引用RxJava中对Observable的解释:Observable填补了在异步编程领域中访问包含多个元素的异步序列的空白, 他们的关系正如下表所示:

  single item multiple items
synchronous T getData() Iterable<T> getData()
asynchronous Future<T> getData() Observable<T> getData()

RxJava极大地改进了java异步编程的体验,如果你受够了block Future,以及弱爆了容错机制,体验一下rxjava吧。

上面的示例代码在这里都可以找到。

更多资料,参考RxJava的wiki, 我只能帮到你这儿了 :)

Monoid, Functor, Applicative and Monad

Monoid

Given a Monoid trait Semigroup

trait Semigroup[M] {
  def append(a: M, b: M): M
  val zero: M
}

the following should hold:

append(a, append(b, c)) === append(append(a, b), c)
append(a, zero) = a
append(zero, a) = a

Monoid examples:

  • Int with + and 0
  • Int with * and 1
  • Boolean with || and false
  • A => A with compose and identity
  • List[A] with ++ and Nil
  • String with + and ""

Functor

Concept

Functor is a type class that defines how to apply a function to a value wrapped in a context(T).List, Option, Ethier, Try both are functor.

trait Functor[T[_], A] {
  def fmap[B](f: A => B): Functor[T, B]

  def id: T[A]
}

the Functor takes two type parameters, T[_] which is a generic type, and a type A one concrete example is:

//List as T, A as A
case class ListFunctor[A](val id: A, xs: List[A]) extends Functor[List, A] {
  def fmap[B](f: A => B): List[B] = ListFunctor(xs.map(f))
}

Functor laws:

  • fmap id = id

    if we map the id function over a functor, the functor that we get back should be the same as the original functor

  • for any functor F, the following should hold: fmap (f . g) F = fmap f (fmap g F)

    composing two functions and then mapping the resulting function over a functor should be the same as first mapping one function over the functor and then mapping the other one

Function is Functor:

Function composition:

Mapping a Function over a Function will produce a new Function(function composition), just like mapping a function over a List will produce a List, mapping a function over a Option will produce a Option.

Lifting:

Given a map function with type (A => B) => F[A] => F[B](F is a functor, it could be List, Option, or Ethier), we can think the map as a function which take a function (with type A => B) as parameter and return a new function just like the old one(with type F[A] => F[B]).

Applicative

Concept

Applicative is a type class that defines how to apply a function tf wrapped in a context T to a value wrapped in a context T.

trait Applicative[T[_], A] extends Functor[T, A] {
  def apply[B](f: T[A => B]): Applicative[T, B]
}

Monad

Concept

Monad is a type class Monad[T[_], A] that defines how to apply a function that returns a wrapped value A => T[B] to a wrapped value T[A].

trait Monad[T[_], A] extends Monoid[T, A] with Applicative[T, A] {
  def flatMap[B](f: A => T[B]): Monad[T, B]
}

Monad law:

  • Left identity

    Given a value x and a function f, the following should hold:

unit(x) flatMap f = f(x)
  • Right identity

Given a monad m, the following should hold:

m flatMap unit = m
  • Composition

Given a monad m and two functions f and g, the following should hold:

m flatMap f flatMap g == m flatMap g flatMap f

A concrete Monad example

case class ListMonad[A](val list: List[A])  extends Monad[List, A] {
  //defined in Monoid
  override def append(values: List[A]): ListMonad[A] = ListMonad(list ++ values)

  //defined in Monoid
  override def id: List[A] = Nil

  //defined in Functor
  override def fmap[B](f: (A) => B): ListMonad[B] = ListMonad(list.map(f))

  //defined in Applicative
  override def apply[B](tf: List[(A) => B]): ListMonad[B] = ListMonad(list.map(tf.head))

  //defined in Monad
  override def flatMap[B](f: (A) => List[B]): ListMonad[B] = ListMonad(list.flatMap(f))
}

Behind $.ajax

相信大家对于下面这段代码都不会太陌生:

doSomethingBefore();

$.ajax({
  url: "test.html",
  success: function() {
	doSomethingWhenSucceed();
  }
});

doSomethingAfter();

上述代码的执行顺序是:

  1. 主程序首先调用doSomethingBefore
  2. 其次,主程序发起ajax调用,接下来继续执行doSomethingAfter,主程序结束。
  3. 待ajax请求得到响应并且响应成功时,doSomethingWhenSucced开始执行。

了解了javascript异步编程模型的人们这样的执行结果也不会觉得奇怪。那么,这段代码背后,到底发生了什么,我将在这里分享一下我的理解。

被jQuery惯坏了的程序员们或许已经忘记了使用原生的javascript api发起ajax调用了,我们先通过一个简单的例子回忆一下:

var xmlhttp = new XMLHttpRequest();

xmlhttp.onreadystatechange = function() {
  if (xmlhttp.readyState === 4){
	console.log(xmlhttp.responseText);
  }
};

xmlhttp.open("GET","https://api.github.com/users/tater/events",true);
xmlhttp.send();

XMLHttpRequest是ajax技术中最重要的一个概念,它是浏览器暴露给浏览器脚本语言(例如javascript)的一个接口,浏览器脚本语言(例如javascript)就可以通过这个API发起HTTP,HTTPS请求,并获取响应。

当我们需要发起一个ajax调用通常经过以下几步:

  1. 创建一个XMLHttpRequest对象
  2. 注册该XMLHttpRequest对象的onreadystatechange事件侦听器,即http请求成果后需要执行的动作。
  3. 使用这个对象的open方法发起异步http调用
  4. 浏览器发起http请求,并同时更新该XMLHttpRequest对象的readyState,触发readystatechange事件,(即此readystatechange事件进入javascript事件队列)。
  5. javascript引擎线程轮询事件队列时,遇到readystatechange事件,调用该事件的侦听器函数,完成此次调用。

具体执行步骤参见下图:

Ajax workflow 上述步骤也体现了ajax如何在javascript单线程执行模型下工作的,关于javascript单线程执行的细节, 我的前同事四火最近写了一篇关于javascript单线程执行的文章,详细介绍了javascript中单线程执行任务的原理。

Create a scala project with sbt

在ruby的世界里,习惯了使用bundle来管理依赖,Rake来实现自动化构建,那么在scala的世界里,如果遇到一个好玩的开源库,例如akka,如何很快的把玩一下呢?以前我的做法会是:

  • 打开InteliJ,创建一个scala项目
  • 下载开源包到本地
  • 写程序

有没有更便捷,轻量级一点的方式呢?答案是可以的,这里我就简单介绍一下使用sbt管理依赖,自动化构建scala程序。

  • 安装sbt.
  • 安装sbt的idea插件
  • 创建一个工程目录
  • 在这个工程目录下新建一个名为build.sbt的文件,包含以下内容。
name := "<your project name>"

version := "0.1"

scalaVersion := "2.11-M3"

libraryDependencies += "org.scalatest" % "scalatest_2.10" % "1.9.1" % "test"

这个文件中是scala语法,指定了项目名称,版本,依赖的scala的版本,以及项目所需要的依赖,例如我想要使用akka,那么就在该文件中加入以下内容

libraryDependencies += "com.typesafe.akka" % "akka-actor_2.11.0-M3" % "2.2.0"
  • 在工程目录下运行sbt update下载项目依赖包。
  • 运行sbt gen-idea生成idea工程文件。
  • 打开InteliJ导入刚刚生成的工程。

好,开始写scala程序吧。

由于我经常创建小的scala项目,于是写了一个ruby程序来生成scala项目

Ruby clean code之block and instance eval

引子

自从来到ruby世界,我就被ruby那自由的语法、优雅的对象模型、漂亮的dsl深深地迷住了,了解更多的ruby特性,能够帮你实现更漂亮,流畅的api。在这篇文章中,我将以一个例子来演示如何使用ruby的block和instance_eval实现更具表现力的api。

需求

这个例子是一个来源于真实项目需求,为了演示方便,我对其做了一些简化。程序的输入是一个格式固定的json字符串,输出是从这个json中获取到一些属性值创建出来的一个给定类型的对象。然而,不同于以往的json和对象之间的序列化,反序列化,这里的从json字符串中的值与对象属性之间的对应关系有一定的逻辑。 json中的值和对象值对应关系有如下几种:

  1. json属性和对象属性直接对应。
  2. json属性和对象属性直接对应,当json中没有该属性时,使用给定默认值。
  3. 对象的属性的类型不是普通类型,当json中有对应属性的值时,需要根据json中的值创建一个对应的类型对象。
  4. 等等

我们先来看下最初的实现版本:

注:这里的json不是一个字符串对象,而是经过JSON.parse处理后得到一个嵌套的hash,下同。

class Post
  attr_accessor :author_name, :date, :tags

  def initialize(json)
    init_author_name(json)
    init_date(json)
    init_tags(json)
    #init_xxx...
  end
  
  # omit some code here...
  
  # json中的title和对应上面的第一种情形
  def init_author_name(json)
    @author_name = json['author']['name']
  end

  # 对应上面的第二种情形
  def init_date(json)
    @date = json['date'].blank? ? "1970-01-01" : json['date']
  end

  # 对应上面的第三种情形
  def init_tags(json)
    @tags = []
    unless json['tags'].nil?
      json['tags'].each do |tag|
        @tags.append(Tag.new(tag))
      end
    end
  end
end

Bad smell

看到这样的代码,你发现什么bad smell了吗?重复代码?不像,但是那么多的init_xxx方法看起来就是有那么点不自然。

在我看来,这份代码有两个问题:

第一,从json到Post对象的转换职责,不应该是Post类的职责,这份代码违反了单一职责原则

第二,由于无法很好地将json中的值和对象值对应关系规则建模,导致我们不得不创建多个init_xxx方法,然后在在initialize方法中逐一调用这些方法。然而在这些init_xxx方法之间,存在着结构化重复

如何改进?

首先,要分离职责,把json到Post对象的转换职责放到一个新类PostBuilder中。

其次,要对对应关系进行抽象。

改进

我们在来分析一下json中的值和对象值对应关系规则,还是有规律可循的,对应关系都由三部分组成:json属性对象属性名转换规则(默认没有转换规则)。其中,通过jsonpath来标识json属性,通过block来表示转换规则, 我们可以建立一个MapingRule类来对此关系进行建模。

由此我们得到如下代码:

class Post
  attr_accessor :title, :date, :tags
end

class MappingRule
  attr_accessor, :json_path, :attr_name, :converter

  def apply(obj, json)
    value = JSONPath.new(@json_path).on(json)
    unless value.nil?
      obj.send("#{@field_name}=", @converter.call(value))
    end
  end
end

class PostBuilder
  def initialize
    @rules = []
  end
  
  def rule(json_path, attr_name, converter)
    @rules << MappingRule.new(json_path, attr_name, converter)
  end
  
  def build json
      post = Post.new
      @rules.each do |rule|
      	rule.apply(post, json)
      end
  end
end

# 创建builder
builder = PostBuilder.new
buider.rule("author name", :author_name)
buider.rule("date", :date, -> (date) { date.nil? ? "1970-01-01" : date} )
buider.rule("tags", :tags, -> (tags) { tags.map {|tag| Tag.new(tag)} })

# 使用builder从json创建对象

post = buidler.build({"date" => "2013-09-10", "tags" => ["music", "IT"] })

回顾

与最初版本相比,我们引入了jsonpath和block来对转换规则进行建模(创建了MappingRule类),在PostBuilder#build中循环应用各个rule完成对象的创建,消除了多个init_xxx的重复。至此,代码已经达到一个令人满意的状态。然而,能否让我们的PostBuilder的接口更加漂亮些?

再改进,更具表达力的api

我们再来看下PostBuilder的使用场景:

  1. 创建一个PostBuilder对象。
  2. 给这个对象增加一些转换规则。
  3. 使用这个对象从json创建对象。

因此,可以说,在一个PostBuilder对象被添加规则之前,它是不完整的,是不可用的,即第一二步应该是一个原子操作,我们可以把initialize变为private方法,增加一个config类方法,这个方法可以接受一个block,在此block中对builder增加规则,在这个方法中创建一个builder实例,同时把这个实例传递给block完成buidler的创建。代码如下:

#增加一个config类方法
class PostBuilder
  def self.config
    builder = PostBuilder.new
	yield(builder) if block_given?
	builder
  end
  
  #...
  private
  def initialize
  #...
  end
end

#创建builder
builder = PostBuilder.config do |builder|
    buider.rule("author name", :author_name)
    buider.rule("date", :date, -> (date) { date.nil? ? "1970-01-01" : date} )
    buider.rule("tags", :tags, -> (tags) { tags.map {|tag| Tag.new(tag)} })
end

#使用builder
post = buidler.build({"date" => "2013-09-10", "tags" => ["music", "IT"] })

再改进,更简洁的api

至此,这个PostBuilder提供的api已经非常干净了,然而,这个api还是有改进空间的。在block中builder这个单词出现在每个增加规则的地方。有没有办法把这个重复也给消除掉呢?答案是可以的,instance_eval隆重登场了。对PostBuilder.config方法做如下修改:

  def self.config(&block)
    builder = PostBuilder.new
	builder.instance_eval(block)
	builder
  end

那么,创建builder的代码就简化为:

builder = PostBuilder.config do
    rule("author name", :author_name)
    rule("date", :date, -> (date) { date.nil? ? "1970-01-01" : date} )
    rule("tags", :tags, -> (tags) { tags.map {|tag| Tag.new(tag)} })
end

PostBuilder.config中使用instance_eval对block进行evaluate,相当于在新创建的builder上执行block中的代码,同样能达到对builder增加规则的效果。

使用instance_eval能够使代码变得更加简洁,然而随之而来的风险是,你也给了你的api调用者一个在这个新建对象上执行任意代码的机会。因此,在简洁性和风险之间,你需要做一个权衡。

再抽象

再回头看看PostBuilder,只需些许改动,我们就能从json创建任意类型的对象,于是我们得到一个InstanceBuilder类,如下:

post_builder = InstaneBuilder.config do
    instane_class Post
    rule("author name", :author_name)
    rule("date", :date, -> (date) { date.nil? ? "1970-01-01" : date} )
    rule("tags", :tags, -> (tags) { tags.map {|tag| Tag.new(tag)} })
end

你可以试着实现一个这个InstaneBuilder#instane_class方法。

结语

通观上面的例子,我们通过使用ruby的block和instance_eval,把一个复杂丑陋的代码变得干净,层次清晰,同时,更加容易扩展。 在这里,我抛出自己对编写代码的一点想法,供各位参考:

  1. 在开始编写实现代码前,先考虑一下如何提供一套干净的,更具表达力的api,让api调用者喜欢使用你的api(sinatra做了一个很好的榜样)。
  2. 恰当地使用block,instance_eval 能够很容易的构建一个internal dsl。

Reference

想了解更多关于blockinstance_eval, internal dsl可以参考如下两篇文章:

How do I build DSLs with yield and instance_eval?

Creating a ruby dsl