AIOTrade: Progress of Migrating to Scala - DSL and Parallel Computing
Well, I've migrated the core math and indicator parts of AIOTrade to Scala. The new AIOTrade has two noticeable improvements.
The first is the look and feel of indicator writing. Since Scala is well suited to DSLs, an indicator now looks like:

    class ARBRIndicator extends ContIndicator {
      _sname = "AR/BR"
      _grids = Array(50f, 200f)

      val period = Factor("Period", 10)

      val up = Var[Float]("up")
      val dn = Var[Float]("dn")
      val bs = Var[Float]("bs")
      val ss = Var[Float]("ss")

      val ar = Var[Float]("AR", Plot.Line)
      val br = Var[Float]("BR", Plot.Line)

      def computeCont(begIdx:Int, size:Int) {
        for (i <- begIdx until size) {
          up(i) = H(i) - O(i)
          val up_sum_i = sum(i, up, period)

          dn(i) = O(i) - L(i)
          val dn_sum_i = sum(i, dn, period)

          ar(i) = up_sum_i / dn_sum_i * 100

          val bs_tmp = H(i) - C(i)
          bs(i) = Math.max(0, bs_tmp)
          val bs_sum_i = sum(i, bs, period)

          val ss_tmp = C(i) - L(i)
          ss(i) = Math.max(0, ss_tmp)
          val ss_sum_i = sum(i, ss, period)

          br(i) = bs_sum_i / ss_sum_i * 100
        }
      }
    }

Compare that with the Java version:

    public class ARBRIndicator extends ContIndicator {
        // Field assignments must sit in an instance initializer block in Java.
        {
            _sname = "AR/BR";
            _grids = new Float[] {50f, 200f};
        }

        Opt period = new DefaultOpt("Period", 10);

        Var<Float> up = new DefaultVar("up");
        Var<Float> dn = new DefaultVar("dn");
        Var<Float> bs = new DefaultVar("bs");
        Var<Float> ss = new DefaultVar("ss");

        Var<Float> ar = new DefaultVar("AR", Plot.Line);
        Var<Float> br = new DefaultVar("BR", Plot.Line);

        void computeCont(int begIdx) {
            for (int i = begIdx; i < _itemSize; i++) {
                up.set(i, H.get(i) - O.get(i));
                float up_sum_i = sum(i, up, period);

                dn.set(i, O.get(i) - L.get(i));
                float dn_sum_i = sum(i, dn, period);

                ar.set(i, up_sum_i / dn_sum_i * 100);

                float bs_tmp = H.get(i) - C.get(i);
                bs.set(i, Math.max(0, bs_tmp));
                float bs_sum_i = sum(i, bs, period);

                float ss_tmp = C.get(i) - L.get(i);
                ss.set(i, Math.max(0, ss_tmp));
                float ss_sum_i = sum(i, ss, period);

                br.set(i, bs_sum_i / ss_sum_i * 100);
            }
        }
    }

Scala's apply and update methods replace the explicit getters and setters: H(i) calls apply, and up(i) = ... calls update. This makes the formulas look much more natural.
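
To make the indexing sugar concrete, here is a minimal sketch of how a class can support both `v(i)` and `v(i) = x`. `SimpleVar` is a hypothetical stand-in written for this illustration; it is not AIOTrade's actual `Var`, whose implementation is not shown in this post.

```scala
// Hypothetical stand-in for AIOTrade's Var; only the indexing sugar is shown.
import scala.collection.mutable.ArrayBuffer

class SimpleVar(val name: String) {
  private val values = ArrayBuffer.empty[Float]

  // v(i) desugars to v.apply(i)
  def apply(i: Int): Float = values(i)

  // v(i) = x desugars to v.update(i, x)
  def update(i: Int, x: Float): Unit = {
    // Pad with zeros so that any non-negative index can be written.
    while (values.size <= i) values += 0f
    values(i) = x
  }
}

object ApplyUpdateDemo {
  def main(args: Array[String]): Unit = {
    val high = new SimpleVar("H")
    val open = new SimpleVar("O")
    val up   = new SimpleVar("up")

    high(0) = 12.5f            // high.update(0, 12.5f)
    open(0) = 10.0f            // open.update(0, 10.0f)
    up(0) = high(0) - open(0)  // up.update(0, high.apply(0) - open.apply(0))

    println(up(0)) // prints 2.5
  }
}
```

The compiler rewrites the indexing expressions into ordinary method calls, so the DSL look costs nothing beyond defining the two methods.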
The second is that, by implementing each indicator as an Actor, the computation of indicators can easily be distributed across multiple CPU cores with very little code modification:

    case object Compute

    trait Computable extends Actor {

      // ----- actor's implementation
      def act = loop {
        react {
          case (Compute, fromTime:Long) => computeFrom(fromTime)
          case _ =>
        }
      }
      // ----- end of actor's implementation

      def computeFrom(time:Long) :Unit
      def computedTime :Long

      def factors :ArrayBuffer[Factor]
      def factors_=(factors:ArrayBuffer[Factor]) :Unit
      def factors_=(values:Array[Number]) :Unit

      def dispose :Unit
    }

Computable is an interface (trait) with a synchronous method, computeFrom(Long). By making Computable extend Actor and implementing a simple act function with a react message-processing block, every indicator that extends Computable can now be computed in parallel by calling:
indicator ! (Compute, fromTime)
instead of
indicator.computeFrom(time)
I've done some testing on my 4-core machine, where CPU usage reached about 380% during the run. This is, of course, the easiest way to implement parallel computing on the JVM that I have found so far.
Another bonus is that I no longer need to worry about concurrent calls to computeFrom(Long). Every call is now triggered by a Compute message sent to the actor's message queue, and the messages in that queue are processed sequentially, so no lock is needed any more. The key:
Parallel computing actors + Sequential message driven computing per actor
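
The same idea can be sketched without an actor library. The example below is a substitute technique, not the `scala.actors` code used in this post (that library is not part of recent Scala distributions): each indicator owns a single-threaded executor that plays the role of the actor's mailbox. `MailboxIndicator`, `requestCompute`, `snapshot` and the indicator names "MACD" and "RSI" are hypothetical and exist only for this illustration.

```scala
import java.util.concurrent.{Callable, Executors}

// Hypothetical indicator: the single-threaded executor stands in for an
// actor mailbox, so requests to one indicator run one at a time, in order.
class MailboxIndicator(val name: String) {
  private val mailbox = Executors.newSingleThreadExecutor()

  // Plain fields, no locks: only the mailbox thread ever touches them.
  private var computedTime = 0L
  private var runs = 0

  // Synchronous computation, analogous to computeFrom(Long) in the post.
  private def computeFrom(fromTime: Long): Unit = {
    computedTime = math.max(computedTime, fromTime)
    runs += 1
  }

  // Asynchronous request, analogous to `indicator ! (Compute, fromTime)`.
  def requestCompute(fromTime: Long): Unit =
    mailbox.execute(new Runnable { def run(): Unit = computeFrom(fromTime) })

  // Queued behind all pending requests, so it observes their effects.
  def snapshot(): (Long, Int) =
    mailbox.submit(new Callable[(Long, Int)] {
      def call(): (Long, Int) = (computedTime, runs)
    }).get()

  def dispose(): Unit = mailbox.shutdown()
}

object MailboxDemo {
  def main(args: Array[String]): Unit = {
    val indicators = Seq("AR/BR", "MACD", "RSI").map(new MailboxIndicator(_))

    // Four sender threads post requests to every indicator concurrently.
    val senders = (1 to 4).map { _ =>
      new Thread(new Runnable {
        def run(): Unit =
          for (t <- 1 to 250; ind <- indicators) ind.requestCompute(t.toLong)
      })
    }
    senders.foreach(_.start())
    senders.foreach(_.join())

    indicators.foreach { ind =>
      val (time, count) = ind.snapshot()
      // Each line reports computedTime=250 runs=1000: no update is lost.
      println(s"${ind.name}: computedTime=$time runs=$count")
      ind.dispose()
    }
  }
}
```

Different indicators run on different threads, so they can occupy different cores, while each indicator's own state is only ever modified by its single mailbox thread. That is why the unsynchronized counter still ends up exact.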