Java 8 collectors

Java 8 has been released quite a long time, since I come to ruby in 2011, I haven’t work with java. Even I heard that there’re few cool features come out with Java 8, like lambda, stream collection, functional interface, new date api. none of them can attract me, given ruby ship all these features since the date of birth.

While recently I do try to solve a problem with Java, and I found that Java has changed a lot compared with my impression on it. in this post, I’m going to talk about the collectors shipped with Java 8. also I’ll try to give collector example in scala.

The Problem

Reduction, (aka. iterating on a collection, apply reduction computation on each elements, and produce a single result or a smaller collection) is a common problem in any programming language.

Let’s look at a specific example:

Given an collection of employees, grouping these employees by age produce a map between age and list of employees.

Here is the class definition of Employee:

class Employee{
    private int age;
    private String name;

    Employee(String name, int age) {
        this.age = age;
        this.name = name;
    }
}

The Solution

a simple implementation could be:

List<Employee> employees = Arrays.asList(new Employee("A", 18), new Employee("B", 20), new Employee("C", 18));
Map<Integer, List<Employee>> byAge = new HashMap<>();
for(Employee employee: employees) {
  List<Employee> employeeByAge = byAge.get(employee.getAge())
  if(employeeByAge = null) {
    employeeByAge = new ArrayList<>();
  }
  employeeByAge.add(employee);
  byAge.put(employee.getAge(), employeeByAge);
}

if you have been working with Java for quite a long time, you may be sick to write these code. you must have write code in this structure for quite a long time. to demonstrate the duplication of this structure, let’s rewrite the above code to this format:

Collection<T> collections = ...
Collection<R> results = new SomeCollection;

for(T t: in collections) {
  R r = results.get(t)
  if (r == null) {
    r = new R
  }
  r.add(t)
}

all these code did is to collect some information for give collection and apply reduction on the items in this collection and produce a result container.

with Java 8s collector interface, you can simply do

List<Employee> employees = Arrays.asList(new Employee("A", 18), new Employee("B", 20), new Employee("C", 18));
Map<Integer, List<Employee>> byAge = employees.stream().collect(Collectors.groupingBy((e) -> e.age));

so what is the magic behind it:

The magic is behind the Collector<T, A, R> interface:

Collectors.groupingBy is a built in collector which acceptting a function with type T -> K which can be group against(in this case, employee.age()). it will produce a result with type Map<K, List<T>>(aka, the result type R).

Here is the official definition of Collector from its api document:

A mutable reduction operation that accumulates input elements into a mutable result container, optionally transforming the accumulated result into a final representation after all input elements have been processed. Reduction operations can be performed either sequentially or in parallel.

You see from the document, Collector take three type parameters T, A and R, where T is the type of element inside the collection, A is an intermediate type which could be used to do the mutable reduction, R is the type of result.

There four functions in this interface which work together to accumulate entries into a mutable result container.

  • supplier(), with type () -> A - creation of a new result container.
  • accumulator(), with type(A, T) -> A - incorprating a new element into the result container.
  • combiner(), with type (A, A) -> A - combing two result container into one.
  • finisher(), with type A -> R - a optional final transformation on the result container to get the result. the optional means that in some scenarios, A and R could be same, so the finisher function is not required. but in some other cases, when A and R are different, this function is required to get the final result.

In the previous example, the type of result of Collector.groupingBy is Collector<Employee, ?, Map<Integer, List<Employee>>.

let’s extend this problem a little bit: how about grouping employees by age range(e.g. 20-29 as a group, 30-39 as a group) this time, you can not find any buitin collector which is suitable to solve this problem, now, you will need a customised collector to do the reduction.

(this blog post)[http://www.nurkiewicz.com/2014/07/introduction-to-writing-custom.html] is a fairly good guide for how to create you own Collector implementation.

Collector in Scala

After found this useful pattern, I wonder if scala’s powerful collection system support this computation. Unfortunately, I can not found a similar api from any collection type. But I do found that we can easily build our own version of collector based on scala.collection.mutable.Builder.

scala.collection.mutable.Builder play the same role with accumulator (the A) in java Collector. Let’s see the following example of how we implement the collect method in scala and how we use it to solve the word count problem:

import scala.collection.mutable.Builder

//`T` is the type of element in collection, `Builder` is the type of intermediate result container, `R` is the type of reduction result.
def collect[T, R] (ts: List[T], a: Builder[T, R]):R = {
  ts.foreach (a += _) //invoke the accumulator function
  a.result //invoke the finisher function
}

//word counting builder
//from Seq[String] to Map[String, Int]
class CounterBuilder[String, Map[String, Int]] extends Builder[String, Map[String, Int]] {

  var counter = scala.collection.mutable.Map[String, Int]()

  def += (el: String) = {
    counter.get(el)  match {
      case None => counter += el -> 1
      case Some(count) => counter += el -> (count + 1)
    }
    this
  }

  def clear = counter = scala.collection.mutable.Map[String, Int]()

  def result: Map[String, Int] = counter.toMap[String, Int].asInstanceOf[Map[String, Int]]
}

and here is the code to use the CounterBuilder

//use case
val xs = List("a", "a", "a", "c", "d")

//the supplier function
val builder = new CounterBuilder
val res = collect[String, Map[String, Int]] (xs, new CounterBuilder)
//output: Map(d -> 1, a -> 3, c -> 1)

Conclusion

  • Java 8s Collector Api provide a better way to encapsulate reduction computation - not only some built in reduction(e.g. max, min, sum, average), but also customized reduction(via customised collector), Collector is designed to be composed, which means these reduction logic are much easier to be reused.
  • Scala dose not have native support for customised mutable reduction, but based on scala’s powerfull collection system, we can create our own version.

References

build rest api in scala

micro service getting more attractions in the past few years, more and more orgnizations is moving to this area. many framework were created to make building micro service easier, in java world, we have DropWizard, in ruby world, we have Rails-API, Grape and Lotus. but how the scala guys solve this problem?

in my previous post, I demostrated how to build api with spray, in this post I’ll try to use another framework - unfilter and json4s to build a similar api.

The problems:

you want to build a api which can accept http request and send response in json format.

to achieve this goal, we need:

  • a server listening on a local port, wrap received http request and pass it to the application.
  • a request mapping mechanism where you can define how http request should be handled.
  • a tool which can covert between plan scala object and json.

Unfilter have it’s answer for the first two questions:

Request mapping

in unfilter, the request mapping mechanism was called plan and intent, a accurat name, right?

from unfilter’s document:

  • An intent is a partial function for matching requests.
  • A plan binds an intent to a particular server interface.

Here is the code:

object PushApi extends Plan {
  def intent = {
    case req @ POST(Path(Seg("devices" :: Nil))) => {
      //code omitted
    }
    case req @ POST(Path(Seg("notifications" :: Nil))) => {
      //code omitted
    }
  }
}

Server

Unfilter can be run on top of jetty / netty, to do so, just run your plan with correponded server:

unfiltered.netty.Server.http(8080).plan(PushApi).run()
//or
unfiltered.jetty.Server.http(8080).plan(PushApi).run()

Json serialization/deserialization

The biggest difference between spray-json and json4s is serialization/deserialization done implicitly or explicitly.

in spray-json, you can get serialization/deserialization(aka. marshalling) implicitly if you defined your own JsonFormat, the marshalling mechanism will do their job while extracting information from request and send response implicitly. it’s cool when everything works fine, but if something went wrong it’s really hard to debug. e.g. this one took me years to find out.

with json4s, you have to serialize/deserialize your object explicitly, but the api is very neat and easy to use.

peronally, I really like json4s’s solution.

Conclusion

Compared with spray, unfilter focused on request mapping and dispatching, json4s focused on json serialization/deserialization, they both did a very good job. I highly recommand you to try it in your next scala api project.

The full example can be found here

spray based rest api

Spray is an open-source toolkit for building REST/HTTP-based integration layers on top of Scala and Akka. Being asynchronous, actor-based, fast, lightweight, modular and testable it’s a great way to connect your Scala applications to the world.

TLDR;

This post will give you a example of how to use spray to build a REST api.

Problem

you want to build a REST api that support the following operations:

GET     /users/:id
POST    /users

routing

spray-routing gives you a elegant DSL to build routing system, which will accept http request and respond correctly. let’s see the example:

//File: src/scala/com/example/MyService.scala

trait MyService extends HttpService {
  implicit val ec: ExecutionContext = actorRefFactory.dispatcher
  val userRepository: UserRepository

  val myRoute =
    path("users" / Segment) {
      userId => {
        get {
          complete {
            //must import SprayJsonSupport to get a json mashaller
            import spray.httpx.SprayJsonSupport._
            userRepository fetch userId
          }
        }
      }
    }
}

The MyService trait extends spray.routing.HttpService which includes a bunch of convinient mehtods for creating DSL, such as path, get, complete. The variable myRoute defines a set of rules:

  • When a http request matches GET /users/:id, call userRepo.get with extracted userId, and response with the result.
  • When a http request matches POST /users/, call userRepo.save with extracted userData, and response correponded status code.

Note that there is a field defined as val userRepository: UserRepository not be initialized. This field will be implemened in a acturall actor (MyServiceActor in this example). The actural business logic was deleagted into this object.

json support

I haven’t find a JSON library in Java/Scala world which providing as good api as Ruby doses. if you konw one please let me know.

spray-json is the most beautiful one I ever found!

spray-json allows you to convert between:

  • String JSON documents
  • JSON Abstract Syntax Trees (ASTs) with base type JsValue
  • instances of arbitrary Scala types

in this post, we just want to convert between JSON documents and Scala case class. To enable JSON serialization/deserialization for your case class, just define a implicit method which returns a instance of RootJsonFormat[YourCaseClass].

//File: src/scala/com/example/User.scala

package com.example

import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat


case class User(name: String, age: Int)
object User {
  implicit def userJsonFormat: RootJsonFormat[User] = jsonFormat2(User.apply)
}

Put them together

As we metioned before, Spray is built on top of Scala and Akka, to enable MyService to handle http request, we need to create a actor:

class MyServiceActor(userRepo: UserRepository) extends Actor with MyService {

  def actorRefFactory = context

  def receive = runRoute(myRoute)

  override val userRepository: UserRepository = userRepo
}

This actor mixin MyService, in the receive method, call runRoute to handle http request with pre-defined route.

A Tips:

put your business logic in plain scala object instead of actor.

I found that it is really hard to test logic in a actor, so I preferred to implate business logic in a pure scala class, then it is much easier to test it. then inject a instance of this class into a actor.

The full exmaple can be found here.

RxJava初探

RxJava 是来自于NetflixReactive Extension的java版实现。

Reactive Extenstion所要解决的一个问题是对多个异步任务的组合,依赖所带来的编码复杂性的问题,我们先从一个例子看起:

异步任务的依赖

假设我们的程序需要从五个micro-service获取数据,这些micro-services之间存在依赖关系,我们来看一下第一版实现:

note: 本文我使用了scala来做为RxJava的客户端代码,只是因为scala中支持lambda。关于rx-scala的更新信息,参阅这里

 val fa = callToRemoteServiceA();
 val fb = callToRemoteServiceB();

 val fc = callToRemoteServiceC(fa.get());
 val fd = callToRemoteServiceD(fb.get());
 val fe = callToRemoteServiceE(fb.get());

fa, fb, fc, fd, fe之间的依赖关系如下: Micro Services Dependencies</img>

由于这些future之前有依赖关系(fa,fb的执行结果是fc,fd,fe的输入),我们必须调用fa.get(), fb.get(), 而这会阻塞主线程的执行。

那么这种阻塞能否避免呢?当然可以,我们可以分别新起一个线程来创建fc, fd, fe。来看第二版实现:

  val fa = callToRemoteServiceA();
  val fb = callToRemoteServiceB();

  val fc = executor.submit(new Callable[String]() {
    override def call(): String = callToRemoteServiceC(fa.get).get
  })
  val fd = executor.submit(new Callable[String]() {
    override def call(): String = callToRemoteServiceD(fb.get).get
  })
  val fe = executor.submit(new Callable[String]() {
    override def call(): String = callToRemoteServiceE(fb.get).get
  })

在这个实现里,我们分别启动了一个线程来等待fa,fb的执行结果,然后再执行fc, fd, fe, 这样,主线程就不会被阻塞,然而,这却大大地增加代码的复杂度。 那么,能否不要等待future的执行结果(poll),而是等到Future执行完成的时候被通知到(push),Reactive Extenstion的Observable的出现就解决了这样的问题,我们先来看一下实现:

val oa = from(callToRemoteServiceA)
val ob = from(callToRemoteServiceB())

val oc = oa.flatMap { res => from(callToRemoteServiceC(res)) }
val od = ob.flatMap { res => from(callToRemoteServiceD(res))}
val oe = ob.flatMap { res => from(callToRemoteServiceE(res))}

在这个版本的实现中,对ServiceAServiceB的调用被包装为一个Observable对象, 然后使用flatMap来把micro services 之间的依赖串接起来:

ServiceC的调用依赖于对ServiceA的调用,因此,我们在oa上调用flatMap方法, flatMap接受一个函数,参数为Observiable的每个元素,返回值为一个新的Observable。 这里我们传入的是:

res => from(callToRemoteServiceC(res))

就是对于oa的每个元素,用其做为参数调用ServiceC,并且包装成一个Observable。对ServiceD, ServiceE的调用也是类似的。

这个方案与上面方案最大的不同是,上面的例子中,我们需要不断地询问对ServiceA的调用是否完成, 若调用完成,再进行下面的动作(发起对ServiceC的调用)。 即便启动了新的线程以便不block在主线程,这个新的线程还是会被block住。 而在这个方案中,我们只需要定义好对ServiceA的调用完成后,需要做那些事情(发起对ServiceC的调用),代码也简洁了很多。

如果有个ServiceF依赖于ServiceE的执行结果,我们也可以很容易地通过flatMap来表述他们的依赖关系:

val of = oe.flatMap { res  => from(callToRemoteServiceF(res)) }

Reactive Extension中的概念

Observable

__Observable__用于表示一个可被消费的数据集合(data provider),它后面的数据的产生机制或者是同步的,或者是异步的,这都不重要的,最重要是它提供了下面的能力:

  • Observei可以通过Observable的subscribe向其注册。

  • 当Observable中有数据产生时,调用Observer的onNext方法通知有新数据到来。

  • 当Observable数据发送完毕时,调用Observer的onComplete方法通知数据发送完毕。

  • 当Observable内部出现错误时,调用Observer的onError方法通知有错误需要处理。

Observable 之于 Iterable

Observable 做为一个数据(事件)集合的抽象,也支持类似于Iterable上的各种,转换、组合操作,如mapfiltermerge等等,我们还是先从一个例子来看: 假设有一个GUI应用,我们使用一个Observable actions 来表示用户在界面上的操作(可能的值有click, drag, drop),

//这里我用interval模拟这些操作是异步的
val actionList = List("click", "drag", "drop", "click", "click")
val actions = interval(1 seconds).map(_.toInt).take(5).map(actionList(_))

有一个收集用户点击事件并打印日志的需求,我们该怎么实现呢?

actions.filter(_ == "click").subscribe(println("clicked at " + new Date()))

是不是和Iterable的操作非常相像? 实际上Observable和Iterable在很多方面都很相似:

  • 都是数据的容器。
  • 都可以对其应用一个映射函数(map, flatMap),从而得到一个新的Iterable/Observable。
  • 都可以对其中中的元素进行过滤,从而得到一个元素数量更少的Iterable/Observable。

Observable和Iterable最大的不同点:

  Observable对其消费者push数据,而Iterable没有这种能力。
  Iterable的消费值只能通过`pull`的方式获取数据。 
  而这种`push`的能力在Reactive Programming世界中极其重要。
Observer 之于 Observer Pattern

__Observer__的概念来自于设计模式中的Observer模式,并对其行为进行了扩展。设计模式中的Observer模式定义如下:

  有一个subject对象,它维护一个observer对象列表,当它的状态发生变化时,它会逐个通知这些observer。
  这里的Observer只有对外暴露一个行为:update, 当subject的状态发生变化时,
  subject通过这个update接口通知observer。

RxJava中的Obsever的这个update接口叫做onNext, 同时在此基础之上添加了两个行为:onCompletedonError,以应对Observable的这种特殊的data providersubject的需求:

  • onCompleted, 当Observable数据发送完毕后,调用此接口通知Observer。
  • onError,当Observable产生数据过程中出现错误时,调用此借口通知Observer。

总结

最后,引用RxJava中对Observable的解释:Observable填补了在异步编程领域中访问包含多个元素的异步序列的空白, 他们的关系正如下表所示:

  single item multiple items
synchronous T getData() Iterable<T> getData()
asynchronous Future<T> getData() Observable<T> getData()

RxJava极大地改进了java异步编程的体验,如果你受够了block Future,以及弱爆了容错机制,体验一下rxjava吧。

上面的示例代码在这里都可以找到。

更多资料,参考RxJava的wiki, 我只能帮到你这儿了 :)

Monoid, Functor, Applicative and Monad

Monoid

Given a Monoid trait Semigroup

trait Semigroup[M] {
  def append(a: M, b: M): M
  val zero: M
}

the following should hold:

append(a, append(b, c)) === append(append(a, b), c)
append(a, zero) = a
append(zero, a) = a

Monoid examples:

  • Int with + and 0
  • Int with * and 1
  • Boolean with || and false
  • A => A with compose and identity
  • List[A] with ++ and Nil
  • String with + and ""

Functor

Concept

Functor is a type class that defines how to apply a function to a value wrapped in a context(T).List, Option, Ethier, Try both are functor.

trait Functor[T[_], A] {
  def fmap[B](f: A => B): Functor[T, B]

  def id: T[A]
}

the Functor takes two type parameters, T[_] which is a generic type, and a type A one concrete example is:

//List as T, A as A
case class ListFunctor[A](val id: A, xs: List[A]) extends Functor[List, A] {
  def fmap[B](f: A => B): List[B] = ListFunctor(xs.map(f))
}

Functor laws:

  • fmap id = id

    if we map the id function over a functor, the functor that we get back should be the same as the original functor

  • for any functor F, the following should hold: fmap (f . g) F = fmap f (fmap g F)

    composing two functions and then mapping the resulting function over a functor should be the same as first mapping one function over the functor and then mapping the other one

Function is Functor:

Function composition:

Mapping a Function over a Function will produce a new Function(function composition), just like mapping a function over a List will produce a List, mapping a function over a Option will produce a Option.

Lifting:

Given a map function with type (A => B) => F[A] => F[B](F is a functor, it could be List, Option, or Ethier), we can think the map as a function which take a function (with type A => B) as parameter and return a new function just like the old one(with type F[A] => F[B]).

Applicative

Concept

Applicative is a type class that defines how to apply a function tf wrapped in a context T to a value wrapped in a context T.

trait Applicative[T[_], A] extends Functor[T, A] {
  def apply[B](f: T[A => B]): Applicative[T, B]
}

Monad

Concept

Monad is a type class Monad[T[_], A] that defines how to apply a function that returns a wrapped value A => T[B] to a wrapped value T[A].

trait Monad[T[_], A] extends Monoid[T, A] with Applicative[T, A] {
  def flatMap[B](f: A => T[B]): Monad[T, B]
}

Monad law:

  • Left identity

    Given a value x and a function f, the following should hold:

unit(x) flatMap f = f(x)
  • Right identity

Given a monad m, the following should hold:

m flatMap unit = m
  • Composition

Given a monad m and two functions f and g, the following should hold:

m flatMap f flatMap g == m flatMap g flatMap f

A concrete Monad example

case class ListMonad[A](val list: List[A])  extends Monad[List, A] {
  //defined in Monoid
  override def append(values: List[A]): ListMonad[A] = ListMonad(list ++ values)

  //defined in Monoid
  override def id: List[A] = Nil

  //defined in Functor
  override def fmap[B](f: (A) => B): ListMonad[B] = ListMonad(list.map(f))

  //defined in Applicative
  override def apply[B](tf: List[(A) => B]): ListMonad[B] = ListMonad(list.map(tf.head))

  //defined in Monad
  override def flatMap[B](f: (A) => List[B]): ListMonad[B] = ListMonad(list.flatMap(f))
}