micro service getting more attractions in the past few years, more and more orgnizations is moving to this area. many framework were created to make building micro service easier, in java world, we have DropWizard, in ruby world, we have Rails-API, Grape and Lotus. but how the scala guys solve this problem?

in my previous post, I demostrated how to build api with spray, in this post I’ll try to use another framework - unfilter and json4s to build a similar api.

The problems:

you want to build a api which can accept http request and send response in json format.

to achieve this goal, we need:

  • a server listening on a local port, wrap received http request and pass it to the application.
  • a request mapping mechanism where you can define how http request should be handled.
  • a tool which can covert between plan scala object and json.

Unfilter have it’s answer for the first two questions:

Request mapping

in unfilter, the request mapping mechanism was called plan and intent, a accurat name, right?

from unfilter’s document:

  • An intent is a partial function for matching requests.
  • A plan binds an intent to a particular server interface.

Here is the code:

object PushApi extends Plan {
  def intent = {
    case req @ POST(Path(Seg("devices" :: Nil))) => {
      //code omitted
    }
    case req @ POST(Path(Seg("notifications" :: Nil))) => {
      //code omitted
    }
  }
}

Server

Unfilter can be run on top of jetty / netty, to do so, just run your plan with correponded server:

unfiltered.netty.Server.http(8080).plan(PushApi).run()
//or
unfiltered.jetty.Server.http(8080).plan(PushApi).run()

####Json serialization/deserialization The biggest difference between spray-json and json4s is serialization/deserialization done implicitly or explicitly.

in spray-json, you can get serialization/deserialization(aka. marshalling) implicitly if you defined your own JsonFormat, the marshalling mechanism will do their job while extracting information from request and send response implicitly. it’s cool when everything works fine, but if something went wrong it’s really hard to debug. e.g. this one took me years to find out.

with json4s, you have to serialize/deserialize your object explicitly, but the api is very neat and easy to use.

peronally, I really like json4s’s solution.

Conclusion

Compared with spray, unfilter focused on request mapping and dispatching, json4s focused on json serialization/deserialization, they both did a very good job. I highly recommand you to try it in your next scala api project.

The full example can be found here

Spray is an open-source toolkit for building REST/HTTP-based integration layers on top of Scala and Akka. Being asynchronous, actor-based, fast, lightweight, modular and testable it’s a great way to connect your Scala applications to the world.

###TLDR; This post will give you a example of how to use spray to build a REST api.

Problem

you want to build a REST api that support the following operations:

GET     /users/:id
POST    /users

###routing spray-routing gives you a elegant DSL to build routing system, which will accept http request and respond correctly. let’s see the example:

//File: src/scala/com/example/MyService.scala

trait MyService extends HttpService {
  implicit val ec: ExecutionContext = actorRefFactory.dispatcher
  val userRepository: UserRepository

  val myRoute =
    path("users" / Segment) {
      userId => {
        get {
          complete {
            //must import SprayJsonSupport to get a json mashaller
            import spray.httpx.SprayJsonSupport._
            userRepository fetch userId
          }
        }
      }
    }
}

The MyService trait extends spray.routing.HttpService which includes a bunch of convinient mehtods for creating DSL, such as path, get, complete. The variable myRoute defines a set of rules:

  • When a http request matches GET /users/:id, call userRepo.get with extracted userId, and response with the result.
  • When a http request matches POST /users/, call userRepo.save with extracted userData, and response correponded status code.

Note that there is a field defined as val userRepository: UserRepository not be initialized. This field will be implemened in a acturall actor (MyServiceActor in this example). The actural business logic was deleagted into this object.

###json support I haven’t find a JSON library in Java/Scala world which providing as good api as Ruby doses. if you konw one please let me know.

spray-json is the most beautiful one I ever found!

spray-json allows you to convert between:

  • String JSON documents
  • JSON Abstract Syntax Trees (ASTs) with base type JsValue
  • instances of arbitrary Scala types

in this post, we just want to convert between JSON documents and Scala case class. To enable JSON serialization/deserialization for your case class, just define a implicit method which returns a instance of RootJsonFormat[YourCaseClass].

//File: src/scala/com/example/User.scala

package com.example

import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat


case class User(name: String, age: Int)
object User {
  implicit def userJsonFormat: RootJsonFormat[User] = jsonFormat2(User.apply)
}

###Put them together As we metioned before, Spray is built on top of Scala and Akka, to enable MyService to handle http request, we need to create a actor:

class MyServiceActor(userRepo: UserRepository) extends Actor with MyService {

  def actorRefFactory = context

  def receive = runRoute(myRoute)

  override val userRepository: UserRepository = userRepo
}

This actor mixin MyService, in the receive method, call runRoute to handle http request with pre-defined route.

###A Tips:

####put your business logic in plain scala object instead of actor. I found that it is really hard to test logic in a actor, so I preferred to implate business logic in a pure scala class, then it is much easier to test it. then inject a instance of this class into a actor.

The full exmaple can be found here.

RxJava 是来自于NetflixReactive Extension的java版实现。

Reactive Extenstion所要解决的一个问题是对多个异步任务的组合,依赖所带来的编码复杂性的问题,我们先从一个例子看起:

异步任务的依赖

假设我们的程序需要从五个micro-service获取数据,这些micro-services之间存在依赖关系,我们来看一下第一版实现:

note: 本文我使用了scala来做为RxJava的客户端代码,只是因为scala中支持lambda。关于rx-scala的更新信息,参阅这里

 val fa = callToRemoteServiceA();
 val fb = callToRemoteServiceB();

 val fc = callToRemoteServiceC(fa.get());
 val fd = callToRemoteServiceD(fb.get());
 val fe = callToRemoteServiceE(fb.get());

fa, fb, fc, fd, fe之间的依赖关系如下: Micro Services Dependencies</img>

由于这些future之前有依赖关系(fa,fb的执行结果是fc,fd,fe的输入),我们必须调用fa.get(), fb.get(), 而这会阻塞主线程的执行。

那么这种阻塞能否避免呢?当然可以,我们可以分别新起一个线程来创建fc, fd, fe。来看第二版实现:

  val fa = callToRemoteServiceA();
  val fb = callToRemoteServiceB();

  val fc = executor.submit(new Callable[String]() {
    override def call(): String = callToRemoteServiceC(fa.get).get
  })
  val fd = executor.submit(new Callable[String]() {
    override def call(): String = callToRemoteServiceD(fb.get).get
  })
  val fe = executor.submit(new Callable[String]() {
    override def call(): String = callToRemoteServiceE(fb.get).get
  })

在这个实现里,我们分别启动了一个线程来等待fa,fb的执行结果,然后再执行fc, fd, fe, 这样,主线程就不会被阻塞,然而,这却大大地增加代码的复杂度。 那么,能否不要等待future的执行结果(poll),而是等到Future执行完成的时候被通知到(push),Reactive Extenstion的Observable的出现就解决了这样的问题,我们先来看一下实现:

val oa = from(callToRemoteServiceA)
val ob = from(callToRemoteServiceB())

val oc = oa.flatMap { res => from(callToRemoteServiceC(res)) }
val od = ob.flatMap { res => from(callToRemoteServiceD(res))}
val oe = ob.flatMap { res => from(callToRemoteServiceE(res))}

在这个版本的实现中,对ServiceAServiceB的调用被包装为一个Observable对象, 然后使用flatMap来把micro services 之间的依赖串接起来:

ServiceC的调用依赖于对ServiceA的调用,因此,我们在oa上调用flatMap方法, flatMap接受一个函数,参数为Observiable的每个元素,返回值为一个新的Observable。 这里我们传入的是:

res => from(callToRemoteServiceC(res))

就是对于oa的每个元素,用其做为参数调用ServiceC,并且包装成一个Observable。对ServiceD, ServiceE的调用也是类似的。

这个方案与上面方案最大的不同是,上面的例子中,我们需要不断地询问对ServiceA的调用是否完成, 若调用完成,再进行下面的动作(发起对ServiceC的调用)。 即便启动了新的线程以便不block在主线程,这个新的线程还是会被block住。 而在这个方案中,我们只需要定义好对ServiceA的调用完成后,需要做那些事情(发起对ServiceC的调用),代码也简洁了很多。

如果有个ServiceF依赖于ServiceE的执行结果,我们也可以很容易地通过flatMap来表述他们的依赖关系:

val of = oe.flatMap { res  => from(callToRemoteServiceF(res)) }

Reactive Extension中的概念

Observable

__Observable__用于表示一个可被消费的数据集合(data provider),它后面的数据的产生机制或者是同步的,或者是异步的,这都不重要的,最重要是它提供了下面的能力:

  • Observei可以通过Observable的subscribe向其注册。

  • 当Observable中有数据产生时,调用Observer的onNext方法通知有新数据到来。

  • 当Observable数据发送完毕时,调用Observer的onComplete方法通知数据发送完毕。

  • 当Observable内部出现错误时,调用Observer的onError方法通知有错误需要处理。

Observable 之于 Iterable

Observable 做为一个数据(事件)集合的抽象,也支持类似于Iterable上的各种,转换、组合操作,如mapfiltermerge等等,我们还是先从一个例子来看: 假设有一个GUI应用,我们使用一个Observable actions 来表示用户在界面上的操作(可能的值有click, drag, drop),

//这里我用interval模拟这些操作是异步的
val actionList = List("click", "drag", "drop", "click", "click")
val actions = interval(1 seconds).map(_.toInt).take(5).map(actionList(_))

有一个收集用户点击事件并打印日志的需求,我们该怎么实现呢?

actions.filter(_ == "click").subscribe(println("clicked at " + new Date()))

是不是和Iterable的操作非常相像? 实际上Observable和Iterable在很多方面都很相似:

  • 都是数据的容器。
  • 都可以对其应用一个映射函数(map, flatMap),从而得到一个新的Iterable/Observable。
  • 都可以对其中中的元素进行过滤,从而得到一个元素数量更少的Iterable/Observable。

Observable和Iterable最大的不同点:

  Observable对其消费者push数据,而Iterable没有这种能力。
  Iterable的消费值只能通过`pull`的方式获取数据。 
  而这种`push`的能力在Reactive Programming世界中极其重要。
Observer 之于 Observer Pattern

__Observer__的概念来自于设计模式中的Observer模式,并对其行为进行了扩展。设计模式中的Observer模式定义如下:

  有一个subject对象,它维护一个observer对象列表,当它的状态发生变化时,它会逐个通知这些observer。
  这里的Observer只有对外暴露一个行为:update, 当subject的状态发生变化时,
  subject通过这个update接口通知observer。

RxJava中的Obsever的这个update接口叫做onNext, 同时在此基础之上添加了两个行为:onCompletedonError,以应对Observable的这种特殊的data providersubject的需求:

  • onCompleted, 当Observable数据发送完毕后,调用此接口通知Observer。
  • onError,当Observable产生数据过程中出现错误时,调用此借口通知Observer。

总结

最后,引用RxJava中对Observable的解释:Observable填补了在异步编程领域中访问包含多个元素的异步序列的空白, 他们的关系正如下表所示:

  single item multiple items
synchronous T getData() Iterable<T> getData()
asynchronous Future<T> getData() Observable<T> getData()

RxJava极大地改进了java异步编程的体验,如果你受够了block Future,以及弱爆了容错机制,体验一下rxjava吧。

上面的示例代码在这里都可以找到。

更多资料,参考RxJava的wiki, 我只能帮到你这儿了 :)

Monoid

Given a Monoid trait Semigroup

trait Semigroup[M] {
  def append(a: M, b: M): M
  val zero: M
}

the following should hold:

append(a, append(b, c)) === append(append(a, b), c)
append(a, zero) = a
append(zero, a) = a

Monoid examples:

  • Int with + and 0
  • Int with * and 1
  • Boolean with || and false
  • A => A with compose and identity
  • List[A] with ++ and Nil
  • String with + and ""

Functor

Concept

Functor is a type class that defines how to apply a function to a value wrapped in a context(T).List, Option, Ethier, Try both are functor.

trait Functor[T[_], A] {
  def fmap[B](f: A => B): Functor[T, B]

  def id: T[A]
}

the Functor takes two type parameters, T[_] which is a generic type, and a type A one concrete example is:

//List as T, A as A
case class ListFunctor[A](val id: A, xs: List[A]) extends Functor[List, A] {
  def fmap[B](f: A => B): List[B] = ListFunctor(xs.map(f))
}

Functor laws:

  • fmap id = id

    if we map the id function over a functor, the functor that we get back should be the same as the original functor

  • for any functor F, the following should hold: fmap (f . g) F = fmap f (fmap g F)

    composing two functions and then mapping the resulting function over a functor should be the same as first mapping one function over the functor and then mapping the other one

Function is Functor:

Function composition:

Mapping a Function over a Function will produce a new Function(function composition), just like mapping a function over a List will produce a List, mapping a function over a Option will produce a Option.

Lifting:

Given a map function with type (A => B) => F[A] => F[B](F is a functor, it could be List, Option, or Ethier), we can think the map as a function which take a function (with type A => B) as parameter and return a new function just like the old one(with type F[A] => F[B]).

Applicative

Concept

Applicative is a type class that defines how to apply a function tf wrapped in a context T to a value wrapped in a context T.

trait Applicative[T[_], A] extends Functor[T, A] {
  def apply[B](f: T[A => B]): Applicative[T, B]
}

Monad

Concept

Monad is a type class Monad[T[_], A] that defines how to apply a function that returns a wrapped value A => T[B] to a wrapped value T[A].

trait Monad[T[_], A] extends Monoid[T, A] with Applicative[T, A] {
  def flatMap[B](f: A => T[B]): Monad[T, B]
}

Monad law:

  • Left identity

    Given a value x and a function f, the following should hold:

unit(x) flatMap f = f(x)
  • Right identity

Given a monad m, the following should hold:

m flatMap unit = m
  • Composition

Given a monad m and two functions f and g, the following should hold:

m flatMap f flatMap g == m flatMap g flatMap f

A concrete Monad example

case class ListMonad[A](val list: List[A])  extends Monad[List, A] {
  //defined in Monoid
  override def append(values: List[A]): ListMonad[A] = ListMonad(list ++ values)

  //defined in Monoid
  override def id: List[A] = Nil

  //defined in Functor
  override def fmap[B](f: (A) => B): ListMonad[B] = ListMonad(list.map(f))

  //defined in Applicative
  override def apply[B](tf: List[(A) => B]): ListMonad[B] = ListMonad(list.map(tf.head))

  //defined in Monad
  override def flatMap[B](f: (A) => List[B]): ListMonad[B] = ListMonad(list.flatMap(f))
}

相信大家对于下面这段代码都不会太陌生:

doSomethingBefore();

$.ajax({
  url: "test.html",
  success: function() {
	doSomethingWhenSucceed();
  }
});

doSomethingAfter();

上述代码的执行顺序是:

  1. 主程序首先调用doSomethingBefore
  2. 其次,主程序发起ajax调用,接下来继续执行doSomethingAfter,主程序结束。
  3. 待ajax请求得到响应并且响应成功时,doSomethingWhenSucced开始执行。

了解了javascript异步编程模型的人们这样的执行结果也不会觉得奇怪。那么,这段代码背后,到底发生了什么,我将在这里分享一下我的理解。

被jQuery惯坏了的程序员们或许已经忘记了使用原生的javascript api发起ajax调用了,我们先通过一个简单的例子回忆一下:

var xmlhttp = new XMLHttpRequest();

xmlhttp.onreadystatechange = function() {
  if (xmlhttp.readyState === 4){
	console.log(xmlhttp.responseText);
  }
};

xmlhttp.open("GET","https://api.github.com/users/tater/events",true);
xmlhttp.send();

XMLHttpRequest是ajax技术中最重要的一个概念,它是浏览器暴露给浏览器脚本语言(例如javascript)的一个接口,浏览器脚本语言(例如javascript)就可以通过这个API发起HTTP,HTTPS请求,并获取响应。

当我们需要发起一个ajax调用通常经过以下几步:

  1. 创建一个XMLHttpRequest对象
  2. 注册该XMLHttpRequest对象的onreadystatechange事件侦听器,即http请求成果后需要执行的动作。
  3. 使用这个对象的open方法发起异步http调用
  4. 浏览器发起http请求,并同时更新该XMLHttpRequest对象的readyState,触发readystatechange事件,(即此readystatechange事件进入javascript事件队列)。
  5. javascript引擎线程轮询事件队列时,遇到readystatechange事件,调用该事件的侦听器函数,完成此次调用。

具体执行步骤参见下图:

Ajax workflow 上述步骤也体现了ajax如何在javascript单线程执行模型下工作的,关于javascript单线程执行的细节, 我的前同事四火最近写了一篇关于javascript单线程执行的文章,详细介绍了javascript中单线程执行任务的原理。