JESS: JESS for Complex Event Processing

View: New views
4 Messages — Rating Filter:   Alert me  

JESS: JESS for Complex Event Processing

by Paul Fodor-4 :: Rate this Message:

Reply to Author | View Threaded | Show Only this Message

We are trying to apply Jess for complex event processing, but we have some problems with its speed. I want to check with this mailing list our sample program before we abandon the approach altogether. We also wonder if there is any way to speed it up. In our experiments, we detected complex events in streams of stock tickers or transactions of the form stock(Company,Price,Volume) (see below the rules in Jess while the events are represented as the facts pumped into the system with time stamps).

I can provide all of our code for anyone that needs it (beside what I already pasted below).
We get some very bad times and we also run of of memory for large datasets (10k stock ticks), while we can go up to millions of ticks and ~1% of the execution time with CEP systems (Drools Fusion, Esper, Etalis, etc.). I know that Jess might not be fit for this task, but we just wanted to try it since we saw some works applying it (or other production rule systems) for complex event processing.
Please tell me if you see or know any optimizations. 

Thank you, regards,
Paul Fodor 

test.clp:
; Complex events:
; stock("GOOG",Pr1,_)*stock("GOOG",Pr2,_)* (Pr1*0.8 > Pr2)) -> ce1.
; stock("MSFT",Pr1,_)*stock("MSFT",Pr2,_)* (Pr1*0.8 > Pr2)) -> ce2.
; ce1 \/ ce2 -> ce3.
; ce1 /\ ce2 -> ce4 {[T1,T2],T2-T1 <20}.
; ce1 # ce2 -> ce5.
; ce1*not(ce2)*ce1 -> ce6.

(deftemplate stockEvent "define stock event template" (slot symbol) (slot price) (slot time1) (slot time2))
(deftemplate ce1 "Define complex event temp" (slot time1) (slot time2))
(deftemplate ce2 "Define complex event temp" (slot time1) (slot time2))
(deftemplate ce3 "Define complex event temp" (slot time1) (slot time2))
(deftemplate tmp1ce3 "Define complex event temp" (slot time1) (slot time2))
(deftemplate tmp2ce3 "Define complex event temp" (slot time1) (slot time2))
(deftemplate ce4 "Define complex event temp" (slot time1) (slot time2))
(deftemplate tmp1ce4 "Define complex event temp" (slot time1) (slot time2))
(deftemplate tmp2ce4 "Define complex event temp" (slot time1) (slot time2))
(deftemplate ce5 "Define complex event temp" (slot time1) (slot time2))
(deftemplate tmp1ce5 "Define complex event temp" (slot time1) (slot time2))
(deftemplate tmp2ce5 "Define complex event temp" (slot time1) (slot time2))
(deftemplate ce6 "Define complex event temp" (slot time1) (slot time2))
(deftemplate tmp1ce6 "Define complex event temp" (slot time1) (slot time2))
(deftemplate tmp2ce6 "Define complex event temp" (slot time1) (slot time2))

(defrule rule_ce1 "Complex Event 1"
 ?stockEvent1 <- (stockEvent {symbol == GOOG}(price ?p1))
 ?stockEvent2 <- (stockEvent {symbol == GOOG && time1 > stockEvent1.time2 }
 (price ?p2&:(< ?p1 (* ?p2 1.2))))
  => (assert (ce1 (time1 ?stockEvent1.time1) (time2 ?stockEvent2.time2))) )

(defrule rule_ce2 "Complex Event 2"
 ?stockEvent1 <- (stockEvent {symbol == MSFT } (price ?p1))
 ?stockEvent2 <- (stockEvent {symbol == MSFT && time1 > stockEvent1.time2 }
 (price ?p2&:(< ?p1 (* ?p2 1.2))))
  => (retract ?stockEvent1 ?stockEvent2)
  (assert (ce2 (time1 ?stockEvent1.time1) (time2 ?stockEvent2.time2))) )

(defrule rule_ce1_multiply "Multiply Complex Event 1"
 ?temp <- (ce1 (time1 ?t1) (time2 ?t2))
  =>
  (assert (tmp1ce3 (time1 ?t1) (time2 ?t2)))
  (assert (tmp1ce4 (time1 ?t1) (time2 ?t2)))
  (assert (tmp1ce5 (time1 ?t1) (time2 ?t2)))
  (assert (tmp1ce6 (time1 ?t1) (time2 ?t2)))
  (retract ?temp) )

(defrule rule_ce2_multiply "Multiply Complex Event 2"
 ?temp <- (ce2 (time1 ?t1) (time2 ?t2))
  =>
  (assert (tmp2ce3 (time1 ?t1) (time2 ?t2)))
  (assert (tmp2ce4 (time1 ?t1) (time2 ?t2)))
  (assert (tmp2ce5 (time1 ?t1) (time2 ?t2)))
  (assert (tmp2ce6 (time1 ?t1) (time2 ?t2)))
  (retract ?temp) )

(defrule rule_ce3 "Complex Event ce3"
 ?temp <- (tmp1ce3)
  =>
  (assert (ce3 (time1 ?temp.time1) (time2 ?temp.time2)))
  (retract ?temp) )

(defrule rule_ce3 "Complex Event ce3"
 ?temp <- (tmp2ce3)
  =>
  (assert (ce3 (time1 ?temp.time1) (time2 ?temp.time2)))
  (retract ?temp) )

(defrule rule_ce41 "Complex Event 4"
 ?temp1 <- (tmp1ce4 (time2 ?ce1t2))
 ?temp2 <- (tmp2ce4 {time1 > temp1.time2} (time2 ?ce2t2&:(> 20 (- ?ce2t2 ?ce1t2))))
  => 
  (assert (ce4 (time1 ?temp1.time1) (time2 ?temp2.time2)))
  (retract ?temp1 ?temp2) )

(defrule rule_ce42 "Complex Event 4"
 ?temp1 <- (tmp2ce4 (time2 ?ce1t2))
 ?temp2 <- (tmp1ce4 {time1 > temp1.time2} (time2 ?ce2t2&:(> 20 (- ?ce2t2 ?ce1t2))))
  => 
  (assert (ce4 (time1 ?temp1.time1) (time2 ?temp2.time2)))
  (retract ?temp1 ?temp2) )

(defrule rule_ce51 "Complex Event 5 rule1"
 ?temp1 <- (tmp1ce5)
 ?temp2 <- (tmp2ce5 {time1 < temp1.time2 && time1 > temp1.time1 && time2 > temp1.time2})
  => 
  (assert (ce5 (time1 ?temp1.time1) (time2 ?temp2.time2)))
  (retract ?temp1 ?temp2) )

(defrule rule_ce52 "Complex Event 5 rule2"
 ?temp1 <- (tmp2ce5)
 ?temp2 <- (tmp1ce5 {time1 < temp1.time2 && time1 > temp1.time1 && time2 > temp1.time2})
  => 
  (assert (ce5 (time1 ?temp1.time1) (time2 ?temp2.time2)))
  (retract ?temp1 ?temp2) )

(defrule ce6 "Complex Event 6"
 ?temp1 <- (tmp1ce6 (time1 ?ce1t1) (time2 ?ce1t2) )
 ?temp2 <- (tmp1ce6 {time1 > temp1.time1 && time2 > temp1.time2}(time1 ?ce2t1) (time2 ?ce2t2))
 (not (tmp2ce6 (time1 ?t1&:(> ?t1 ?ce1t2)) (time2 ?t2&:(< ?t2 ?ce2t2)) ))
  =>
  (assert (ce6 (time1 ?ce1t1) (time2 ?ce2t2))) 
  (retract ?temp1 ?temp2 ) )

(defquery query-ce6 (ce6 (time1 ?time1) (time2 ?time2)))

(open "result.txt" output a)
(printout output crlf)
(reset)
(load-facts "tmp_data.clp")
(bind ?tmx (call java.lang.management.ManagementFactory getThreadMXBean))
(deffunction cputime () (return (* (?tmx getCurrentThreadCpuTime) 1E-9)))
(bind ?starttime_wall (time))
(bind ?starttime_cpu (cputime))
(run)
(bind ?query_result (run-query* query-ce6))
;(bind ?count 0)
;(while (?query_result next)
; (++ ?count)
;)
;(printout output "solutions: " ?count crlf)
(bind ?endtime_cpu (cputime))
(bind ?endtime_wall (time))
(bind ?walltime (- ?endtime_wall ?starttime_wall))
(bind ?cputime (- ?endtime_cpu ?starttime_cpu))
(printout output "computing cputime: " ?cputime crlf)
(printout output "computing walltime: " ?walltime crlf)
(close output)

tmp_data.clp:
(stockEvent (symbol GOOG) (price 580.0) (time1 1) (time2 1))
(stockEvent (symbol MSFT) (price 247.1) (time1 2) (time2 2))
(stockEvent (symbol MSFT) (price 439.9) (time1 3) (time2 3))
(stockEvent (symbol GOOG) (price 149.0) (time1 4) (time2 4))
(stockEvent (symbol GOOG) (price 313.1) (time1 5) (time2 5))
...

 cp data/benchFacts_001k.clp tmp_data.clp
 java -Xms1000m -Xmx1000m -cp ";./;.\;.;c:\Program Files\Java\jdk1.6.0_12\lib\tools.jar;C:\Program Files\Java\jre6\lib\ext\QTJava.zip;C:\Program Files\pl\bin;lib\antlr-runtime-3.1.1.jar;lib\jess.jar;lib\jsr94.jar" jess.Main test.clp






Re: JESS: JESS for Complex Event Processing

by Ernest Friedman-Hill :: Rate this Message:

Reply to Author | View Threaded | Show Only this Message

I don't see anything particularly bad in this code that would cause  
especially poor performance. If you're not using Jess 7.1p2 (the most  
recent version) then you may be missing some optimizations that would  
be relevant here, so it's worth a try if you're not.

That said, the algorithms used by Jess tacitly assume that only a  
small percentage of the facts in working memory will change at each  
'cycle', and I'm not sure that applies here. In general, this is a  
poor assumption for event-processing applications.


On Sep 30, 2009, at 3:22 PM, PF wrote:

> We are trying to apply Jess for complex event processing, but we  
> have some problems with its speed. I want to check with this mailing  
> list our sample program before we abandon the approach altogether.  
> We also wonder if there is any way to speed it up. In our  
> experiments, we detected complex events in streams of stock tickers  
> or transactions of the form stock(Company,Price,Volume) (see below  
> the rules in Jess while the events are represented as the facts  
> pumped into the system with time stamps).
>
> I can provide all of our code for anyone that needs it (beside what  
> I already pasted below).
> We get some very bad times and we also run of of memory for large  
> datasets (10k stock ticks), while we can go up to millions of ticks  
> and ~1% of the execution time with CEP systems (Drools Fusion,  
> Esper, Etalis, etc.). I know that Jess might not be fit for this  
> task, but we just wanted to try it since we saw some works applying  
> it (or other production rule systems) for complex event processing.
> Please tell me if you see or know any optimizations.
>
> Thank you, regards,
> Paul Fodor
>
> test.clp:
> ; Complex events:
> ; stock("GOOG",Pr1,_)*stock("GOOG",Pr2,_)* (Pr1*0.8 > Pr2)) -> ce1.
> ; stock("MSFT",Pr1,_)*stock("MSFT",Pr2,_)* (Pr1*0.8 > Pr2)) -> ce2.
> ; ce1 \/ ce2 -> ce3.
> ; ce1 /\ ce2 -> ce4 {[T1,T2],T2-T1 <20}.
> ; ce1 # ce2 -> ce5.
> ; ce1*not(ce2)*ce1 -> ce6.
>
> (deftemplate stockEvent "define stock event template" (slot symbol)  
> (slot price) (slot time1) (slot time2))
> (deftemplate ce1 "Define complex event temp" (slot time1) (slot  
> time2))
> (deftemplate ce2 "Define complex event temp" (slot time1) (slot  
> time2))
> (deftemplate ce3 "Define complex event temp" (slot time1) (slot  
> time2))
> (deftemplate tmp1ce3 "Define complex event temp" (slot time1) (slot  
> time2))
> (deftemplate tmp2ce3 "Define complex event temp" (slot time1) (slot  
> time2))
> (deftemplate ce4 "Define complex event temp" (slot time1) (slot  
> time2))
> (deftemplate tmp1ce4 "Define complex event temp" (slot time1) (slot  
> time2))
> (deftemplate tmp2ce4 "Define complex event temp" (slot time1) (slot  
> time2))
> (deftemplate ce5 "Define complex event temp" (slot time1) (slot  
> time2))
> (deftemplate tmp1ce5 "Define complex event temp" (slot time1) (slot  
> time2))
> (deftemplate tmp2ce5 "Define complex event temp" (slot time1) (slot  
> time2))
> (deftemplate ce6 "Define complex event temp" (slot time1) (slot  
> time2))
> (deftemplate tmp1ce6 "Define complex event temp" (slot time1) (slot  
> time2))
> (deftemplate tmp2ce6 "Define complex event temp" (slot time1) (slot  
> time2))
>
> (defrule rule_ce1 "Complex Event 1"
>  ?stockEvent1 <- (stockEvent {symbol == GOOG}(price ?p1))
>  ?stockEvent2 <- (stockEvent {symbol == GOOG && time1 >  
> stockEvent1.time2 }
>  (price ?p2&:(< ?p1 (* ?p2 1.2))))
>   => (assert (ce1 (time1 ?stockEvent1.time1) (time2 ?
> stockEvent2.time2))) )
>
> (defrule rule_ce2 "Complex Event 2"
>  ?stockEvent1 <- (stockEvent {symbol == MSFT } (price ?p1))
>  ?stockEvent2 <- (stockEvent {symbol == MSFT && time1 >  
> stockEvent1.time2 }
>  (price ?p2&:(< ?p1 (* ?p2 1.2))))
>   => (retract ?stockEvent1 ?stockEvent2)
>   (assert (ce2 (time1 ?stockEvent1.time1) (time2 ?
> stockEvent2.time2))) )
>
> (defrule rule_ce1_multiply "Multiply Complex Event 1"
>  ?temp <- (ce1 (time1 ?t1) (time2 ?t2))
>   =>
>   (assert (tmp1ce3 (time1 ?t1) (time2 ?t2)))
>   (assert (tmp1ce4 (time1 ?t1) (time2 ?t2)))
>   (assert (tmp1ce5 (time1 ?t1) (time2 ?t2)))
>   (assert (tmp1ce6 (time1 ?t1) (time2 ?t2)))
>   (retract ?temp) )
>
> (defrule rule_ce2_multiply "Multiply Complex Event 2"
>  ?temp <- (ce2 (time1 ?t1) (time2 ?t2))
>   =>
>   (assert (tmp2ce3 (time1 ?t1) (time2 ?t2)))
>   (assert (tmp2ce4 (time1 ?t1) (time2 ?t2)))
>   (assert (tmp2ce5 (time1 ?t1) (time2 ?t2)))
>   (assert (tmp2ce6 (time1 ?t1) (time2 ?t2)))
>   (retract ?temp) )
>
> (defrule rule_ce3 "Complex Event ce3"
>  ?temp <- (tmp1ce3)
>   =>
>   (assert (ce3 (time1 ?temp.time1) (time2 ?temp.time2)))
>   (retract ?temp) )
>
> (defrule rule_ce3 "Complex Event ce3"
>  ?temp <- (tmp2ce3)
>   =>
>   (assert (ce3 (time1 ?temp.time1) (time2 ?temp.time2)))
>   (retract ?temp) )
>
> (defrule rule_ce41 "Complex Event 4"
>  ?temp1 <- (tmp1ce4 (time2 ?ce1t2))
>  ?temp2 <- (tmp2ce4 {time1 > temp1.time2} (time2 ?ce2t2&:(> 20 (- ?
> ce2t2 ?ce1t2))))
>   =>
>   (assert (ce4 (time1 ?temp1.time1) (time2 ?temp2.time2)))
>   (retract ?temp1 ?temp2) )
>
> (defrule rule_ce42 "Complex Event 4"
>  ?temp1 <- (tmp2ce4 (time2 ?ce1t2))
>  ?temp2 <- (tmp1ce4 {time1 > temp1.time2} (time2 ?ce2t2&:(> 20 (- ?
> ce2t2 ?ce1t2))))
>   =>
>   (assert (ce4 (time1 ?temp1.time1) (time2 ?temp2.time2)))
>   (retract ?temp1 ?temp2) )
>
> (defrule rule_ce51 "Complex Event 5 rule1"
>  ?temp1 <- (tmp1ce5)
>  ?temp2 <- (tmp2ce5 {time1 < temp1.time2 && time1 > temp1.time1 &&  
> time2 > temp1.time2})
>   =>
>   (assert (ce5 (time1 ?temp1.time1) (time2 ?temp2.time2)))
>   (retract ?temp1 ?temp2) )
>
> (defrule rule_ce52 "Complex Event 5 rule2"
>  ?temp1 <- (tmp2ce5)
>  ?temp2 <- (tmp1ce5 {time1 < temp1.time2 && time1 > temp1.time1 &&  
> time2 > temp1.time2})
>   =>
>   (assert (ce5 (time1 ?temp1.time1) (time2 ?temp2.time2)))
>   (retract ?temp1 ?temp2) )
>
> (defrule ce6 "Complex Event 6"
>  ?temp1 <- (tmp1ce6 (time1 ?ce1t1) (time2 ?ce1t2) )
>  ?temp2 <- (tmp1ce6 {time1 > temp1.time1 && time2 > temp1.time2}
> (time1 ?ce2t1) (time2 ?ce2t2))
>  (not (tmp2ce6 (time1 ?t1&:(> ?t1 ?ce1t2)) (time2 ?t2&:(< ?t2 ?
> ce2t2)) ))
>   =>
>   (assert (ce6 (time1 ?ce1t1) (time2 ?ce2t2)))
>   (retract ?temp1 ?temp2 ) )
>
> (defquery query-ce6 (ce6 (time1 ?time1) (time2 ?time2)))
>
> (open "result.txt" output a)
> (printout output crlf)
> (reset)
> (load-facts "tmp_data.clp")
> (bind ?tmx (call java.lang.management.ManagementFactory  
> getThreadMXBean))
> (deffunction cputime () (return (* (?tmx getCurrentThreadCpuTime)  
> 1E-9)))
> (bind ?starttime_wall (time))
> (bind ?starttime_cpu (cputime))
> (run)
> (bind ?query_result (run-query* query-ce6))
> ;(bind ?count 0)
> ;(while (?query_result next)
> ; (++ ?count)
> ;)
> ;(printout output "solutions: " ?count crlf)
> (bind ?endtime_cpu (cputime))
> (bind ?endtime_wall (time))
> (bind ?walltime (- ?endtime_wall ?starttime_wall))
> (bind ?cputime (- ?endtime_cpu ?starttime_cpu))
> (printout output "computing cputime: " ?cputime crlf)
> (printout output "computing walltime: " ?walltime crlf)
> (close output)
>
> tmp_data.clp:
> (stockEvent (symbol GOOG) (price 580.0) (time1 1) (time2 1))
> (stockEvent (symbol MSFT) (price 247.1) (time1 2) (time2 2))
> (stockEvent (symbol MSFT) (price 439.9) (time1 3) (time2 3))
> (stockEvent (symbol GOOG) (price 149.0) (time1 4) (time2 4))
> (stockEvent (symbol GOOG) (price 313.1) (time1 5) (time2 5))
> ...
>
>  cp data/benchFacts_001k.clp tmp_data.clp
>  java -Xms1000m -Xmx1000m -cp ";./;.\;.;c:\Program Files\Java
> \jdk1.6.0_12\lib\tools.jar;C:\Program Files\Java\jre6\lib\ext
> \QTJava.zip;C:\Program Files\pl\bin;lib\antlr-runtime-3.1.1.jar;lib
> \jess.jar;lib\jsr94.jar" jess.Main test.clp
>
>
>
>
>

---------------------------------------------------------
Ernest Friedman-Hill
Informatics & Decision Sciences          Phone: (925) 294-2154
Sandia National Labs
PO Box 969, MS 9012                            ejfried@...
Livermore, CA 94550                             http://www.jessrules.com





--------------------------------------------------------------------
To unsubscribe, send the words 'unsubscribe jess-users you@...'
in the BODY of a message to majordomo@..., NOT to the list
(use your own address!) List problems? Notify owner-jess-users@....
--------------------------------------------------------------------


RE: JESS: JESS for Complex Event Processing

by LAUN, Wolfgang :: Rate this Message:

Reply to Author | View Threaded | Show Only this Message

Three remarks:
 
(1) All these event facts ce* and tmp*ce* are asserted and retracted in RHSs, but it is not easy to see whether they all are retracted under all possible circumstances. No tmp2ce6 is ever retracted, so this will lead to an ever-increasing amount of work to be done for evaluating rule ce6. CEP engines have a feature to automatically retract out-dated events when they no longer matter. In Jess, you would have to take care of that using additional rules.
 
(2) I'm also very suspicious w.r.t. to the slot structure of the event facts. As they all contain just a couple of time values: How can you be sure that all asserts of some ce* or tmp*ce* result in a new, unique value of its type? Jess won't let you assert an identical fact.
 
(3) A synchronous test by first loading all facts from events occuring over a period of time and then run-until-halt isn't necessarily a good measure for the asynchronous real time performance where events are typically inserted and processed as they arrive.
 
-W
 


From: owner-jess-users@... [mailto:owner-jess-users@...] On Behalf Of PF
Sent: Mittwoch, 30. September 2009 21:23
To: jess-users@...
Subject: JESS: JESS for Complex Event Processing

We are trying to apply Jess for complex event processing, but we have some problems with its speed. I want to check with this mailing list our sample program before we abandon the approach altogether. We also wonder if there is any way to speed it up. In our experiments, we detected complex events in streams of stock tickers or transactions of the form stock(Company,Price,Volume) (see below the rules in Jess while the events are represented as the facts pumped into the system with time stamps).

I can provide all of our code for anyone that needs it (beside what I already pasted below).
We get some very bad times and we also run of of memory for large datasets (10k stock ticks), while we can go up to millions of ticks and ~1% of the execution time with CEP systems (Drools Fusion, Esper, Etalis, etc.). I know that Jess might not be fit for this task, but we just wanted to try it since we saw some works applying it (or other production rule systems) for complex event processing.
Please tell me if you see or know any optimizations. 

Thank you, regards,
Paul Fodor 

test.clp:
; Complex events:
; stock("GOOG",Pr1,_)*stock("GOOG",Pr2,_)* (Pr1*0.8 > Pr2)) -> ce1.
; stock("MSFT",Pr1,_)*stock("MSFT",Pr2,_)* (Pr1*0.8 > Pr2)) -> ce2.
; ce1 \/ ce2 -> ce3.
; ce1 /\ ce2 -> ce4 {[T1,T2],T2-T1 <20}.
; ce1 # ce2 -> ce5.
; ce1*not(ce2)*ce1 -> ce6.

(deftemplate stockEvent "define stock event template" (slot symbol) (slot price) (slot time1) (slot time2))
(deftemplate ce1 "Define complex event temp" (slot time1) (slot time2))
(deftemplate ce2 "Define complex event temp" (slot time1) (slot time2))
(deftemplate ce3 "Define complex event temp" (slot time1) (slot time2))
(deftemplate tmp1ce3 "Define complex event temp" (slot time1) (slot time2))
(deftemplate tmp2ce3 "Define complex event temp" (slot time1) (slot time2))
(deftemplate ce4 "Define complex event temp" (slot time1) (slot time2))
(deftemplate tmp1ce4 "Define complex event temp" (slot time1) (slot time2))
(deftemplate tmp2ce4 "Define complex event temp" (slot time1) (slot time2))
(deftemplate ce5 "Define complex event temp" (slot time1) (slot time2))
(deftemplate tmp1ce5 "Define complex event temp" (slot time1) (slot time2))
(deftemplate tmp2ce5 "Define complex event temp" (slot time1) (slot time2))
(deftemplate ce6 "Define complex event temp" (slot time1) (slot time2))
(deftemplate tmp1ce6 "Define complex event temp" (slot time1) (slot time2))
(deftemplate tmp2ce6 "Define complex event temp" (slot time1) (slot time2))

(defrule rule_ce1 "Complex Event 1"
 ?stockEvent1 <- (stockEvent {symbol == GOOG}(price ?p1))
 ?stockEvent2 <- (stockEvent {symbol == GOOG && time1 > stockEvent1.time2 }
 (price ?p2&:(< ?p1 (* ?p2 1.2))))
  => (assert (ce1 (time1 ?stockEvent1.time1) (time2 ?stockEvent2.time2))) )

(defrule rule_ce2 "Complex Event 2"
 ?stockEvent1 <- (stockEvent {symbol == MSFT } (price ?p1))
 ?stockEvent2 <- (stockEvent {symbol == MSFT && time1 > stockEvent1.time2 }
 (price ?p2&:(< ?p1 (* ?p2 1.2))))
  => (retract ?stockEvent1 ?stockEvent2)
  (assert (ce2 (time1 ?stockEvent1.time1) (time2 ?stockEvent2.time2))) )

(defrule rule_ce1_multiply "Multiply Complex Event 1"
 ?temp <- (ce1 (time1 ?t1) (time2 ?t2))
  =>
  (assert (tmp1ce3 (time1 ?t1) (time2 ?t2)))
  (assert (tmp1ce4 (time1 ?t1) (time2 ?t2)))
  (assert (tmp1ce5 (time1 ?t1) (time2 ?t2)))
  (assert (tmp1ce6 (time1 ?t1) (time2 ?t2)))
  (retract ?temp) )

(defrule rule_ce2_multiply "Multiply Complex Event 2"
 ?temp <- (ce2 (time1 ?t1) (time2 ?t2))
  =>
  (assert (tmp2ce3 (time1 ?t1) (time2 ?t2)))
  (assert (tmp2ce4 (time1 ?t1) (time2 ?t2)))
  (assert (tmp2ce5 (time1 ?t1) (time2 ?t2)))
  (assert (tmp2ce6 (time1 ?t1) (time2 ?t2)))
  (retract ?temp) )

(defrule rule_ce3 "Complex Event ce3"
 ?temp <- (tmp1ce3)
  =>
  (assert (ce3 (time1 ?temp.time1) (time2 ?temp.time2)))
  (retract ?temp) )

(defrule rule_ce3 "Complex Event ce3"
 ?temp <- (tmp2ce3)
  =>
  (assert (ce3 (time1 ?temp.time1) (time2 ?temp.time2)))
  (retract ?temp) )

(defrule rule_ce41 "Complex Event 4"
 ?temp1 <- (tmp1ce4 (time2 ?ce1t2))
 ?temp2 <- (tmp2ce4 {time1 > temp1.time2} (time2 ?ce2t2&:(> 20 (- ?ce2t2 ?ce1t2))))
  => 
  (assert (ce4 (time1 ?temp1.time1) (time2 ?temp2.time2)))
  (retract ?temp1 ?temp2) )

(defrule rule_ce42 "Complex Event 4"
 ?temp1 <- (tmp2ce4 (time2 ?ce1t2))
 ?temp2 <- (tmp1ce4 {time1 > temp1.time2} (time2 ?ce2t2&:(> 20 (- ?ce2t2 ?ce1t2))))
  => 
  (assert (ce4 (time1 ?temp1.time1) (time2 ?temp2.time2)))
  (retract ?temp1 ?temp2) )

(defrule rule_ce51 "Complex Event 5 rule1"
 ?temp1 <- (tmp1ce5)
 ?temp2 <- (tmp2ce5 {time1 < temp1.time2 && time1 > temp1.time1 && time2 > temp1.time2})
  => 
  (assert (ce5 (time1 ?temp1.time1) (time2 ?temp2.time2)))
  (retract ?temp1 ?temp2) )

(defrule rule_ce52 "Complex Event 5 rule2"
 ?temp1 <- (tmp2ce5)
 ?temp2 <- (tmp1ce5 {time1 < temp1.time2 && time1 > temp1.time1 && time2 > temp1.time2})
  => 
  (assert (ce5 (time1 ?temp1.time1) (time2 ?temp2.time2)))
  (retract ?temp1 ?temp2) )

(defrule ce6 "Complex Event 6"
 ?temp1 <- (tmp1ce6 (time1 ?ce1t1) (time2 ?ce1t2) )
 ?temp2 <- (tmp1ce6 {time1 > temp1.time1 && time2 > temp1.time2}(time1 ?ce2t1) (time2 ?ce2t2))
 (not (tmp2ce6 (time1 ?t1&:(> ?t1 ?ce1t2)) (time2 ?t2&:(< ?t2 ?ce2t2)) ))
  =>
  (assert (ce6 (time1 ?ce1t1) (time2 ?ce2t2))) 
  (retract ?temp1 ?temp2 ) )

(defquery query-ce6 (ce6 (time1 ?time1) (time2 ?time2)))

(open "result.txt" output a)
(printout output crlf)
(reset)
(load-facts "tmp_data.clp")
(bind ?tmx (call java.lang.management.ManagementFactory getThreadMXBean))
(deffunction cputime () (return (* (?tmx getCurrentThreadCpuTime) 1E-9)))
(bind ?starttime_wall (time))
(bind ?starttime_cpu (cputime))
(run)
(bind ?query_result (run-query* query-ce6))
;(bind ?count 0)
;(while (?query_result next)
; (++ ?count)
;)
;(printout output "solutions: " ?count crlf)
(bind ?endtime_cpu (cputime))
(bind ?endtime_wall (time))
(bind ?walltime (- ?endtime_wall ?starttime_wall))
(bind ?cputime (- ?endtime_cpu ?starttime_cpu))
(printout output "computing cputime: " ?cputime crlf)
(printout output "computing walltime: " ?walltime crlf)
(close output)

tmp_data.clp:
(stockEvent (symbol GOOG) (price 580.0) (time1 1) (time2 1))
(stockEvent (symbol MSFT) (price 247.1) (time1 2) (time2 2))
(stockEvent (symbol MSFT) (price 439.9) (time1 3) (time2 3))
(stockEvent (symbol GOOG) (price 149.0) (time1 4) (time2 4))
(stockEvent (symbol GOOG) (price 313.1) (time1 5) (time2 5))
...

 cp data/benchFacts_001k.clp tmp_data.clp
 java -Xms1000m -Xmx1000m -cp ";./;.\;.;c:\Program Files\Java\jdk1.6.0_12\lib\tools.jar;C:\Program Files\Java\jre6\lib\ext\QTJava.zip;C:\Program Files\pl\bin;lib\antlr-runtime-3.1.1.jar;lib\jess.jar;lib\jsr94.jar" jess.Main test.clp






Re: JESS: JESS for Complex Event Processing

by Peter Lin :: Rate this Message:

Reply to Author | View Threaded | Show Only this Message

If I understand the rules correctly. the example is using a fact to
define intervals. Doing it that way incurs a huge cost, since the
engine has to retract the fact at the end of the time window.

It would be more efficient to write a time function(s) and use them in
the LHS. That avoids hammering the engine, but doesn't completely
solve the issue. Rule_ce2, it retracts the stockEvent. Rule_ce1
doesn't retract the stockEvent, so over time the engine is going to
run out of memory.

Taking a step back, the bigger issue is temporal logic as it relates
to complex rulesets. In the example, there's only 2 rules that perform
a simple pattern match. One can't assume it's safe to retract the
stockEvent fact from the engine if some other rules use it.

without looking at the bigger picture and having a clear understanding
of temporal logic, any attempt to use a rule engine for event
processing is going to filled with potholes. I would suggest taking
time to study temporal logic and pattern matching theory more
thoroughly first.

peter

On Wed, Sep 30, 2009 at 3:22 PM, PF <paul.i.fodor@...> wrote:

> We are trying to apply Jess for complex event processing, but we have some
> problems with its speed. I want to check with this mailing list our sample
> program before we abandon the approach altogether. We also wonder if there
> is any way to speed it up. In our experiments, we detected complex events in
> streams of stock tickers or transactions of the form
> stock(Company,Price,Volume) (see below the rules in Jess while the events
> are represented as the facts pumped into the system with time stamps).
>
> I can provide all of our code for anyone that needs it (beside what I
> already pasted below).
> We get some very bad times and we also run of of memory for large datasets
> (10k stock ticks), while we can go up to millions of ticks and ~1% of the
> execution time with CEP systems (Drools Fusion, Esper, Etalis, etc.). I know
> that Jess might not be fit for this task, but we just wanted to try it since
> we saw some works applying it (or other production rule systems) for complex
> event processing.
> Please tell me if you see or know any optimizations.
>
> Thank you, regards,
> Paul Fodor
>
> test.clp:
> ; Complex events:
> ; stock("GOOG",Pr1,_)*stock("GOOG",Pr2,_)* (Pr1*0.8 > Pr2)) -> ce1.
> ; stock("MSFT",Pr1,_)*stock("MSFT",Pr2,_)* (Pr1*0.8 > Pr2)) -> ce2.
> ; ce1 \/ ce2 -> ce3.
> ; ce1 /\ ce2 -> ce4 {[T1,T2],T2-T1 <20}.
> ; ce1 # ce2 -> ce5.
> ; ce1*not(ce2)*ce1 -> ce6.
>
> (deftemplate stockEvent "define stock event template" (slot symbol) (slot
> price) (slot time1) (slot time2))
> (deftemplate ce1 "Define complex event temp" (slot time1) (slot time2))
> (deftemplate ce2 "Define complex event temp" (slot time1) (slot time2))
> (deftemplate ce3 "Define complex event temp" (slot time1) (slot time2))
> (deftemplate tmp1ce3 "Define complex event temp" (slot time1) (slot time2))
> (deftemplate tmp2ce3 "Define complex event temp" (slot time1) (slot time2))
> (deftemplate ce4 "Define complex event temp" (slot time1) (slot time2))
> (deftemplate tmp1ce4 "Define complex event temp" (slot time1) (slot time2))
> (deftemplate tmp2ce4 "Define complex event temp" (slot time1) (slot time2))
> (deftemplate ce5 "Define complex event temp" (slot time1) (slot time2))
> (deftemplate tmp1ce5 "Define complex event temp" (slot time1) (slot time2))
> (deftemplate tmp2ce5 "Define complex event temp" (slot time1) (slot time2))
> (deftemplate ce6 "Define complex event temp" (slot time1) (slot time2))
> (deftemplate tmp1ce6 "Define complex event temp" (slot time1) (slot time2))
> (deftemplate tmp2ce6 "Define complex event temp" (slot time1) (slot time2))
>
> (defrule rule_ce1 "Complex Event 1"
>  ?stockEvent1 <- (stockEvent {symbol == GOOG}(price ?p1))
>  ?stockEvent2 <- (stockEvent {symbol == GOOG && time1 > stockEvent1.time2 }
>  (price ?p2&:(< ?p1 (* ?p2 1.2))))
>   => (assert (ce1 (time1 ?stockEvent1.time1) (time2 ?stockEvent2.time2))) )
>
> (defrule rule_ce2 "Complex Event 2"
>  ?stockEvent1 <- (stockEvent {symbol == MSFT } (price ?p1))
>  ?stockEvent2 <- (stockEvent {symbol == MSFT && time1 > stockEvent1.time2 }
>  (price ?p2&:(< ?p1 (* ?p2 1.2))))
>   => (retract ?stockEvent1 ?stockEvent2)
>   (assert (ce2 (time1 ?stockEvent1.time1) (time2 ?stockEvent2.time2))) )
>
> (defrule rule_ce1_multiply "Multiply Complex Event 1"
>  ?temp <- (ce1 (time1 ?t1) (time2 ?t2))
>   =>
>   (assert (tmp1ce3 (time1 ?t1) (time2 ?t2)))
>   (assert (tmp1ce4 (time1 ?t1) (time2 ?t2)))
>   (assert (tmp1ce5 (time1 ?t1) (time2 ?t2)))
>   (assert (tmp1ce6 (time1 ?t1) (time2 ?t2)))
>   (retract ?temp) )
>
> (defrule rule_ce2_multiply "Multiply Complex Event 2"
>  ?temp <- (ce2 (time1 ?t1) (time2 ?t2))
>   =>
>   (assert (tmp2ce3 (time1 ?t1) (time2 ?t2)))
>   (assert (tmp2ce4 (time1 ?t1) (time2 ?t2)))
>   (assert (tmp2ce5 (time1 ?t1) (time2 ?t2)))
>   (assert (tmp2ce6 (time1 ?t1) (time2 ?t2)))
>   (retract ?temp) )
>
> (defrule rule_ce3 "Complex Event ce3"
>  ?temp <- (tmp1ce3)
>   =>
>   (assert (ce3 (time1 ?temp.time1) (time2 ?temp.time2)))
>   (retract ?temp) )
>
> (defrule rule_ce3 "Complex Event ce3"
>  ?temp <- (tmp2ce3)
>   =>
>   (assert (ce3 (time1 ?temp.time1) (time2 ?temp.time2)))
>   (retract ?temp) )
>
> (defrule rule_ce41 "Complex Event 4"
>  ?temp1 <- (tmp1ce4 (time2 ?ce1t2))
>  ?temp2 <- (tmp2ce4 {time1 > temp1.time2} (time2 ?ce2t2&:(> 20 (- ?ce2t2
> ?ce1t2))))
>   =>
>   (assert (ce4 (time1 ?temp1.time1) (time2 ?temp2.time2)))
>   (retract ?temp1 ?temp2) )
>
> (defrule rule_ce42 "Complex Event 4"
>  ?temp1 <- (tmp2ce4 (time2 ?ce1t2))
>  ?temp2 <- (tmp1ce4 {time1 > temp1.time2} (time2 ?ce2t2&:(> 20 (- ?ce2t2
> ?ce1t2))))
>   =>
>   (assert (ce4 (time1 ?temp1.time1) (time2 ?temp2.time2)))
>   (retract ?temp1 ?temp2) )
>
> (defrule rule_ce51 "Complex Event 5 rule1"
>  ?temp1 <- (tmp1ce5)
>  ?temp2 <- (tmp2ce5 {time1 < temp1.time2 && time1 > temp1.time1 && time2 >
> temp1.time2})
>   =>
>   (assert (ce5 (time1 ?temp1.time1) (time2 ?temp2.time2)))
>   (retract ?temp1 ?temp2) )
>
> (defrule rule_ce52 "Complex Event 5 rule2"
>  ?temp1 <- (tmp2ce5)
>  ?temp2 <- (tmp1ce5 {time1 < temp1.time2 && time1 > temp1.time1 && time2 >
> temp1.time2})
>   =>
>   (assert (ce5 (time1 ?temp1.time1) (time2 ?temp2.time2)))
>   (retract ?temp1 ?temp2) )
>
> (defrule ce6 "Complex Event 6"
>  ?temp1 <- (tmp1ce6 (time1 ?ce1t1) (time2 ?ce1t2) )
>  ?temp2 <- (tmp1ce6 {time1 > temp1.time1 && time2 > temp1.time2}(time1
> ?ce2t1) (time2 ?ce2t2))
>  (not (tmp2ce6 (time1 ?t1&:(> ?t1 ?ce1t2)) (time2 ?t2&:(< ?t2 ?ce2t2)) ))
>   =>
>   (assert (ce6 (time1 ?ce1t1) (time2 ?ce2t2)))
>   (retract ?temp1 ?temp2 ) )
>
> (defquery query-ce6 (ce6 (time1 ?time1) (time2 ?time2)))
>
> (open "result.txt" output a)
> (printout output crlf)
> (reset)
> (load-facts "tmp_data.clp")
> (bind ?tmx (call java.lang.management.ManagementFactory getThreadMXBean))
> (deffunction cputime () (return (* (?tmx getCurrentThreadCpuTime) 1E-9)))
> (bind ?starttime_wall (time))
> (bind ?starttime_cpu (cputime))
> (run)
> (bind ?query_result (run-query* query-ce6))
> ;(bind ?count 0)
> ;(while (?query_result next)
> ; (++ ?count)
> ;)
> ;(printout output "solutions: " ?count crlf)
> (bind ?endtime_cpu (cputime))
> (bind ?endtime_wall (time))
> (bind ?walltime (- ?endtime_wall ?starttime_wall))
> (bind ?cputime (- ?endtime_cpu ?starttime_cpu))
> (printout output "computing cputime: " ?cputime crlf)
> (printout output "computing walltime: " ?walltime crlf)
> (close output)
>
> tmp_data.clp:
> (stockEvent (symbol GOOG) (price 580.0) (time1 1) (time2 1))
> (stockEvent (symbol MSFT) (price 247.1) (time1 2) (time2 2))
> (stockEvent (symbol MSFT) (price 439.9) (time1 3) (time2 3))
> (stockEvent (symbol GOOG) (price 149.0) (time1 4) (time2 4))
> (stockEvent (symbol GOOG) (price 313.1) (time1 5) (time2 5))
> ...
>
>  cp data/benchFacts_001k.clp tmp_data.clp
>  java -Xms1000m -Xmx1000m -cp ";./;.\;.;c:\Program
> Files\Java\jdk1.6.0_12\lib\tools.jar;C:\Program
> Files\Java\jre6\lib\ext\QTJava.zip;C:\Program
> Files\pl\bin;lib\antlr-runtime-3.1.1.jar;lib\jess.jar;lib\jsr94.jar"
> jess.Main test.clp
>
>
>
>
>
>


--------------------------------------------------------------------
To unsubscribe, send the words 'unsubscribe jess-users you@...'
in the BODY of a message to majordomo@..., NOT to the list
(use your own address!) List problems? Notify owner-jess-users@....
--------------------------------------------------------------------