A spray-based REST API

Spray is an open-source toolkit for building REST/HTTP-based integration layers on top of Scala and Akka. Being asynchronous, actor-based, fast, lightweight, modular and testable, it's a great way to connect your Scala applications to the world.

TL;DR

This post gives you an example of how to use spray to build a REST API.

Problem

You want to build a REST API that supports the following operations:

GET     /users/:id
POST    /users

Routing

spray-routing gives you an elegant DSL for building a routing layer that accepts HTTP requests and responds to them correctly. Let's look at an example:

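//File: src/main/scala/com/example/MyService.scala

package com.example

import scala.concurrent.ExecutionContext
import spray.http.StatusCodes
import spray.routing.HttpService
//must import SprayJsonSupport to get a JSON (un)marshaller for User
import spray.httpx.SprayJsonSupport._

trait MyService extends HttpService {
  implicit val ec: ExecutionContext = actorRefFactory.dispatcher
  //abstract: provided by the concrete actor (MyServiceActor)
  val userRepository: UserRepository

  val myRoute =
    path("users" / Segment) { userId =>
      get {
        complete {
          userRepository fetch userId
        }
      }
    } ~
    path("users") {
      post {
        entity(as[User]) { user =>
          complete {
            //assumes save stores the user synchronously (UserRepository is not shown)
            userRepository save user
            StatusCodes.Created
          }
        }
      }
    }
}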

The MyService trait extends spray.routing.HttpService, which provides a number of convenient methods for building the DSL, such as path, get and complete. The value myRoute defines a set of rules:

  • When an HTTP request matches GET /users/:id, call userRepository.fetch with the extracted userId and respond with the result.
  • When an HTTP request matches POST /users, call userRepository.save with the user data extracted from the request body and respond with the corresponding status code.

Note that the field val userRepository: UserRepository is declared but not initialized. It is implemented in a concrete actor (MyServiceActor in this example), and the actual business logic is delegated to this object.
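
Without the DSL, the two rules above amount to a match on the HTTP method and the path segments. The following plain-Scala sketch does not use spray at all. Request, Response, route and the in-memory users map are hypothetical names chosen for illustration:

```scala
// Hypothetical sketch without spray: Request, Response and the in-memory
// `users` map (id -> JSON string) are illustration-only stand-ins.
case class Request(method: String, path: String, body: String = "")
case class Response(status: Int, body: String = "")

def route(req: Request, users: Map[String, String]): Response =
  (req.method, req.path.split("/").filter(_.nonEmpty).toList) match {
    // GET /users/:id -> return the stored user, or 404 if unknown
    case ("GET", List("users", id)) =>
      users.get(id).map(json => Response(200, json)).getOrElse(Response(404))
    // POST /users -> a real service would parse req.body and save it
    case ("POST", List("users")) => Response(201)
    // anything else is rejected
    case _ => Response(404)
  }

val users = Map("1" -> """{"name":"alice","age":30}""")
println(route(Request("GET", "/users/1"), users).status) // prints 200
println(route(Request("GET", "/users/2"), users).status) // prints 404
println(route(Request("POST", "/users"), users).status)  // prints 201
```

spray-routing's directives (path, get, post) express the same matching declaratively, and routes compose with ~.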

JSON support

I haven't found a JSON library in the Java/Scala world with an API as good as Ruby's. If you know one, please let me know.

spray-json is the nicest one I have found so far!

spray-json allows you to convert between:

  • String JSON documents
  • JSON Abstract Syntax Trees (ASTs) with base type JsValue
  • instances of arbitrary Scala types

In this post, we just want to convert between JSON documents and Scala case classes. To enable JSON serialization/deserialization for your case class, define an implicit member (typically in the case class's companion object) that returns an instance of RootJsonFormat[YourCaseClass].

//File: src/main/scala/com/example/User.scala

package com.example

import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat


case class User(name: String, age: Int)
object User {
  implicit def userJsonFormat: RootJsonFormat[User] = jsonFormat2(User.apply)
}
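
Callers need no extra import for User's format because Scala searches the companion object of the type involved when it looks for an implicit. The sketch below imitates that mechanism with a hypothetical JsonWriter type class. It is not spray-json's API, which also handles reading, and it does no string escaping:

```scala
// Hypothetical type class, NOT spray-json: it only writes, and does no escaping.
trait JsonWriter[T] {
  def write(value: T): String
}

case class User(name: String, age: Int)

object User {
  // Like userJsonFormat above: the compiler finds this implicit in the
  // companion object, so callers need no import.
  implicit val userWriter: JsonWriter[User] = new JsonWriter[User] {
    def write(u: User): String = s"""{"name":"${u.name}","age":${u.age}}"""
  }
}

// A generic function that requires an implicit JsonWriter for T.
def toJson[T](value: T)(implicit writer: JsonWriter[T]): String =
  writer.write(value)

println(toJson(User("alice", 30))) // prints {"name":"alice","age":30}
```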

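Putting it all together

As mentioned before, spray is built on top of Scala and Akka. To let MyService handle HTTP requests, we need to create an actor:

package com.example

import akka.actor.Actor

class MyServiceActor(userRepo: UserRepository) extends Actor with MyService {

  //HttpService needs an ActorRefFactory; inside an actor that is its context
  def actorRefFactory = context

  //handle every incoming HTTP request with the route defined in MyService
  def receive = runRoute(myRoute)

  //the business-logic object, injected through the constructor
  override val userRepository: UserRepository = userRepo
}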

This actor mixes in MyService. Its receive method calls runRoute to handle HTTP requests with the predefined route.

A tip:

Put your business logic in plain Scala objects instead of actors.

I found it really hard to test logic inside an actor, so I prefer to implement business logic in a plain Scala class, which is much easier to test, and then inject an instance of that class into the actor.
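
Here is a minimal sketch of such a plain class. The post never shows UserRepository, so the trait, its fetch/save signatures (an Option[User] result and a String id) and InMemoryUserRepository are all hypothetical:

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.concurrent.TrieMap

case class User(name: String, age: Int)

// Hypothetical interface: the post does not show UserRepository.
trait UserRepository {
  def fetch(id: String): Option[User]
  def save(user: User): String // returns the id assigned to the user
}

// Plain, thread-safe business logic: no actor, no ActorSystem.
class InMemoryUserRepository extends UserRepository {
  private val users = TrieMap.empty[String, User]
  private val nextId = new AtomicLong(0)

  def fetch(id: String): Option[User] = users.get(id)

  def save(user: User): String = {
    val id = nextId.incrementAndGet().toString
    users.put(id, user)
    id
  }
}

// Testing it is just an ordinary method call.
val repo = new InMemoryUserRepository
val aliceId = repo.save(User("alice", 30))
println(repo.fetch(aliceId))   // prints Some(User(alice,30))
println(repo.fetch("missing")) // prints None
```

The same instance could then be passed to MyServiceActor through its constructor, while unit tests call it directly.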

The full example can be found here.

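A first look at RxJava

RxJava is Netflix's Java implementation of Reactive Extensions.

One problem Reactive Extensions aims to solve is the coding complexity that comes from composing multiple asynchronous tasks and handling the dependencies between them. Let's start with an example: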

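Dependencies between asynchronous tasks

Suppose our program needs to fetch data from five microservices that depend on one another. Here is a first implementation:

Note: in this post I use Scala to write the client code for RxJava, simply because Scala supports lambdas. For the latest information on rx-scala, see here.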

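 // assumption: each callToRemoteServiceX returns a java.util.concurrent.Future[String]
 val fa = callToRemoteServiceA()
 val fb = callToRemoteServiceB()

 // get() blocks the calling (main) thread until the result is available
 val fc = callToRemoteServiceC(fa.get())
 val fd = callToRemoteServiceD(fb.get())
 val fe = callToRemoteServiceE(fb.get())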

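The dependencies between fa, fb, fc, fd and fe are as follows: fc depends on fa, while fd and fe depend on fb.

[Figure: Micro Services Dependencies]

Because these futures depend on each other (the results of fa and fb are the inputs of fc, fd and fe), we have to call fa.get() and fb.get(), which blocks the main thread.

Can this blocking be avoided? Of course. We can create fc, fd and fe on separate threads. Here is the second implementation: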

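  import java.util.concurrent.{Callable, Executors}

  // assumption: a thread pool for the helper threads (not shown in the original)
  val executor = Executors.newFixedThreadPool(3)

  val fa = callToRemoteServiceA()
  val fb = callToRemoteServiceB()

  // each Callable blocks a pool thread on fa/fb instead of the main thread
  val fc = executor.submit(new Callable[String]() {
    override def call(): String = callToRemoteServiceC(fa.get).get
  })
  val fd = executor.submit(new Callable[String]() {
    override def call(): String = callToRemoteServiceD(fb.get).get
  })
  val fe = executor.submit(new Callable[String]() {
    override def call(): String = callToRemoteServiceE(fb.get).get
  })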

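In this implementation, we start a separate thread for each of fc, fd and fe that waits for the result of fa or fb before making its own call, so the main thread is no longer blocked. However, this greatly increases the complexity of the code. Instead of waiting for (polling) a future's result, could we be notified (pushed) when the future completes? This is exactly the problem that the Observable in Reactive Extensions solves. Let's look at the implementation: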

val oa = from(callToRemoteServiceA())
val ob = from(callToRemoteServiceB())

val oc = oa.flatMap { res => from(callToRemoteServiceC(res)) }
val od = ob.flatMap { res => from(callToRemoteServiceD(res)) }
val oe = ob.flatMap { res => from(callToRemoteServiceE(res)) }

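In this version, the calls to ServiceA and ServiceB are wrapped in Observable objects, and flatMap chains together the dependencies between the microservices:

The call to ServiceC depends on the call to ServiceA, so we call flatMap on oa. flatMap takes a function whose argument is each element of the Observable and whose return value is a new Observable. Here we pass in: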

res => from(callToRemoteServiceC(res))

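That is, for each element of oa, call ServiceC with that element as the argument and wrap the result in an Observable. The calls to ServiceD and ServiceE work the same way.

The biggest difference from the previous approaches is this. Before, we had to keep asking whether the call to ServiceA had finished and, once it had, carry on with the next step (calling ServiceC). Even after starting new threads so that the main thread does not block, those new threads are still blocked. In this approach, we only define what should happen once the call to ServiceA completes (call ServiceC), and the code is much more concise.

If a ServiceF depended on the result of ServiceE, we could express that dependency just as easily with flatMap:

val of = oe.flatMap { res => from(callToRemoteServiceF(res)) }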
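
The same push-style chaining can be sketched with the Scala standard library's Future, for readers who do not have RxJava at hand. Its flatMap also registers a continuation instead of blocking. This swaps in scala.concurrent.Future for Observable, and the callToRemoteServiceX stubs are hypothetical:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-ins for the remote services: each returns at once and
// completes its Future later on a pool thread.
def callToRemoteServiceA(): Future[String] = Future("a")
def callToRemoteServiceB(): Future[String] = Future("b")
def callToRemoteServiceC(in: String): Future[String] = Future(in + "->c")
def callToRemoteServiceD(in: String): Future[String] = Future(in + "->d")
def callToRemoteServiceE(in: String): Future[String] = Future(in + "->e")
def callToRemoteServiceF(in: String): Future[String] = Future(in + "->f")

val fa = callToRemoteServiceA()
val fb = callToRemoteServiceB()

// flatMap registers what to do on completion; no thread waits on fa or fb.
val fc = fa.flatMap(res => callToRemoteServiceC(res))
val fd = fb.flatMap(res => callToRemoteServiceD(res))
val fe = fb.flatMap(res => callToRemoteServiceE(res))
val ff = fe.flatMap(res => callToRemoteServiceF(res)) // F depends on E

// Blocking here only so the demo can print the final result.
println(Await.result(ff, 5.seconds)) // prints b->e->f
```

Unlike an Observable, a Future holds at most one value, so this analogue only covers the single-result case.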

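Concepts in Reactive Extensions

Observable

An Observable represents a collection of data that can be consumed (a data provider). Whether the data behind it is produced synchronously or asynchronously does not matter. What matters is that it provides the following capabilities:

  • An Observer can register with an Observable through the Observable's subscribe method.

  • When the Observable produces data, it calls the Observer's onNext method to announce that new data has arrived.

  • When the Observable has finished sending data, it calls the Observer's onCompleted method to announce that the data is complete.

  • When an error occurs inside the Observable, it calls the Observer's onError method to report an error that needs handling.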
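
The four capabilities above can be sketched with two hypothetical, heavily simplified types. These are not RxJava's interfaces, since real Observables also handle unsubscription, asynchrony and more. SimpleObservable just pushes a fixed sequence synchronously:

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified types for illustration only; RxJava's real
// Observer and Observable have more methods and guarantees.
trait Observer[T] {
  def onNext(value: T): Unit
  def onCompleted(): Unit
  def onError(e: Throwable): Unit
}

// A synchronous "Observable" that pushes a fixed sequence to a subscriber.
class SimpleObservable[T](values: Seq[T]) {
  def subscribe(observer: Observer[T]): Unit =
    try {
      values.foreach(v => observer.onNext(v)) // push each element
      observer.onCompleted()                   // then signal completion
    } catch {
      // any exception raised while emitting is reported, not thrown
      case e: Exception => observer.onError(e)
    }
}

val received = ListBuffer.empty[String]
new SimpleObservable(Seq("click", "drag")).subscribe(new Observer[String] {
  def onNext(value: String): Unit = received += value
  def onCompleted(): Unit = received += "done"
  def onError(e: Throwable): Unit = received += s"error: ${e.getMessage}"
})
println(received.mkString(", ")) // prints click, drag, done
```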

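Observable versus Iterable

As an abstraction over a collection of data (events), Observable also supports transformation and combination operations similar to those on Iterable, such as map, filter and merge. Again, let's start with an example. Suppose we have a GUI application and use an Observable called actions to represent the user's operations in the interface (the possible values are click, drag and drop).

import scala.concurrent.duration._
import rx.lang.scala.Observable.interval

//here interval is used to simulate user actions arriving asynchronously
val actionList = List("click", "drag", "drop", "click", "click")
val actions = interval(1.second).map(_.toInt).take(5).map(actionList(_))

Suppose we need to collect the user's click events and log them. How would we implement that?

import java.util.Date

//subscribe takes a callback that runs for every element pushed to it
actions.filter(_ == "click").subscribe(_ => println("clicked at " + new Date()))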

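Doesn't this look very much like operations on an Iterable? In fact, Observable and Iterable are similar in many ways:

  • Both are containers of data.
  • Both can have a mapping function (map, flatMap) applied to them, yielding a new Iterable/Observable.
  • Both can filter their elements, yielding an Iterable/Observable with fewer elements.

The biggest difference between Observable and Iterable:

  An Observable pushes data to its consumers, which an Iterable cannot do.
  Consumers of an Iterable can only obtain data by pulling it.
  This push capability is extremely important in the world of reactive programming.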
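
The pull/push difference shows up clearly with the click filter from the example above. This sketch uses only the standard library. pushAll is a hypothetical helper standing in for an Observable:

```scala
val actionList = List("click", "drag", "drop", "click", "click")

// Pull: the consumer drives, asking the iterator for each next element.
val clickIterator = actionList.iterator.filter(_ == "click")
var pulled = 0
while (clickIterator.hasNext) {
  clickIterator.next()
  pulled += 1
}

// Push: the producer drives, calling back into the consumer per element.
// pushAll is a hypothetical stand-in for an Observable's subscribe.
def pushAll[T](source: Seq[T])(onNext: T => Unit): Unit = source.foreach(onNext)
var pushed = 0
pushAll(actionList.filter(_ == "click"))(_ => pushed += 1)

println(s"pulled=$pulled pushed=$pushed") // prints pulled=3 pushed=3
```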
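
Observer versus the Observer pattern

The Observer concept comes from the Observer pattern in design patterns, with its behavior extended. The Observer pattern is defined as follows:

  There is a subject object that maintains a list of observer objects; when its state changes, it notifies these observers one by one.
  The observer here exposes only one behavior, update: when the subject's state changes,
  the subject notifies the observer through this update interface.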
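
For comparison, the classic Observer pattern described above fits in a few lines. ClassicObserver and Subject are hypothetical names. Note that the observer has only update and no way to signal completion or errors:

```scala
import scala.collection.mutable.ListBuffer

// Classic Observer pattern: observers expose a single update method.
trait ClassicObserver[S] {
  def update(state: S): Unit
}

class Subject[S](initial: S) {
  private val observers = ListBuffer.empty[ClassicObserver[S]]
  private var state: S = initial

  def attach(o: ClassicObserver[S]): Unit = observers += o

  // On every state change, notify each registered observer in turn.
  def setState(newState: S): Unit = {
    state = newState
    observers.foreach(_.update(state))
  }
}

val log = ListBuffer.empty[Int]
val subject = new Subject(0)
subject.attach(new ClassicObserver[Int] { def update(s: Int): Unit = log += s })
subject.setState(1)
subject.setState(2)
println(log.mkString(",")) // prints 1,2
```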

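In RxJava, the Observer's equivalent of update is called onNext. Two more behaviors, onCompleted and onError, are added on top of it to meet the needs of the Observable as a special kind of data provider (subject):

  • onCompleted: called to notify the Observer after the Observable has finished sending data.
  • onError: called to notify the Observer when an error occurs while the Observable is producing data.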

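Summary

Finally, here is RxJava's own explanation of Observable: it fills the gap for accessing asynchronous sequences of multiple items in asynchronous programming. The relationship is shown in the table below:

                single item            multiple items
synchronous     T getData()            Iterable<T> getData()
asynchronous    Future<T> getData()    Observable<T> getData()

RxJava greatly improves the experience of asynchronous programming in Java. If you are fed up with blocking Futures and weak error handling, give RxJava a try.

All of the sample code above can be found here.

For more information, see the RxJava wiki. That's as far as I can help you :)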

Monoid, Functor, Applicative and Monad

Monoid

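A monoid is a semigroup (a type with an associative append operation) that also has an identity element, zero:

trait Semigroup[M] {
  def append(a: M, b: M): M
}

trait Monoid[M] extends Semigroup[M] {
  val zero: M
}

The following laws should hold:

append(a, append(b, c)) == append(append(a, b), c)
append(a, zero) == a
append(zero, a) == a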

Monoid examples:

  • Int with + and 0
  • Int with * and 1
  • Boolean with || and false
  • A => A with compose and identity
  • List[A] with ++ and Nil
  • String with + and ""
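
The laws can be checked on sample values for a few of the instances above. This sketch defines its own minimal Monoid trait, with zero as a def, rather than using one from a library:

```scala
// A minimal Monoid type class, defined here for illustration.
trait Monoid[M] {
  def append(a: M, b: M): M
  def zero: M
}

val intAddition: Monoid[Int] = new Monoid[Int] {
  def append(a: Int, b: Int): Int = a + b
  def zero: Int = 0
}

val booleanOr: Monoid[Boolean] = new Monoid[Boolean] {
  def append(a: Boolean, b: Boolean): Boolean = a || b
  def zero: Boolean = false
}

def listConcat[A]: Monoid[List[A]] = new Monoid[List[A]] {
  def append(a: List[A], b: List[A]): List[A] = a ++ b
  def zero: List[A] = Nil
}

// Checks associativity and both identity laws for three sample values.
def lawsHold[M](m: Monoid[M], a: M, b: M, c: M): Boolean =
  m.append(a, m.append(b, c)) == m.append(m.append(a, b), c) &&
    m.append(a, m.zero) == a &&
    m.append(m.zero, a) == a

// zero is the starting value of the fold; append combines the elements.
def concatAll[M](xs: List[M])(m: Monoid[M]): M = xs.foldLeft(m.zero)(m.append)

println(lawsHold(intAddition, 1, 2, 3))                      // prints true
println(lawsHold(listConcat[Int], List(1), Nil, List(2, 3))) // prints true
println(concatAll(List(1, 2, 3, 4))(intAddition))            // prints 10
```

The zero is what makes folding total: concatAll(List.empty[Int])(intAddition) simply returns 0.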

Functor

Concept

Functor is a type class that defines how to apply a function to a value wrapped in a context T. List, Option, Either (with a fixed left type) and Try are all functors.

trait Functor[T[_], A] {
  def fmap[B](f: A => B): Functor[T, B]

  def id: T[A]
}

The Functor takes two type parameters: T[_], a generic type constructor, and a concrete type A. One concrete example is:

//List as T, A as A
case class ListFunctor[A](xs: List[A]) extends Functor[List, A] {
  //the wrapped value
  def id: List[A] = xs

  def fmap[B](f: A => B): ListFunctor[B] = ListFunctor(xs.map(f))
}

Functor laws:

  • fmap id = id

    If we map the id function over a functor, the functor we get back should be the same as the original functor.

  • For any functor F, the following should hold: fmap (f . g) F = fmap f (fmap g F)

    Composing two functions and then mapping the resulting function over a functor should be the same as first mapping one function (g) over the functor and then mapping the other (f).
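
The two laws can be checked on sample values. Instead of the trait above, this sketch uses the common type-class encoding Functor[F[_]] with a standalone fmap, which is an alternative encoding of the same idea. listFunctor, optionFunctor and the law helpers are hypothetical names:

```scala
// Type-class encoding: F is the context (List, Option, ...).
trait Functor[F[_]] {
  def fmap[A, B](fa: F[A])(f: A => B): F[B]
}

val listFunctor: Functor[List] = new Functor[List] {
  def fmap[A, B](fa: List[A])(f: A => B): List[B] = fa.map(f)
}

val optionFunctor: Functor[Option] = new Functor[Option] {
  def fmap[A, B](fa: Option[A])(f: A => B): Option[B] = fa.map(f)
}

// Law 1: fmap id = id
def identityLaw[F[_], A](functor: Functor[F], fa: F[A]): Boolean =
  functor.fmap(fa)((a: A) => a) == fa

// Law 2: fmap (f . g) = fmap f . fmap g
def compositionLaw[F[_], A, B, C](functor: Functor[F], fa: F[A], f: B => C, g: A => B): Boolean =
  functor.fmap(fa)(f compose g) == functor.fmap(functor.fmap(fa)(g))(f)

println(identityLaw(listFunctor, List(1, 2, 3)))                                         // prints true
println(compositionLaw(optionFunctor, Option(2), (x: Int) => x + 1, (x: Int) => x * 10)) // prints true
```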

Functions are functors:

Function composition:

Mapping a function over a function produces a new function (function composition), just as mapping a function over a List produces a List and mapping a function over an Option produces an Option.

Lifting:

Given a map function of type (A => B) => F[A] => F[B] (where F is a functor such as List, Option or Either), we can think of map as a function that takes a function of type A => B as its parameter and returns a new function of type F[A] => F[B]. This is called lifting.
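
Both ideas fit in a few lines of Scala. fmapFunction and liftList are hypothetical helper names. For functions, fmap is simply compose:

```scala
// For functions, fmap is composition: mapping f over g yields f compose g.
def fmapFunction[R, A, B](g: R => A)(f: A => B): R => B = f compose g

val length: String => Int = _.length
val double: Int => Int = _ * 2

val doubledLength = fmapFunction(length)(double)
println(doubledLength("spray")) // prints 10

// Lifting: turn A => B into List[A] => List[B] (List only, for brevity).
def liftList[A, B](f: A => B): List[A] => List[B] = xs => xs.map(f)

val doubleAll = liftList(double)
println(doubleAll(List(1, 2, 3))) // prints List(2, 4, 6)
```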

Applicative

Concept

Applicative is a type class that defines how to apply a function tf wrapped in a context T to a value wrapped in the same context T.

trait Applicative[T[_], A] extends Functor[T, A] {
  def apply[B](f: T[A => B]): Applicative[T, B]
}
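
Here is a minimal sketch of the idea for Option. It uses standalone functions rather than the trait above. The names ap and pure are conventional, not part of the Scala standard library. ap applies a wrapped function only when both the function and the value are present:

```scala
// ap: apply a function wrapped in Option to a value wrapped in Option.
def ap[A, B](ff: Option[A => B])(fa: Option[A]): Option[B] =
  (ff, fa) match {
    case (Some(f), Some(a)) => Some(f(a))
    case _                  => None
  }

// pure: wrap a plain value in the context.
def pure[A](a: A): Option[A] = Some(a)

// A plain curried function, lifted to work on two optional ints.
val add: Int => Int => Int = x => y => x + y

println(ap(ap(pure(add))(Some(1)))(Some(2))) // prints Some(3)
println(ap(ap(pure(add))(Some(1)))(None))    // prints None
```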

Monad

Concept

Monad is a type class Monad[T[_], A] that defines how to apply a function that returns a wrapped value (A => T[B]) to a wrapped value T[A].

trait Monad[T[_], A] extends Applicative[T, A] {
  def flatMap[B](f: A => T[B]): Monad[T, B]
}

Monad laws (unit(x) wraps a plain value x in the monad, e.g. List(x)):

  • Left identity

    Given a value x and a function f, the following should hold:

    unit(x) flatMap f == f(x)

  • Right identity

    Given a monad m, the following should hold:

    m flatMap unit == m

  • Associativity

    Given a monad m and two functions f and g, the following should hold:

    (m flatMap f) flatMap g == m flatMap (x => f(x) flatMap g)

A concrete Monad example

case class ListMonad[A](list: List[A]) extends Monad[List, A] {
  //defined in Functor: the wrapped value
  override def id: List[A] = list

  //defined in Functor
  override def fmap[B](f: A => B): ListMonad[B] = ListMonad(list.map(f))

  //defined in Applicative: apply every function to every element
  override def apply[B](tf: List[A => B]): ListMonad[B] =
    ListMonad(for (f <- tf; a <- list) yield f(a))

  //defined in Monad
  override def flatMap[B](f: A => List[B]): ListMonad[B] = ListMonad(list.flatMap(f))
}
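
The three laws can be checked on sample values using Option, whose flatMap is built in. Here unit is a hypothetical helper that simply wraps a value in Some. The last line shows that swapping f and g generally changes the result, so the third law is about regrouping (associativity), not reordering:

```scala
// unit wraps a plain value in the monad; for Option that is Some.
def unit[A](a: A): Option[A] = Some(a)

val f: Int => Option[Int] = x => if (x > 0) Some(x * 2) else None
val g: Int => Option[Int] = x => Some(x + 1)
val m: Option[Int] = Some(5)

val leftIdentity  = unit(3).flatMap(f) == f(3)
val rightIdentity = m.flatMap(x => unit(x)) == m
val associativity = m.flatMap(f).flatMap(g) == m.flatMap(x => f(x).flatMap(g))

println(Seq(leftIdentity, rightIdentity, associativity)) // prints List(true, true, true)

// Reordering differs: f then g gives Some(11), g then f gives Some(12).
println(m.flatMap(f).flatMap(g) == m.flatMap(g).flatMap(f)) // prints false
```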

Behind $.ajax

You are probably familiar with code like this:

doSomethingBefore();

$.ajax({
  url: "test.html",
  success: function() {
	doSomethingWhenSucceed();
  }
});

doSomethingAfter();

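The code above executes in this order:

  1. The main program first calls doSomethingBefore.
  2. Next, the main program issues the ajax call, then continues with doSomethingAfter and finishes.
  3. When the ajax request receives a successful response, doSomethingWhenSucceed runs.

Anyone who understands JavaScript's asynchronous programming model will not find this result surprising. But what actually happens behind this code? Here I'll share my understanding.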

Programmers spoiled by jQuery may have forgotten how to make an ajax call with the native JavaScript API, so let's recall it with a simple example:

var xmlhttp = new XMLHttpRequest();

xmlhttp.onreadystatechange = function() {
  if (xmlhttp.readyState === 4){
	console.log(xmlhttp.responseText);
  }
};

xmlhttp.open("GET","https://api.github.com/users/tater/events",true);
xmlhttp.send();

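XMLHttpRequest is the most important concept in ajax. It is an interface that the browser exposes to browser scripting languages (such as JavaScript). Through this API, scripts can send HTTP and HTTPS requests and receive the responses.

Making an ajax call usually involves the following steps:

  1. Create an XMLHttpRequest object.
  2. Register the object's onreadystatechange event listener, i.e. the action to run once the HTTP request succeeds.
  3. Call the object's open method to set up an asynchronous HTTP request, and call send to dispatch it.
  4. The browser sends the HTTP request and, as it progresses, updates the object's readyState and fires a readystatechange event (that is, the readystatechange event enters the JavaScript event queue).
  5. When the JavaScript engine thread polls the event queue and encounters the readystatechange event, it calls that event's listener function, completing the call.

The detailed execution steps are shown in the figure below:

[Figure: Ajax workflow]

These steps also show how ajax works under JavaScript's single-threaded execution model. For the details of single-threaded execution in JavaScript, see the article my former colleague 四火 recently wrote, which explains how JavaScript executes tasks on a single thread.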
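
The ordering described above can be simulated with a toy, single-threaded event loop, written in Scala to match the rest of this page. This is a hypothetical model, not how a browser is implemented. fakeAjax stands in for send() and simply enqueues the success callback, which runs only after the main script has finished:

```scala
import scala.collection.mutable

// Toy single-threaded event loop (hypothetical; real browsers are far more
// complex). Callbacks run only after the current script has finished.
val eventQueue = mutable.Queue.empty[() => Unit]
val trace = mutable.ListBuffer.empty[String]

// Stand-in for xhr.send(): the "browser" later enqueues the callback
// instead of calling it right away.
def fakeAjax(onSuccess: () => Unit): Unit =
  eventQueue.enqueue(onSuccess)

// The main script, mirroring the $.ajax snippet above.
trace += "doSomethingBefore"
fakeAjax(() => trace += "doSomethingWhenSucceed")
trace += "doSomethingAfter"

// The engine thread drains the queue only once the script is done.
while (eventQueue.nonEmpty) eventQueue.dequeue()()

println(trace.mkString(" -> "))
// prints doSomethingBefore -> doSomethingAfter -> doSomethingWhenSucceed
```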

Create a scala project with sbt

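In the Ruby world, I got used to managing dependencies with Bundler and automating builds with Rake. So in the Scala world, when I come across an interesting open-source library such as Akka, how can I quickly try it out? My old approach was:

  • Open IntelliJ and create a Scala project
  • Download the library's jars to my machine
  • Write the program

Is there a more convenient, lightweight way? Yes. Here I'll briefly show how to use sbt to manage dependencies and build Scala programs automatically.

  • Install sbt.
  • Install sbt's IDEA plugin.
  • Create a project directory.
  • In this project directory, create a file named build.sbt with the following contents.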
name := "<your project name>"

version := "0.1"

scalaVersion := "2.11.0-M3"

libraryDependencies += "org.scalatest" % "scalatest_2.10" % "1.9.1" % "test"

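This file uses Scala syntax. It specifies the project name, the version, the Scala version, and the dependencies the project needs. For example, to use Akka, add the following line to the file: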

libraryDependencies += "com.typesafe.akka" % "akka-actor_2.11.0-M3" % "2.2.0"
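  • Run sbt update in the project directory to download the dependencies.
  • Run sbt gen-idea to generate the IDEA project files.
  • Open IntelliJ and import the project you just generated.

Now, go write some Scala!

Since I often create small Scala projects, I wrote a Ruby program to generate them.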