Wednesday Dec 31, 2008

The Year That Will Be

It's 2009 now, in Beijing.

1==0.999999999......

I met Erlang two years ago, and it eventually led me to Scala. I learned a lot from Erlang, and I entered the Scala world with an Erlang atmosphere around me. FP, pattern matching, actors/processes: I found these familiar friends everywhere in Scala.

To me, Scala comes with extra bonuses: static types and a mix of OO and FP. The domains I work in usually carry a lot of business logic; the worlds I try to describe are not only messages, they are models, which I don't think are well suited to being described with functions only.

The world itself is a mix of OO and FP; as Martin puts it, they are two sides of a coin. It's something like particle/wave duality in quantum physics. The world is an infinite whole, but human reason is always finite. We use our finite reason to measure the infinite world, and that is an unsolvable contradiction: infinite vs. finite. We have to read our world both in OO and in FP, both as snapshots and as continuations.

There won't be a "super hero" among computer languages. The world is moving toward self-organization and harmony, and so are languages. Each language lives in an ecosystem: it is born, grows by interacting with its environment, and eventually disappears...

The Economy

It was bad in 2008. I tried running some computations on the stock market with my neural network. What I can say is that the market will swing over the next half-year, with no big drop and no big rise: the Shanghai Stock Index will move between 1200 and 3000. At least, it won't get much worse.

My Self

I need to make some big decisions this year.

A Case Study of Scalability Related "Out of memory" Crash in Erlang

We are building a message-switching platform in Erlang. Stability and features both look good; it has actually run for more than half a year with zero downtime. We previously tested its performance on our 2-core machine and got about 140 transactions/second, which is good enough.

Then we got an 8-core machine several weeks ago and ran the same performance test on it to check scalability. Since Erlang is almost perfect at scaling, you can imagine the result: about 700 transactions/second, scaling almost linearly. Until it crashed with "out of memory" after about a million hits had been processed.

It left a very big "erl_crash.dump" file behind, so I had to dig into the issue. My first guess was: were some remote requests (database access, remote web service calls, etc.) timing out while the processes waiting on them had not timed out yet, so that more and more processes piled up in the VM?

A quick grep for "=proc:" in erl_crash.dump showed that the total number of processes was about 980, which was reasonable for our case.

So which process ate so much memory? A quick grep for "Stack+heap" in erl_crash.dump showed that there was indeed a process with a Stack+heap size of 285082125.

Following this clue, I caught this process:

=proc:<0.4.0>
State: Garbing
Name: error_logger
Spawned as: proc_lib:init_p/5
Last scheduled in for: io_lib_format:pad_char/2
Spawned by: <0.1.0>
Started: Sun Apr  1 01:21:50 2012
Message queue length: 2086029
Number of heap fragments: 1234053
Heap fragment data: 281266956
Link list: [<0.27.0>, <0.0.0>, {from,<0.42.0>,#Ref<0.0.0.88>}]
Reductions: 72745575
Stack+heap: 285082125
OldHeap: 47828850
Heap unused: 121777661
OldHeap unused: 47828850
Program counter: 0x0764c66c (io_lib_format:pad_char/2 + 4)
CP: 0x0764c1b4 (io_lib_format:collect_cseq/2 + 124)
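The two greps above can also be folded into a single pass. Below is a small Java sketch (the class name DumpScan is my own, hypothetical) that assumes, as in the excerpt, that each process section starts with a "=proc:<pid>" line and contains a "Stack+heap: N" line, and that any other line starting with "=" opens a different section:

```java
import java.util.Arrays;
import java.util.Iterator;

// Finds the process with the largest "Stack+heap" value in erl_crash.dump text.
// Assumes each process section begins with "=proc:<pid>" and that any other
// line starting with "=" begins a different section (=port, =proc_heap, ...).
class DumpScan {
    static String largestProcess(Iterator<String> lines) {
        String currentPid = null;
        String bestPid = null;
        long bestSize = -1;
        while (lines.hasNext()) {
            String line = lines.next();
            if (line.startsWith("=proc:")) {
                currentPid = line.substring("=proc:".length()).trim();
            } else if (line.startsWith("=")) {
                currentPid = null; // left the process section
            } else if (currentPid != null && line.startsWith("Stack+heap:")) {
                long size = Long.parseLong(line.substring("Stack+heap:".length()).trim());
                if (size > bestSize) {
                    bestSize = size;
                    bestPid = currentPid;
                }
            }
        }
        return bestPid == null ? null : bestPid + " " + bestSize;
    }

    public static void main(String[] args) {
        Iterator<String> sample = Arrays.asList(
                "=proc:<0.3.0>", "Stack+heap: 233",
                "=proc:<0.4.0>", "Name: error_logger", "Stack+heap: 285082125",
                "=port:#Port<0.1>").iterator();
        System.out.println(largestProcess(sample)); // prints "<0.4.0> 285082125"
    }
}
```

For a real dump, the iterator of Files.lines(path, StandardCharsets.ISO_8859_1), opened in a try-with-resources block, can feed it line by line without loading the whole file into memory.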

This process was error_logger, from the OTP/Erlang standard library; it writes the messages it receives to a log file or the tty. Typical usage is:

error_logger:info_msg("~p:~p " ++ Format, [?MODULE, ?LINE] ++ Data)

This formats Data into a string according to the Format string and writes it to the tty or the log file.

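The dump above shows that the message queue length of the "error_logger" process had reached 2086029, and its Stack+heap was 285082125, i.e. about 272M even if counted in bytes; since the crash dump reports heap sizes in words, the actual footprint was 4 or 8 times larger, depending on the VM's word size.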

So the likely cause was that error_logger could not process its message queue in time: messages piled up in its process and finally caused "out of memory". The bottleneck was formatting each message into a string, which the Erlang VM handles poorly and which seems to need a lot of CPU cycles.

In a previous post I talked about Erlang being bad at massive text processing. Erlang handles strings/text as lists, which is an obvious bottleneck now that Erlang is getting more and more popular and more and more Erlang applications are being written.
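To put a rough number on "strings as lists": the Erlang Efficiency Guide gives the size of a string (a list of integers) as 1 word plus 2 words per character. Here is a back-of-the-envelope calculator in Java based on that figure; the 200-character log line is just a hypothetical example:

```java
// Rough heap cost of an Erlang string, i.e. a list of character codes,
// using the Efficiency Guide figure: 1 word + 2 words per character.
class StringCost {
    static long words(long chars) {
        return 1 + 2 * chars;
    }

    static long bytes(long chars, int wordSizeBytes) {
        return words(chars) * wordSizeBytes;
    }

    public static void main(String[] args) {
        // A hypothetical 200-character formatted log line:
        System.out.println(bytes(200, 4)); // 32-bit VM: 1604
        System.out.println(bytes(200, 8)); // 64-bit VM: 3208
    }
}
```

So a single 200-character line held as a list costs kilobytes of heap, whereas the same text as a binary takes roughly one byte per character plus a small header.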

But why didn't this happen on our 2-core machine? It's an interesting scalability-related problem:

The "error_logger" module registers one and only one process to receive and handle all log messages, and the Erlang VM's scheduler cannot spread ONE process over multiple CPUs. On our 2-core machine, the whole platform handled about 140 transactions/second, and the single "error_logger" process just happened to have enough power to handle the corresponding log messages in time. On the 8-core machine, our platform scales to 700 transactions/second, but there is still only one "error_logger" process, which cannot use the 8 cores' capacity at all, and it finally fails.

Since Erlang treats every process fairly (although you can change priorities manually), we can do a quick estimate:

1. 2-core machine, keeping hits at 140 trans/second:
The number of simultaneous processes will be about 200, so each process gets 1/200 * 2 cores = 1% of a core.

2. 8-core machine, keeping hits at 700 trans/second:
The number of simultaneous processes will be about 980, so each process gets 1/980 * 8 cores = 0.82% of a core.

So the CPU share available to the error_logger process does not increase at all (it even drops slightly), while the log traffic it has to handle grows roughly in proportion to throughput, i.e. about 5 times (700/140).
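The estimate can be written down as a tiny Java calculation. The process counts and throughputs are the ones from the two test runs; the assumption that log traffic grows in proportion to throughput is mine:

```java
import java.util.Locale;

// Back-of-the-envelope CPU share per process under fair scheduling.
class CpuShare {
    // Share of one core each process gets (cores / processes), as a percentage.
    static double percentPerProcess(int cores, int processes) {
        return 100.0 * cores / processes;
    }

    public static void main(String[] args) {
        double share2 = percentPerProcess(2, 200); // 2-core run
        double share8 = percentPerProcess(8, 980); // 8-core run
        System.out.println(String.format(Locale.ROOT,
                "2-core: %.2f%%, 8-core: %.2f%%", share2, share8));
        // prints "2-core: 1.00%, 8-core: 0.82%"

        // Assumption: log messages grow with throughput (700 / 140 = 5x).
        double loadGrowth = 700.0 / 140.0;
        // Work per unit of CPU share that error_logger faces on 8 cores vs 2 cores.
        System.out.println(String.format(Locale.ROOT,
                "pressure: %.3fx", loadGrowth / (share8 / share2)));
        // prints "pressure: 6.125x"
    }
}
```

In other words, under these assumptions the single logger process faces about six times more work per unit of CPU it receives.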

BTW, I think error_logger should trim its message queue when it cannot process messages in time (disk IO may also be slower than the rate at which messages arrive).
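The "trim the queue" idea can be sketched with a bounded mailbox. This is plain Java with a hypothetical BoundedLogger class, not error_logger's API: producers call offer(), which fails immediately when the queue is full, so excess messages are counted and dropped instead of piling up until memory runs out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// A single-consumer logger with a bounded mailbox (hypothetical, for illustration).
class BoundedLogger {
    private final BlockingQueue<String> mailbox;
    private final AtomicLong dropped = new AtomicLong();

    BoundedLogger(int capacity) {
        mailbox = new ArrayBlockingQueue<>(capacity);
    }

    // Called by producers; never blocks. Returns false if the message was dropped.
    boolean log(String msg) {
        boolean accepted = mailbox.offer(msg);
        if (!accepted) dropped.incrementAndGet();
        return accepted;
    }

    // Called by the single consumer; returns null when nothing is queued.
    String poll() {
        return mailbox.poll();
    }

    int queued() {
        return mailbox.size();
    }

    long droppedCount() {
        return dropped.get();
    }

    public static void main(String[] args) {
        BoundedLogger logger = new BoundedLogger(3);
        for (int i = 0; i < 5; i++) logger.log("msg " + i);
        System.out.println(logger.queued() + " queued, " + logger.droppedCount() + " dropped");
        // prints "3 queued, 2 dropped"
    }
}
```

Dropping log lines loses information, of course; the dropped count at least records how much was lost, and can itself be logged once the pressure goes down.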

Sunday Dec 28, 2008

CN Erlounge III

I attended CN Erlounge III last weekend, a 2-day conference, and gave a presentation about Scala vs Erlang.

I met Jackyz, one of the translators of the Chinese edition of "Programming Erlang", and Aimin, who is writing a Delphi module to support Erlang C nodes and C drivers in Delphi.

There is a commercial network monitoring product built with Erlang from a major telecom company in China. And our mobile banking platform (in Erlang) is scheduled to launch in the middle of January.

I talked with Yeka and Diuera from Broadview, a leading IT publisher in China; they are really interested in bringing "Programming in Scala" to mainland China.

Many thanks to Shiwei Xu, who works hard for the Erlang community in China and organized this conference.

Since I was the oldest attendee :-), I gave some encouragement to younger developers about learning Erlang and reading "Programming Erlang". Erlang is one of the best pragmatic and clear languages for learning concurrent/parallel and functional programming, and the book is a very thoughtful and philosophical one on these ideas.

I'd also like to see "Programming in Scala" appear in China soon. Scala is another pragmatic language for solving real-world problems, and the book is also a thoughtful and philosophical one about our real world in terms of types, OO and FP.

Of course, choosing Scala or Erlang for your real world project should depend on the requirements.

I may go back to Vancouver next month for a while. Oh, that will be the beginning of the new year.

CN Erlounge III photos by krzycube

Monday Dec 15, 2008

Thinking in Scala vs Erlang

Keeping Erlang in mind, I've been coding in Scala for two months, and I'm thinking about something called "Scala vs Erlang". I wrote some benchmark code to test my ideas (the code and results may be published someday), and I'd like to gradually summarize them from a practical point of view. These opinions may or may not be correct, given my lack of deep experience and understanding so far, but I need to record them now and correct myself as I gain more experience with both Scala and Erlang.

Part I. Syntax

List comprehension

Erlang:

Lst = [1,2,3,4],
[X + 1 || X <- Lst],
lists:map(fun(X) -> X + 1 end, Lst)

Scala:

val lst = List(1,2,3,4) 
for (x <- lst) yield x + 1
lst.map{x => x + 1}
lst.map{_ + 1} // or with the placeholder syntax

Pattern match

Erlang:

case X of
   {A, B} when is_integer(A), A > 1 -> ok;
   _ -> error
end,

{ok, [{A, B} = H|T]} = my_function(X)

Scala:

x match {
   case (a:Int, b) if a > 1 => OK // can match type
   case _ => ERROR
}

val ("ok", (h@(a, b)) :: t) = my_function(x)

List, Tuple, Array, Map, Binary, Bit

Erlang:

Lst = [1, 2, 3] %% List
[0 | Lst]  %% List cons (prepend)
{1, 2, 3}  %% Tuple
<<1, 2, "abc">>  %% Binary
%% no Array, Map syntax

Scala:

val lst = List(1, 2, 3)  // List
0 :: lst  // List cons (prepend)
(1, 2, 3) // Tuple
Array(1, 2, 3) // Array
Map("a" -> 1, "b" -> 2) // Map
// no Binary, Bit syntax

Process, Actor

Erlang:

the_actor(X) -> 
   receive 
      ok -> io:format("~p~n", [X]);
      I -> the_actor(X + I) %% needs to explicitly continue loop
   end.
P = spawn(mymodule, the_actor, [0])
P ! 1
P ! ok

Scala I:

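import scala.actors.Actor

class TheActor(init:Int) extends Actor { 
   private var x = init // constructor params are vals, so keep a mutable copy
   def act = loop {
      react {
         case "ok" => println(x); exit() // needs to explicitly exit loop
         case i:Int => x += i
      }
   }
}
val a = new TheActor(0)
a.start() // a class-based actor must be started explicitly
a ! 1
a ! "ok"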

Scala II:

import scala.actors.Actor._

val a = actor { 
   def loop(x:Int) : Unit = { // a recursive def needs an explicit result type
      react {
         case "ok" => println(x)
         case i:Int => loop(x + i)
      }
   }
   loop(0)
}
a ! 1
a ! "ok"
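For readers coming from Java, here is a rough plain-Java analogue of the same actor (SumActor is a hypothetical name): a thread with a blocking mailbox that adds up integers until it receives "ok". Note that this sketch dedicates one thread to the actor, whereas react-based Scala actors share a pool of threads:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// A thread plus a mailbox: sums integers until "ok" arrives, then prints and stops.
class SumActor implements Runnable {
    private final BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>();
    final CompletableFuture<Integer> result = new CompletableFuture<>();
    private int x;

    SumActor(int init) {
        x = init;
    }

    // Like P ! Msg in Erlang or a ! msg in Scala.
    void send(Object msg) {
        mailbox.add(msg);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Object msg = mailbox.take(); // blocks, like receive/react
                if ("ok".equals(msg)) {
                    System.out.println(x);
                    result.complete(x);
                    return; // leave the loop explicitly
                } else if (msg instanceof Integer) {
                    x += (Integer) msg;
                }
                // other messages are simply ignored in this sketch
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        SumActor a = new SumActor(0);
        new Thread(a).start();
        a.send(1);
        a.send("ok");
        System.out.println("result = " + a.result.join());
    }
}
```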

Part II. Processes vs Actors

Something I

Erlang:

  • Lightweight processes
  • You can (almost) always create a new process for each newcomer
  • Scheduler treats all processes fairly
  • Share nothing between processes
  • Lightweight context switch between processes
  • IO has been carefully delegated to independent processes

Scala:

  • An active actor is delegated to a JVM thread, but actor /= thread
  • You can create a new actor for each newcomer
  • But the number of real workers (threads) is dynamically adjusted according to the processing time
  • Later comers may wait in a queue until a spare thread is available
  • Share nothing, or share something, as you decide
  • Heavy context switch between working threads
  • Blocking IO is still a pain unless you have a good NIO framework (Grizzly?)
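The "later comers wait for a spare thread" behaviour is easy to observe with any fixed-size Java thread pool. This sketch (PoolQueueDemo and its numbers are hypothetical) submits more jobs than there are workers and records how long each job waited before it started:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Submits `jobs` tasks to a pool of `workers` threads; each task simulates
// `millis` of work and reports how long it waited before starting.
class PoolQueueDemo {
    static List<Long> waits(int workers, int jobs, long millis) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            long submitted = System.nanoTime();
            List<Future<Long>> futures = new ArrayList<>();
            for (int i = 0; i < jobs; i++) {
                futures.add(pool.submit(() -> {
                    long waitedMs = (System.nanoTime() - submitted) / 1_000_000;
                    Thread.sleep(millis); // simulated work
                    return waitedMs;
                }));
            }
            List<Long> result = new ArrayList<>();
            for (Future<Long> f : futures) result.add(f.get());
            return result;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 2 workers, 6 jobs of 100 ms each: jobs start in waves of two,
        // so the waits grow roughly 0, 0, 100, 100, 200, 200 ms.
        System.out.println(waits(2, 6, 100));
    }
}
```

Nobody is refused, but the last jobs wait about two rounds of work before they even start; with a timeout on the client side, those are the ones that fall out of service.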

Something II

Erlang:

  • Tries to serve everyone simultaneously
  • But may lose service quality when the load is heavy, and may time out (out of service)
  • Ideal when processing cost is comparable to context switching cost
  • Ideal for small message processing in soft real-time
  • Bad for massive data processing, and cpu-heavy work

Scala:

  • Tries to serve a limited number of customers well first
  • If it cannot serve all of them, later comers are put on a waiting list and may time out (out of service)
  • Soft real-time for all concurrent incoming customers is difficult
  • Ideal when processing cost is far higher than context-switching cost (context switch time is in ns on a modern JVM)
  • When will there be a perfect NIO + Actor library?

Tuesday Nov 25, 2008

Erlang Plugin for NetBeans - 0.17.0 Released

I'm pleased to announce Erlang plugin for NetBeans (ErlyBird) 0.17.0 is released.

This is a bug-fix release, and from now on it will be distributed as a NetBeans plugin.

NetBeans 6.5 is a requirement.

To download, please go to: https://sourceforge.net/project/showfiles.php?group_id=192439&package_id=226387&release_id=642911

To install:

  1. Open NetBeans, go to "Tools" -> "Plugins", click on the "Downloaded" tab, click the "Add Plugins..." button, choose the directory where the Erlang plugin was unzipped, select all listed *.nbm files, and follow the instructions. Restart the IDE.
  2. Check/set your OTP path. From [Tools]->[Options], click on 'Miscellaneous', then expand 'Erlang Installation' and fill in the full path of your 'erl.exe' or 'erl' file. For instance: "C:/erl/bin/erl.exe"

When you open or create an Erlang project for the first time, the OTP libs will be indexed. Indexing takes 30 to 60 minutes, depending on your computer.

Feedback and bug reports are welcome.

Saturday Nov 22, 2008

NetBeans on OpenSolaris 08.11 in VirtualBox in Mac OS

My MacBook is an old one with Mac OS X 10.4, so I have no way to get Java 6. I've been tracking OpenSolaris for a long time, and with OpenSolaris 08.11 about to be released, I thought I should try it and see whether I can do my daily work on OpenSolaris rather than Mac OS on my MacBook.

So I installed VirtualBox on Mac OS, then a guest OpenSolaris with 1024M of memory and a 16G disk. I downloaded and installed Java JDK 6 and NetBeans 6.5, plus my Scala plugins.

It rocks, seamlessly. The only question is: should I reformat my hard disk and install OpenSolaris instead of Mac OS now?

There is a lot of speculation about Sun's future these days, but with all those innovations from Sun or adopted by Sun, why cloud computing? It should still be sunshine computing.

Tuesday Nov 18, 2008

Scala Plugin for Coming NetBeans 6.5 Official Release

I'm pleased to announce the availability of the Scala plugin for the upcoming NetBeans 6.5 official release.

  • Much better code-completion
  • Two new color themes: Twilight and Emacs Standard
  • Various bug fixes
  • It's not perfect, but fairly stable
  • Works well with the NetBeans Maven plugin

To download, please go to: https://sourceforge.net/project/showfiles.php?group_id=192439&package_id=256544&release_id=641359

For more information, please see http://wiki.netbeans.org/Scala

Bug reports are welcome.

It also works on NetBeans 6.5 RC2. If you have a previous version of the Scala plugin installed, you can upgrade to this version.

Sunday Nov 09, 2008

Scala for NetBeans Screenshot#15: Twilight Color Theme

>>>Updated Nov 11:
The Emacs Standard color theme is already there.
======

I'm beginning to write some real things based on Liftweb. The more code I wrote, the more Scala plugin bugs got fixed.

Bugs aren't the only thing being fixed: I also created a Twilight color theme for this plugin, and an Emacs color theme is on the way.

When NetBeans 6.5 is officially released, I'll release a new Scala plugin too.

Friday Nov 07, 2008

New Scala Plugin for NetBeans 6.5 RC2 Is Packed

I packed a new Scala plugin and tried several times to upload it to NetBeans' Plugins Portal, but could not get the job done. So I uploaded it to sourceforge.net instead. The zip file is: scala-plugin-081106.zip

NetBeans' trunk is now targeting 7.0, so the plugin on the "Last Development Build" update center is no longer compatible with 6.5 RC2. That is, if you are using 6.5 RC2, you should ignore my previous post about installing the plugin on 6.5 RC2 and download and install this one instead.

Users of nightly builds can get the latest plugin via NetBeans' plugin management feature as usual.

For more information, please visit http://wiki.netbeans.org/Scala

This version fixes various bugs and brings some enhancements. The bundled Scala runtime is 2.7.2RC6; together with NetBeans' Maven plugin, it works with the newest Liftweb project.

Bug reports are always welcome.

Thursday Oct 30, 2008

Scala for NetBeans Screenshot#14: Refined Color Theme

With more Scala coding experience, I refined the color theme of the Scala plugin for NetBeans. By toning down type names a bit, I found I can concentrate on the logic better. All function calls are highlighted now, so in a multiple-line expression, when an operator is put on the wrong new line, you get a hint at once. It also gives you a hint when a val/var involves an implicit call, since it will be highlighted as a function call.

There are still some bugs in inferring the correct type of a val/var in some cases.

Now the editor is much more informative, with highlighting and pop-up bubbles showing type information (move the mouse over an identifier with CTRL/COMMAND pressed).

You'll need to update to the newest Editing module (1.9.0); please wait for it to appear on the Development Update Center.

Install Scala Plugin for NetBeans 6.5 RC2

>>> Updated Nov 8
The content of this post is out of date; please see New Scala Plugin for NetBeans 6.5 RC2 Is Packed
===

NetBeans 6.5 RC2 has been released. The Scala plugin on NetBeans' Plugins Portal is not compatible with RC2. To get the Scala plugin working with RC2, do:

# Open NetBeans 6.5 RC2, go to "Tools" -> "Plugins", then "Settings" -> "Add", and add a new update center named "Last Development Build" with URL: http://deadlock.netbeans.org/hudson/job/nbms-and-javadoc/lastStableBuild/artifact/nbbuild/nbms/updates.xml.gz

# Then in the "Available Plugins" tab, you can find the "Scala" category (or, you can click on "Name" in "Available Plugins" tab to find them. You may need to click "Reload Catalog" to get the latest available modules), check "Scala Kit" and click "Install", following the instructions. Restart IDE.

I'll re-pack a new version of Scala plugin for Plugins Portal when NetBeans 6.5 is officially released.

Monday Oct 27, 2008

RPC Server for Erlang, In Scala

There was Java code in my previous post, RPC Server for Erlang, In Java; now I'm trying to rewrite it in Scala. With pattern matching, which I've become familiar with in Erlang, writing the Scala version is really a pleasure. You can compare it with the Java version.

I haven't tried Scala's actor library yet; maybe later.

I should also port Erlang's jinterface to Scala, since OtpErlangTuple and OtpErlangList should be expressed as Scala's Tuple and List.

The code is auto-formatted by NetBeans' Scala plugin, and the syntax highlighting is the same as in NetBeans, oh, not exactly.

/*
 * RpcMsg.scala
 *
 */
package net.lightpole.rpcnode

import com.ericsson.otp.erlang.{OtpErlangAtom, OtpErlangList, OtpErlangObject, OtpErlangPid, OtpErlangRef, OtpErlangTuple}

class RpcMsg(val call:OtpErlangAtom,
             val mod :OtpErlangAtom,
             val fun :OtpErlangAtom,
             val args:OtpErlangList,
             val user:OtpErlangPid,
             val to  :OtpErlangPid,
             val tag :OtpErlangRef) {
}

object RpcMsg {
   
   def apply(msg:OtpErlangObject) : Option[RpcMsg] = msg match {
      case tMsg:OtpErlangTuple =>
         tMsg.elements() match {
            /* {'$gen_call', {To, Tag}, {call, Mod, Fun, Args, User}} */
            case Array(head:OtpErlangAtom, from:OtpErlangTuple, request:OtpErlangTuple) =>
               if (head.atomValue.equals("$gen_call")) {
                  (from.elements, request.elements) match {
                     case (Array(to :OtpErlangPid,
                                 tag:OtpErlangRef), Array(call:OtpErlangAtom,
                                                          mod :OtpErlangAtom,
                                                          fun :OtpErlangAtom,
                                                          args:OtpErlangList,
                                                          user:OtpErlangPid)) =>
                        if (call.atomValue.equals("call")) {
                           Some(new RpcMsg(call, mod, fun, args, user, to, tag))
                        } else None
                     case _ => None
                  }
               } else None
            case _ => None
         }
      case _ => None
   }
}

/*
 * RpcNode.scala
 *
 */
package net.lightpole.rpcnode

import com.ericsson.otp.erlang.{OtpAuthException, OtpConnection, OtpErlangAtom, OtpErlangExit, OtpErlangObject, OtpErlangString, OtpErlangTuple, OtpSelf}
import java.io.IOException
import java.net.InetAddress
import java.net.UnknownHostException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.logging.Level
import java.util.logging.Logger


trait Cons {
   val OK     = new OtpErlangAtom("ok")
   val ERROR  = new OtpErlangAtom("error")
   val STOPED = new OtpErlangAtom("stoped")
   val THREAD_POOL_SIZE = 100
}

/**
 *
 * Usage:
 *   $ erl -sname clientnode -setcookie mycookie
 *   (clientnode@cmac)> rpc:call(xnodename@cmac, xnode, parse, []).
 *
 * @author Caoyuan Deng
 */
abstract class RpcNode(xnodeName:String, cookie:String, threadPoolSize:Int) extends Cons {
   
   def this(xnodeName:String, cookie:String) = this(xnodeName, cookie, 100)
   
   private var xSelf:OtpSelf = _
   private var sConnection:OtpConnection = _
   private var execService:ExecutorService = Executors.newFixedThreadPool(threadPoolSize)
   private val flag = Array(0) // shared with worker tasks; processRpcCall returns -1 to stop

   startServerConnection(xnodeName, cookie)
   loop
    
   def startServerConnection(xnodeName:String, cookie:String ) = {
      try {
         xSelf = new OtpSelf(xnodeName, cookie);
         // The node then publishes its port to the Erlang Port Mapper Daemon.
         // This registers the node name and port, making it available to a remote client process.
         // When the port is published it is important to immediately invoke the accept method.
         // Forgetting to accept a connection after publishing the port would be the programmatic
         // equivalent of false advertising
         val registered = xSelf.publishPort();
         if (registered) {
            System.out.println(xSelf.node() + " is ready.");
            /**
             * Accept an incoming connection from a remote node. A call to this
             * method will block until an incoming connection is at least
             * attempted.
             */
            sConnection = xSelf.accept();
         } else {
            System.out.println("There should be an epmd running, start an epmd by running 'erl'.");
         }
      } catch {
         case ex:IOException =>
         case ex:OtpAuthException =>
      }
   }

   // A call inside try/catch is not in tail position, so the recursive call is
   // kept outside it; being private, loop can then be compiled into a jump.
   private def loop : Unit = {
      val keepGoing = try {
         val msg = sConnection.receive
            
         val task = new Runnable() {
            override
            def run = RpcMsg(msg) match {
               case None =>
                  try {
                     sConnection.send(sConnection.peer.node, new OtpErlangString("unknown request"));
                  } catch {
                     case ex:IOException =>
                  }
               case Some(call) =>
                  val t0 = System.currentTimeMillis

                  flag(0) = processRpcCall(call)

                  System.out.println("Rpc time: " + (System.currentTimeMillis() - t0) / 1000.0)
            }
         }

         execService.execute(task)

         if (flag(0) == -1) {
            System.out.println("Exited")
            false
         } else true
         
      } catch {
         case ex:IOException => true
         case ex:OtpErlangExit => false
         case ex:OtpAuthException => false
      }
      if (keepGoing) loop
   }

   /** @throws IOException */
   def sendRpcResult(call:RpcMsg, head:OtpErlangAtom, result:OtpErlangObject) = {
      val tResult = new OtpErlangTuple(Array(head, result))

      // Should specify call.tag here
      val msg = new OtpErlangTuple(Array(call.tag, tResult))
      // Should specify call.to here
      sConnection.send(call.to, msg, 1024 * 1024 * 10)
   }

   /** Implemented by subclasses; return -1 to stop the receive loop. */
   def processRpcCall(call:RpcMsg) : Int
}

object RpcCall {   
   def getShortLocalHost : String = getLocalHost(false)

   def getLongLocalHost : String = getLocalHost(true)

   def getLocalHost(longName:Boolean) : String = {
      var localHost = "localhost"
      try {
         localHost = InetAddress.getLocalHost.getHostName;
         if (!longName) {
            /* Make sure it's a short name, i.e. strip off everything after the first '.' */
            val dot = localHost.indexOf(".")
            if (dot != -1) localHost = localHost.substring(0, dot)
         }
      } catch {
         case ex:UnknownHostException =>
      }

      localHost
   }
}

Tuesday Oct 21, 2008

FOR, WHILE Is Too Easy, Let's Go Looping

With several tens of thousands of lines of Erlang behind me, I'm familiar with functional-style coding, and I've found I can rewrite almost any Erlang function in Scala, at least syntactically.

Now I have some code written in Java that I need to translate to Scala. Since "for", "while" and "do" statements are so easy in Java, there are a lot of them in the Java code. The question is: should I keep them as the corresponding "for", "while" and "do" in Scala, or, as I do in Erlang, use recursive function calls, i.e. a "loop"?

I certainly choose loops. Since Scala supports recursive calls to functions defined inside a function body (Erlang doesn't), I name these functions "loop", and I try to write the code so that "loop" looks like a replacement for "for", "while", etc.

Here is part of the code that reads a number string and converts it to a double.

The Java code:

public class ReadNum {

    // getChar(), backup(), readFrac() and readExp() are defined elsewhere in the class.
    private double readNumber(int fstChar, boolean isNeg) {
        StringBuilder out = new StringBuilder(22);
        out.append((char) fstChar); // append(int) would append the decimal value, e.g. "49" for '1'
        
        // the value is accumulated negatively and its sign is restored at the end
        double v = '0' - fstChar;
        // the maximum length of the number string won't exceed 22
        for (int i = 0; i < 22; i++) {
            int c = getChar();
            switch (c) {
                case '0':
                case '1':
                case '2':
                case '3':
                case '4':
                case '5':
                case '6':
                case '7':
                case '8':
                case '9':
                    v = v * 10 - (c - '0');
                    out.append((char) c);
                    continue;
                case '.':
                    out.append('.');
                    return readFrac(out, 22 - i);
                case 'e':
                case 'E':
                    out.append((char) c);
                    return readExp(out, 22 - i);
                default:
                    if (c != -1) backup(1);
                    return isNeg ? v : -v; // v is negative here, so flip it for a positive number
            }
        }
        return 0;
    }
}

The Scala code:

class ReadNum {
   private
   def readNumber(fstChar:Char, isNeg:Boolean) :Double = {
      val out = new StringBuilder(22)
      out.append(fstChar)

      val v:Double = '0' - fstChar
      def loop(c:Char, v:Double, i:Int) :Double = c match {
         // the maximum length of the number string won't exceed 22
         case _ if i > 21 =>
            0
         case '0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9' =>
            out.append(c)
            val v1 = v * 10 - (c - '0')
            loop(getChar, v1, i + 1)
         case '.' =>
            out.append('.')
            readFrac(out, 22 - i)
         case 'e' | 'E' =>
            out.append(c)
            readExp(out, 22 - i)
         case _ =>
            if (c != -1) backup(1)
            if (isNeg) v else -v
      }; loop(getChar, v, 0)
   }
}
As you can see near the end of readNumber, the loop call is placed immediately after the "loop" definition, following "}; ", rather than on a new line. That reminds me that the "loop" function exists only for this call.

And yes, I name all these embedded looping functions "loop", everywhere.
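Java has no local recursive functions, so the closest equivalent is a private helper method. Here is a minimal, self-contained sketch (DigitLoop is a hypothetical name) that reads digits from a String instead of getChar(), accumulates negatively as readNumber does, and leaves out the fraction and exponent branches. Java does not eliminate tail calls, but the recursion depth here is bounded by the 22-character limit anyway:

```java
// Reads the leading digits of s as a number, written as a recursive "loop"
// instead of a for statement. Like readNumber, it accumulates the value as a
// negative number and flips the sign at the end; '.', 'e' and 'E' are not handled.
class DigitLoop {
    private static final int MAX_LEN = 22;

    static double readNumber(String s, boolean isNeg) {
        char fstChar = s.charAt(0); // the caller guarantees a leading digit
        return loop(s, 1, '0' - fstChar, isNeg);
    }

    // i: index of the next character; v: negated value read so far
    private static double loop(String s, int i, double v, boolean isNeg) {
        if (i >= MAX_LEN) return 0; // give up, as the original does
        if (i < s.length()) {
            char c = s.charAt(i);
            if (c >= '0' && c <= '9') {
                return loop(s, i + 1, v * 10 - (c - '0'), isNeg);
            }
        }
        return isNeg ? v : -v; // end of digits: restore the sign
    }

    public static void main(String[] args) {
        System.out.println(readNumber("123", false)); // 123.0
        System.out.println(readNumber("45x", true));  // -45.0
    }
}
```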

Wednesday Oct 15, 2008

An Example Syntax in Haskell, Erlang and Scala

>>> Updated Oct 16:
I found some coding-style conventions that make Scala code more readable. For example, use

{ x => something } instead of (x => something)

for anonymous functions; use x, y, z as the argument names of anonymous functions; put all modifiers on the preceding line, etc. That lets me pick out these functions by eye quickly.
======

It's actually my first time writing real Scala code. Sounds strange? Before writing any Scala code, I wrote a Scala IDE, so I'm a bit familiar with Scala syntax now. And I have about 1.5 years of experience with Erlang, which began after I wrote ErlyBird.

Now it's time to write some real-world Scala code. I chose to port Paul R. Brown's perpubplat blog engine, which is written in Haskell. I was also curious how the code would look in Erlang, so I tried some Erlang code too.

Here are some code pieces from the Entry module in Haskell, Erlang and Scala:

Original Haskell code piece

empty :: Model
empty = Model M.empty M.empty M.empty [] 0

build_model :: [Item] -> Model
build_model [] = empty
build_model items = Model (map_by permatitle sorted_items)
                    bid                    
                    (build_child_map sorted_items)
                    (sorted_items)
                    (n+1)
    where
      sorted_items = sort_by_created_reverse items
      bid = (map_by internal_id sorted_items)
      n = fst . M.findMax $ bid

build_child_map :: [Item] -> M.Map Int [Int]
build_child_map i = build_child_map_ (M.fromList $ (map (\x -> (internal_id x,[])) i)) i

-- Constructed to take advantage of the input being in sorted order.
build_child_map_ :: M.Map Int [Int] -> [Item] -> M.Map Int [Int]
build_child_map_ m [] = m
build_child_map_ m (i:is) = if (parent i == Nothing) then
                                build_child_map_ m is
                            else
                                build_child_map_ (M.insertWith (++) (unwrap $ parent i) [internal_id i] m) is

sort_by_created_reverse :: [Item] -> [Item]
sort_by_created_reverse = sortBy created_sort_reverse

created_sort_reverse :: Item -> Item -> Ordering
created_sort_reverse a b = compare (created b) (created a)

In Erlang:

% @spec empty :: Model
empty() -> #model{}.

% @spec build_model :: [Item] -> Model
build_model([]) -> empty();
build_model(Is) -> 
    SortedIs = sort_by_created_reverse(Is),
    Bid = dict:from_list([{I#item.internal_id, I} || I <- SortedIs]),
    N = lists:max(dict:fetch_keys(Bid)),
    
    #model{by_permatitle = dict:from_list([{X#item.permatitle, X} || X <- SortedIs]),
           by_int_id = Bid,               
           child_map = build_child_map(SortedIs),
           all_items = SortedIs,
           next_id = N + 1}.


% @spec build_child_map :: [Item] -> M.Map Int [Int]
build_child_map(Is) -> build_child_map_(dict:from_list(lists:map(fun (X) -> {X#item.internal_id, []} end, Is)), Is).

%% Constructed to take advantage of the input being in sorted order.
% @spec build_child_map_ :: M.Map Int [Int] -> [Item] -> M.Map Int [Int]
build_child_map_(D, []) -> D;
build_child_map_(D, [I|Is]) -> 
    case I#item.parent of 
        undefined ->                
            build_child_map_(D, Is);
        P_Id ->
            build_child_map_(dict:append(unwrap(P_Id), I#item.internal_id, D), Is)
    end.

% @spec sort_by_created_reverse :: [Item] -> [Item]
sort_by_created_reverse(Is) -> lists:sort(fun created_sort_reverse/2, Is).

% @spec created_sort_reverse :: Item -> Item -> boolean()
%% lists:sort/2 expects a boolean fun: true means A goes before B (newest first here).
created_sort_reverse(A, B) -> A#item.created >= B#item.created.

In Scala

object Entry {
    def empty = new Model()

    def build_model(is:List[Item]) = is match {
        case Nil => empty
        case _ =>
            val sortedIs = sortByCreatedReverse(is)
            val bid = Map() ++ sortedIs.map{ x => (x.internalId -> x) }
            val n = bid.keys.toList.sort{ (x, y) => x > y }.head // max

            new Model(Map() ++ sortedIs.map{ x => (x.permatitle -> x) },
                      bid,
                      buildChildMap(sortedIs),
                      sortedIs,
                      n + 1)
    }

    def buildChildMap(is:List[Item]) = buildChildMap_(Map() ++ is.map{ x => (x.internalId -> Nil) }, is)

    private
    def buildChildMap_(map:Map[Int, List[Int]], is:List[Item]) = {
        map ++ (for (i <- is if i.parent.isDefined; pid = i.parent.get; cids = map.getOrElse(pid, Nil)) 
                yield (pid -> (cids + i.internalId)))
    }

    def sortByCreatedReverse(is:List[Item]) = is.sort{ (x, y) => x.created after y.created } // newest first
}

>>> Updated Oct 16: Per Martin's suggestion, the above code can be written in a more Scala-like style (the reasons are in the comments). Thanks, Martin.

object Entry {
   def empty = new Model()

   def build_model(is:List[Item]) = is match {
       case Nil => empty
       case _ =>
           val sortedIs = sortByCreatedReverse(is)
           val bid = Map() ++ sortedIs.map{ x => (x.internalId -> x) }
           // use predefined max in Iterable
           val n = Iterable.max(bid.keys.toList)   

           new Model(Map() ++ sortedIs.map{ x => (x.permatitle -> x) },
                     bid,
                     buildChildMap(sortedIs),
                     sortedIs,
                     n + 1)
   }

   // you can use a wildcard anonymous function here
   def buildChildMap(is:List[Item]) = buildChildMap_(Map() ++ is.map(_.internalId -> Nil), is)

   private
   def buildChildMap_(map:Map[Int, List[Int]], is:List[Item]) =
       // fold over the items so that each parent accumulates all of its children
       // (as dict:append does); the definitions still go into the body.
       is.foldLeft(map) { (m, i) =>
           if (i.parent.isDefined) {
               val pid = i.parent.get
               val cids = m.getOrElse(pid, Nil)
               m + (pid -> (cids ::: List(i.internalId)))
           } else m
       }
       
   // you can use a wildcard anonymous function here
   def sortByCreatedReverse(is:List[Item]) = is.sort{ _.created before _.created } 
}
======

I use ErlyBird for Erlang coding, and Scala for NetBeans for Scala coding. In my experience, the IDE is much more aware of Scala, and I can type Scala code a bit faster than Erlang code.

If you are not familiar with any of these three languages, which one looks the most understandable?

Friday Oct 10, 2008

RPC Server for Erlang, In Java

We are using Erlang for some serious work; one project is actually part of a banking system. Erlang is a perfect language in both concurrency and syntax (yes, I like its syntax), but it lacks static typing (I hope the newly added -spec and -type attributes will help a bit), and it is not suitable for processing massive data (performance, memory, etc.). I tried parsing a 10 MB XML file with xmerl, the XML library in OTP/Erlang; it caused terrible memory swapping to disk, and I could never get the parsed tree out.

So there is a real need to process massive data in other languages, such as C or Java. That's why I tried to write an RPC server for Erlang in Java.

OTP/Erlang ships with the jinterface library for communication between Erlang and Java, and there is documentation on how to get it working. But for an RPC server that is called from Erlang, there are still some real-world tips:

1. When you send the result back to the caller, you need to wrap it in a tuple with the caller's tag Ref as the first element, and send it to the caller's Pid. Something like:

OtpErlangTuple msg = new OtpErlangTuple(new OtpErlangObject[] {call.tag, tResult});
sConnection.send(call.to, msg); 

where call.tag is an OtpErlangRef, tResult can be any OtpErlangObject, and call.to is an OtpErlangPid.

2. If you need to send massive data back to the caller, the default buffer size of OtpOutputStream is too small; I set it to 1024 * 1024 * 10 (10 MB).

3. Since many callers may call your RPC server concurrently, you have to consider its concurrent performance; I chose to use a thread pool here.
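As an illustration of item 3, here is a minimal, self-contained sketch of the thread-pool pattern. It deliberately leaves jinterface out: the requests are plain Strings, and handle() is a hypothetical stand-in for the real work of one RPC call. Each request is submitted to a fixed-size pool, and the replies are collected in submission order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: plain Strings instead of Erlang terms, no jinterface.
public class PoolDispatchSketch {

    // Stand-in for the real work done for one RPC call.
    static String handle(String request) {
        return "done:" + request;
    }

    // Submits every request to a fixed-size pool and collects the replies
    // in submission order.
    public static List<String> dispatchAll(List<String> requests, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String r : requests) {
                futures.add(pool.submit(() -> handle(r)));
            }
            List<String> replies = new ArrayList<>();
            for (Future<String> f : futures) {
                try {
                    replies.add(f.get()); // blocks until this call has finished
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }
            return replies;
        } finally {
            pool.shutdown(); // release the worker threads
        }
    }

    public static void main(String[] args) {
        List<String> requests = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            requests.add("req" + i);
        }
        List<String> replies = dispatchAll(requests, 100);
        System.out.println(replies.size() + " replies, first: " + replies.get(0));
    }
}
```

With a pool, one slow call no longer holds up the calls queued behind it on the receiving thread. The pool size of 100 matches RpcNode's default THREAD_POOL_SIZE; it is a tuning assumption, not a measured optimum.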

The RPC server in Java has two classes, RpcNode.java and RpcMsg.java:

package net.lightpole.rpcnode;

import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangList;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangPid;
import com.ericsson.otp.erlang.OtpErlangRef;
import com.ericsson.otp.erlang.OtpErlangTuple;

/**
 *
 * @author Caoyuan Deng
 */
public class RpcMsg {

    public OtpErlangAtom call;
    public OtpErlangAtom mod;
    public OtpErlangAtom fun;
    public OtpErlangList args;
    public OtpErlangPid user;
    public OtpErlangPid to;
    public OtpErlangRef tag;

    public RpcMsg(OtpErlangTuple from, OtpErlangTuple request) throws IllegalArgumentException {
        if (request.arity() != 5) {
            throw new IllegalArgumentException("Not a rpc call");
        }

        /* {call, Mod, Fun, Args, userPid} */
        if (request.elementAt(0) instanceof OtpErlangAtom && ((OtpErlangAtom) request.elementAt(0)).atomValue().equals("call") &&
                request.elementAt(1) instanceof OtpErlangAtom &&
                request.elementAt(2) instanceof OtpErlangAtom &&
                request.elementAt(3) instanceof OtpErlangList &&
                request.elementAt(4) instanceof OtpErlangPid &&
                from.elementAt(0) instanceof OtpErlangPid &&
                from.elementAt(1) instanceof OtpErlangRef) {

            call = (OtpErlangAtom) request.elementAt(0);
            mod = (OtpErlangAtom) request.elementAt(1);
            fun = (OtpErlangAtom) request.elementAt(2);
            args = (OtpErlangList) request.elementAt(3);
            user = (OtpErlangPid) request.elementAt(4);
            to = (OtpErlangPid) from.elementAt(0);
            tag = (OtpErlangRef) from.elementAt(1);

        } else {
            throw new IllegalArgumentException("Not a rpc call.");
        }
    }

    /* {'$gen_call', {To, Tag}, {call, Mod, Fun, Args, User}} */
    public static RpcMsg tryToResolveRcpCall(OtpErlangObject msg) {
        if (msg instanceof OtpErlangTuple) {
            OtpErlangTuple tMsg = (OtpErlangTuple) msg;
            if (tMsg.arity() == 3) {
                OtpErlangObject[] o = tMsg.elements();
                if (o[0] instanceof OtpErlangAtom && ((OtpErlangAtom) o[0]).atomValue().equals("$gen_call") &&
                        o[1] instanceof OtpErlangTuple && ((OtpErlangTuple) o[1]).arity() == 2 &&
                        o[2] instanceof OtpErlangTuple && ((OtpErlangTuple) o[2]).arity() == 5) {
                    OtpErlangTuple from = (OtpErlangTuple) o[1];
                    OtpErlangTuple request = (OtpErlangTuple) o[2];

                    try {
                        return new RpcMsg(from, request);
                    } catch (IllegalArgumentException ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
        
        return null;
    }
}

package net.lightpole.rpcnode;

import com.ericsson.otp.erlang.OtpAuthException;
import com.ericsson.otp.erlang.OtpConnection;
import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangExit;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangString;
import com.ericsson.otp.erlang.OtpErlangTuple;
import com.ericsson.otp.erlang.OtpSelf;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 *
 * Usage:
 *   $ erl -sname clientnode -setcookie mycookie
 *   (clientnode@cmac)> rpc:call(xnodename@cmac, 'System', currentTimeMillis, []).
 * 
 * @author Caoyuan Deng
 */
public abstract class RpcNode {

    public static final OtpErlangAtom OK = new OtpErlangAtom("ok");
    public static final OtpErlangAtom ERROR = new OtpErlangAtom("error");
    public static final OtpErlangAtom STOPED = new OtpErlangAtom("stoped");
    private static final int THREAD_POOL_SIZE = 100;
    private OtpSelf xSelf;
    private OtpConnection sConnection;
    private ExecutorService execService;

    public RpcNode(String xnodeName, String cookie) {
        this(xnodeName, cookie, THREAD_POOL_SIZE);
    }

    public RpcNode(String xnodeName, String cookie, int threadPoolSize) {
        execService = Executors.newFixedThreadPool(threadPoolSize);

        startServerConnection(xnodeName, cookie);
        loop();
    }

    private void startServerConnection(String xnodeName, String cookie) {
        try {
            xSelf = new OtpSelf(xnodeName, cookie);
            boolean registered = xSelf.publishPort();
            if (registered) {
                System.out.println(xSelf.node() + " is ready.");
                /**
                 * Accept an incoming connection from a remote node. A call to this
                 * method will block until an incoming connection is at least
                 * attempted.
                 */
                sConnection = xSelf.accept();
            } else {
                System.out.println("An epmd must be running; start one by running 'erl'.");
            }
        } catch (IOException ex) {
            Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
        } catch (OtpAuthException ex) {
            Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    // Set by a worker thread when processRpcCall() returns -1. It is volatile so
    // that the receiving thread sees the update; because receive() blocks, the
    // loop notices it when the next message arrives.
    private volatile boolean stopped = false;

    private void loop() {
        while (!stopped) {
            try {
                // Accept any term: casting a non-tuple message to OtpErlangTuple
                // would throw, and tryToResolveRcpCall rejects it anyway.
                final OtpErlangObject msg = sConnection.receive();
                if (stopped) {
                    break;
                }

                Runnable task = new Runnable() {

                    public void run() {
                        RpcMsg call = RpcMsg.tryToResolveRcpCall(msg);

                        if (call != null) {
                            long t0 = System.currentTimeMillis();

                            if (processRpcCall(call) == -1) {
                                stopped = true;
                            }

                            System.out.println("Rpc time: " + (System.currentTimeMillis() - t0) / 1000.0 + " s");
                        } else {
                            try {
                                sConnection.send(sConnection.peer().node(), new OtpErlangString("unknown request"));
                            } catch (IOException ex) {
                                Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
                            }
                        }
                    }
                };

                execService.execute(task);

            } catch (OtpErlangExit ex) {
                Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
            } catch (IOException ex) {
                Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
            } catch (OtpAuthException ex) {
                Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
            }
        }

        System.out.println("Exited");
        execService.shutdown();
    }

    protected void sendRpcResult(RpcMsg call, OtpErlangAtom head, OtpErlangObject result) throws IOException {
        OtpErlangTuple tResult = new OtpErlangTuple(new OtpErlangObject[] {head, result});

        // The reply must carry the caller's tag (call.tag) as its first element
        OtpErlangTuple msg = new OtpErlangTuple(new OtpErlangObject[]{call.tag, tResult});
        // and must be sent to the caller's pid (call.to)
        sConnection.send(call.to, msg, 1024 * 1024 * 10);
    }

    public abstract int processRpcCall(RpcMsg call);
    

    // ------ helper
    public static String getShortLocalHost() {
        return getLocalHost(false);
    }

    public static String getLongLocalHost() {
        return getLocalHost(true);
    }

    private static String getLocalHost(boolean longName) {
        String localHost;
        try {
            localHost = InetAddress.getLocalHost().getHostName();
            if (!longName) {
                /* Make sure it's a short name, i.e. strip off everything after the first '.' */
                int dot = localHost.indexOf(".");
                if (dot != -1) {
                    localHost = localHost.substring(0, dot);
                }
            }
        } catch (UnknownHostException e) {
            localHost = "localhost";
        }

        return localHost;
    }
}
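A receive loop that hands messages to a pool needs a stop signal that is visible across threads and that is re-checked even when no further message arrives. Below is a minimal sketch of one way to do that with an AtomicBoolean and a polling receive. It is hypothetical and independent of jinterface: a BlockingQueue<String> stands in for OtpConnection.receive(), and the message "stop" plays the role of processRpcCall() returning -1.

```java
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: inbox stands in for OtpConnection.receive(), and the
// message "stop" plays the role of processRpcCall() returning -1.
public class StopFlagSketch {
    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    // Written by worker threads, read by the receiving thread.
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final AtomicInteger processed = new AtomicInteger();

    // Stand-in for processRpcCall(): returns -1 to request a shutdown.
    private int process(String msg) {
        processed.incrementAndGet();
        return msg.equals("stop") ? -1 : 0;
    }

    private void loop() throws InterruptedException {
        while (!stopped.get()) {
            // Wait at most 50 ms, so the flag is re-checked even if no
            // further message ever arrives.
            final String msg = inbox.poll(50, TimeUnit.MILLISECONDS);
            if (msg == null) {
                continue;
            }
            pool.execute(() -> {
                if (process(msg) == -1) {
                    stopped.set(true);
                }
            });
        }
        // Let already-dispatched calls finish, then release the threads.
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }

    // Feeds the messages in, runs the loop until a worker handles "stop",
    // and returns how many messages were processed.
    public static int run(String... msgs) {
        StopFlagSketch node = new StopFlagSketch();
        node.inbox.addAll(Arrays.asList(msgs));
        try {
            node.loop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return node.processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed " + run("a", "b", "stop") + " messages, then exited");
    }
}
```

Note that run() only returns once a "stop" message has been handled. The 50 ms poll timeout trades a little idle wake-up for a bounded shutdown delay.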

As you can see, RpcNode is an abstract class; by implementing int processRpcCall(RpcMsg call), you can provide whatever features you want. For example:

package net.lightpole.xmlnode;

import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangList;
import com.ericsson.otp.erlang.OtpErlangLong;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangString;
import java.io.IOException;
import net.lightpole.rpcnode.RpcMsg;
import net.lightpole.rpcnode.RpcNode;

/**
 *
 * @author dcaoyuan
 */
public class MyNode extends RpcNode {

    public MyNode(String xnodeName, String cookie, int threadPoolSize) {
        super(xnodeName, cookie, threadPoolSize);
    }

    @Override
    public int processRpcCall(RpcMsg call) {
        final String modStr = call.mod.atomValue();
        final String funStr = call.fun.atomValue();
        final OtpErlangList args = call.args;

        try {
            OtpErlangAtom head = ERROR;
            OtpErlangObject result = null;

            if (modStr.equals("xnode") && funStr.equals("stop")) {
                head = OK;
                sendRpcResult(call, head, STOPED);
                return -1;
            }

            if (modStr.equals("System") && funStr.equals("currentTimeMillis")) {
                head = OK;
                long t = System.currentTimeMillis();
                result = new OtpErlangLong(t);
            } else {
                result = new OtpErlangString("{undef,{" + modStr + "," + funStr + "}}");
            }

            if (result == null) {
                result = new OtpErlangAtom("undefined");
            }

            sendRpcResult(call, head, result);
        } catch (IOException ex) {
            ex.printStackTrace();
        } catch (Exception ex) {
            // e.g. a malformed call; report it instead of silently swallowing it
            ex.printStackTrace();
        }

        return 0;
    }
}
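As processRpcCall grows, its chain of modStr/funStr comparisons can be replaced by a lookup table. The following is a minimal sketch under a few assumptions: plain Java values stand in for OtpErlangObject, register() and dispatch() are hypothetical names keyed by "Mod:Fun", and the undef string mirrors the one MyNode sends back for unknown functions.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: a table of handlers keyed by "Mod:Fun" instead of an
// if/else chain; plain Java values stand in for OtpErlangObject.
public class HandlerTableSketch {
    private final Map<String, Function<List<Object>, Object>> handlers = new HashMap<>();

    public void register(String mod, String fun, Function<List<Object>, Object> handler) {
        handlers.put(mod + ":" + fun, handler);
    }

    // Runs the registered handler, or returns the same kind of undef
    // description that MyNode sends back for unknown functions.
    public Object dispatch(String mod, String fun, List<Object> args) {
        Function<List<Object>, Object> handler = handlers.get(mod + ":" + fun);
        if (handler == null) {
            return "{undef,{" + mod + "," + fun + "}}";
        }
        return handler.apply(args);
    }

    public static void main(String[] args) {
        HandlerTableSketch table = new HandlerTableSketch();
        table.register("System", "currentTimeMillis", a -> System.currentTimeMillis());
        table.register("Math", "max", a -> Math.max((Integer) a.get(0), (Integer) a.get(1)));
        System.out.println(table.dispatch("Math", "max", Arrays.<Object>asList(3, 5)));
        System.out.println(table.dispatch("Foo", "bar", Arrays.<Object>asList()));
    }
}
```

Adding a feature then means registering one more handler rather than editing a growing conditional.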

I tested MyNode by:

$ erl -sname clientnode -setcookie mycookie
...
(clientnode@cmac)> rpc:call(xnodename@cmac, 'System', currentTimeMillis, []).

You can test its concurrent performance with:

%% $ erl -sname clientnode -setcookie mycookie
%% > xnode_test:test(10000).

-module(xnode_test).

-export([test/1]).

test(ProcN) ->
    Workers = [spawn_worker(self(), fun rpc_parse/1, {})
               || _ <- lists:seq(1, ProcN)],
    [wait_result(Worker) || Worker <- Workers].

rpc_parse({}) ->
    rpc:call(xnodename@cmac, 'System', currentTimeMillis, []).

spawn_worker(Parent, F, A) ->
    erlang:spawn_monitor(fun() -> Parent ! {self(), F(A)} end).

wait_result({Pid, Ref}) ->
    receive
        {'DOWN', Ref, _, _, normal} -> receive {Pid, Result} -> Result end;
        {'DOWN', Ref, _, _, Reason} -> exit(Reason)
    end.

I spawned 10000 concurrent calls to it, and it ran smoothly.

I'm also considering writing a more general-purpose RPC server in Java, which could dynamically call any existing method of a Java class.
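A rough sketch of that idea using only java.lang.reflect: it looks up a public static method by class name, method name and the runtime types of the arguments, then invokes it. This is a hypothetical helper, not part of jinterface. It only handles static methods, requires exact type matches (a boxed Integer matches an int parameter but not a long one, which keeps overloads such as Math.max unambiguous), and ignores varargs methods. Converting OtpErlangObject arguments to Java values is left out.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: dynamically calls a public static method of any
// class on the classpath, chosen by name and by the arguments' types.
public class ReflectiveCallSketch {

    // Wrapper classes that boxed arguments arrive as, per primitive type.
    private static final Map<Class<?>, Class<?>> WRAPPERS = new HashMap<>();

    static {
        WRAPPERS.put(boolean.class, Boolean.class);
        WRAPPERS.put(byte.class, Byte.class);
        WRAPPERS.put(char.class, Character.class);
        WRAPPERS.put(short.class, Short.class);
        WRAPPERS.put(int.class, Integer.class);
        WRAPPERS.put(long.class, Long.class);
        WRAPPERS.put(float.class, Float.class);
        WRAPPERS.put(double.class, Double.class);
    }

    // True if each argument's runtime type exactly fits its parameter
    // (boxed for primitives); no widening, so overloads stay unambiguous.
    private static boolean matches(Class<?>[] params, Object[] args) {
        if (params.length != args.length) {
            return false;
        }
        for (int i = 0; i < params.length; i++) {
            if (args[i] == null) {
                if (params[i].isPrimitive()) {
                    return false; // null cannot be unboxed
                }
            } else {
                Class<?> expected = params[i].isPrimitive() ? WRAPPERS.get(params[i]) : params[i];
                if (!expected.isInstance(args[i])) {
                    return false;
                }
            }
        }
        return true;
    }

    public static Object call(String className, String methodName, Object... args) {
        try {
            Class<?> cls = Class.forName(className);
            for (Method m : cls.getMethods()) {
                if (m.getName().equals(methodName)
                        && Modifier.isStatic(m.getModifiers())
                        && matches(m.getParameterTypes(), args)) {
                    return m.invoke(null, args);
                }
            }
            throw new IllegalArgumentException("no matching static method " + className + "." + methodName);
        } catch (ClassNotFoundException | IllegalAccessException e) {
            throw new IllegalArgumentException(e);
        } catch (InvocationTargetException e) {
            // The target method itself threw; report its exception.
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(call("java.lang.Math", "max", 3, 5));
        System.out.println(call("java.lang.Integer", "parseInt", "42"));
        System.out.println(call("java.lang.System", "currentTimeMillis"));
    }
}
```

For example, call("java.lang.System", "currentTimeMillis") performs the same call as the rpc:call(xnodename@cmac, 'System', currentTimeMillis, []) test above, once the Erlang module name is mapped to a fully qualified Java class name.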