hunk ./src/Main.hs 14
+import QuicQuid.Agent.And
hunk ./src/Main.hs 41
- mapM_ forkIO [broker k,curlAgent,randomAgent]
+ mapM_ forkIO [broker k,andAgent,curlAgent,randomAgent]
addfile ./src/QuicQuid/Agent/And.hs
hunk ./src/QuicQuid/Agent/And.hs 1
+-- Implements the logical AND: an agent answering "and ?preds", where
+-- ?preds is an array of predicates that must all be satisfied by one
+-- consistent set of bindings.
+module QuicQuid.Agent.And(andAgent) where
+
+import Control.Concurrent
+import Control.Monad(forever)
+import QuicQuid.Router
+import qualified Data.Map as M
+
+import QuicQuid.Log
+import QuicQuid.Logic
+import QuicQuid.Agent.Util
+
+-- BUG: security problem, 'and' can be taken over by rogue agents.
+-- BUG: spawned agents are never stopped.
+-- BUG: inefficient, an agent for every combination of results?
+
+-- |The agent registered for the "and ?preds" pattern.
+-- For every request read from the ask channel it looks up the "preds"
+-- binding and, when that is an array, starts the conjunction with an
+-- empty set of bindings, sending results to the requester's reply channel.
+-- A request whose "preds" binding is missing or is not an array is logged
+-- and ignored (the previous lazy 'Just (Arr preds)' pattern raised a
+-- pattern-match failure as soon as the list was inspected).
+andAgent = answerAgent "and" "and ?preds" process
+  where process qch = do
+          (bs,replyCh) <- readAskChan qch
+          case M.lookup "preds" bs of
+            Just (Arr preds) -> andQuery M.empty preds replyCh
+            other            -> debugM "QuicQuid.Agent.And.andAgent" $ "ignoring malformed request, preds = " ++ show other
+
+-- |Solve a list of predicates left to right.
+--   bs      : the bindings accumulated so far.
+--   (q:qs)  : the predicates still to be satisfied.
+--   outChan : where every complete solution is written, as an 'Obj'.
+-- With no predicates left the accumulated bindings are the solution.
+-- Otherwise a thread is forked that asks q on a fresh end point and, for
+-- every set of bindings received, substitutes them in the remaining
+-- predicates and recurses. 'M.union' is left-biased, so on a key clash the
+-- earlier bindings win.
+andQuery bs [] outChan = write outChan $ Obj bs
+andQuery bs (q:qs) outChan = do
+  debugM "QuicQuid.Agent.And.andQuery" $ show bs ++ " " ++ show (q:qs)
+  forkIO $ do
+    (ch,chName) <- newEndPoint "/tmp"
+    newQuery_ q chName
+    forever $ do
+      body <- readBody ch
+      case body of
+        Obj nbs -> andQuery (M.union bs nbs) (map (substitute nbs) qs) outChan
+        -- A non-object reply used to fail the 'Obj nbs <-' match and kill
+        -- this thread, silently dropping all later results: skip it instead.
+        other   -> debugM "QuicQuid.Agent.And.andQuery" $ "ignoring non-object reply: " ++ show other
+  return ()
hunk ./src/QuicQuid/Agent/Util.hs 20
+
+-- TODO: add support for the orderly shutdown of agents, to allow for closing connections, etc.
hunk ./src/QuicQuid/Broker.lhs 5 -> ,newQuery,newAnswer,newAnswerAt +> ,newQuery,newQuery_,newAnswer,newAnswerAt hunk ./src/QuicQuid/Broker.lhs 41 -> "ask" -> newQueryCh brk pattern ach -> "answer" -> newAnswerCh brk pattern ach +> "ask" -> ask brk pattern ach >>= writeAll +> "answer" -> answer brk pattern ach >>= writeAll hunk ./src/QuicQuid/Broker.lhs 45 +> writeAll = mapM_ (\(bindings,answerChan,queryChan) -> writeAskChan answerChan bindings queryChan) + hunk ./src/QuicQuid/Broker.lhs 63 -> newQuery queriesS = do -> (ch,chName) <- newEndPoint "/tmp" -> let qs = fromJust $ parse queriesS -> write brokerAddr $ App (Str "ask") (Arr [qs,Str chName]) +> newQuery queriesS = do +> (ch,chName) <- newEndPoint "/tmp" +> newQuery_ (fromJust $ parse queriesS) chName hunk ./src/QuicQuid/Broker.lhs 68 +> newQuery_ qs chName = write brokerAddr $ App (Str "ask") (Arr [qs,Str chName]) + hunk ./src/QuicQuid/Broker.lhs 84 + +map2bind :: M.Map Term a -> M.Map String a +map2bind = M.mapKeys (\(Str s) -> s) + hunk ./src/QuicQuid/BrokerCore.lhs 9 +> import QuicQuid.Logic (Binding) hunk ./src/QuicQuid/BrokerCore.lhs 11 -> -- |The interface implemented by the cores. -> class BrokerCore a where -> -- |Subscribe to receive the results of a persistent query. -> newQueryCh :: -> a -- ^The broker core. -> -> Term -- ^The ask/query pattern. -> -> Address -- ^The address where matches are to be asynchronously sent. -> -> IO () -> -> -- |Subscribe to answer a specific predicate. -> newAnswerCh :: -> a -- ^The broker core. -> -> Term -- ^The answer pattern. -> -> Address -- ^The address where matches are to be asynchronously sent. -> -> IO () +> class BrokerCore brk where +> ask :: brk +> -> Term -- ^The ask pattern +> -> Address -- ^The asking agent address +> -- |The matches produced. 
+> -> IO [(Binding -- ^The bindings that make the answer match the ask/query +> ,Address -- ^The address of the answer agent +> ,Address -- ^The address of the ask agent +> )] +> answer :: brk +> -> Term -- ^The answer pattern +> -> Address -- ^The answering agent address +> -> IO [(Binding -- ^The bindings that make the answer match the ask/query +> ,Address -- ^The address of the answer agent +> ,Address -- ^The address of the ask agent +> )] hunk ./src/QuicQuid/BrokerCoreNaive.lhs 26 -> newQueryCh brk query ch = newQueryN brk (p query) ch -> where -> p (App (Str "and") (Arr qs)) = qs -> p q = [q] - -> newAnswerCh brk answer ch = do +> answer brk answer answerChan = do hunk ./src/QuicQuid/BrokerCoreNaive.lhs 28 -> st <- readTVar brk -> writeTVar brk $ st {answers = (answer,ch) : answers st} -> return st -> mapM_ (\q -> match q (answer,ch)) (queries st) - - -> newQueryN :: TVar Broker -> [Term] -> Address -> IO () -> newQueryN brk [query] ch = newQuery_ brk query ch - -> newQueryN brk queries@(q:qs) outChan = do -> forkIO $ agent outChan brk -> return () -> where -> agent outChan brk = do -> (ch,chName) <- newEndPoint "/tmp" -> newQueryN brk (init queries) chName -> forever $ do -> Obj bs <- readBody ch -> (transChan,transChanName) <- newEndPoint "/tmp" -> forkIO $ tagent bs transChan outChan -> newQuery_ brk (substitute (bs) (last queries)) transChanName -> tagent bindings inChan outChan = readBody inChan >>= \(Obj bs) -> write outChan $ Obj $ M.union bindings bs - -> newQuery_ :: TVar Broker -> Term -> Address -> IO () -> newQuery_ brk query ch = do +> st <- readTVar brk +> writeTVar brk $ st {answers = (answer,answerChan) : answers st} +> return st +> return $ catMaybes $ map (\(query,queryChan) -> fmap (\binding -> (binding,answerChan,queryChan)) (match2 query answer answerChan)) (queries st) +> ask brk query queryChan = do hunk ./src/QuicQuid/BrokerCoreNaive.lhs 35 -> writeTVar brk $ st {queries = (query,ch) : queries st} +> writeTVar brk $ st {queries = 
(query,queryChan) : queries st} hunk ./src/QuicQuid/BrokerCoreNaive.lhs 37 -> mapM_ (match (query,ch)) (answers st) - -Simple form of context: specifies only the pattern of the answerer address (needed?) and is applied only to the immediate query, not anything included. +> return $ catMaybes $ map (\(answer,answerChan) -> fmap (\binding -> (binding,answerChan,queryChan)) (match2 query answer answerChan)) (answers st) hunk ./src/QuicQuid/BrokerCoreNaive.lhs 39 -> match :: (Term, Address) -> (Term, Address) -> IO () -> match (query,queryChan) (answer,answerChan) = do -> --let queryE = substitute bindings query -> debugM "QuicQuid.BrokerCoreNaive.match" $ show ((query,queryChan),(answer,answerChan)) -> let u = c query -> if isJust u -> then writeAskChan answerChan (fromJust u) queryChan -> else return () -> where -> c (App (Str "context") (Arr [to,q])) = let ctxBinds = unify1 to (Str answerChan) in +> -- Simple form of context: specifies only the pattern of the answerer address (needed?) and is applied only to the immediate query, not anything included. +> match2 (App (Str "context") (Arr [to,q])) answer answerChan = let ctxBinds = unify1 to (Str answerChan) in hunk ./src/QuicQuid/BrokerCoreNaive.lhs 42 -> c q = unify1 answer query - - -> -- Startup a set of agents -> -- TODO: add support for the orderly shutdown of agents, to allow for closing connections, etc. - - startup :: TVar Broker -> [TVar Broker -> IO ()] -> IO () - startup brk agents = mapM_ (\a -> forkIO $ a brk) agents - +> match2 query answer answerCh = unify1 answer query hunk ./src/QuicQuid/BrokerCoreNaive.lhs 44 -map2bind :: M.Map Term a -> M.Map String a -map2bind = M.mapKeys (\(Str s) -> s) hunk ./src/QuicQuid/BrokerTest.hs 14 +import QuicQuid.Agent.And hunk ./src/QuicQuid/BrokerTest.hs 27 --- Test the broker and the broker core implementations. +-- Tests the broker and the broker core implementations. + +{- +TODO: +* Test invariants: +** Every possible term is an acceptable pattern to answer or ask. 
+** For every possible ask term, an answer term obtained by substituting variables for subtrees of the ask term matches the ask term (might also verify that the returned bindings are correct). +** Adding to an existing repository a new answer or a new ask generates a set of matches that is a proper superset of the original repository match sets. +** Any permutation of a sequence of asks and answers will produce the same matches (except for their order). +** Compare the new implementation with the currently deployed one and see that differences are as expected. +-} + +tp = perf 5 hunk ./src/QuicQuid/BrokerTest.hs 44 - setup [broker core,db1,db2,curlAgent] + setup [broker core,andAgent,db1,db2] -- ,andAgent hunk ./src/QuicQuid/BrokerTest.hs 46 + -- Test queries vs the db1+db2 databases hunk ./src/QuicQuid/BrokerTest.hs 49 - testAnswer + -- testAnswer hunk ./src/QuicQuid/BrokerTest.hs 54 - h <- verboseStreamHandler stderr ERROR + h <- verboseStreamHandler stderr DEBUG hunk ./src/QuicQuid/BrokerTest.hs 56 - updateGlobalLogger rootLoggerName (setLevel DEBUG) + updateGlobalLogger rootLoggerName (setLevel ERROR) hunk ./src/QuicQuid/BrokerTest.hs 58 + updateGlobalLogger "QuicQuid.Agent.And" (setLevel ERROR) hunk ./src/QuicQuid/BrokerTest.hs 64 +-- simpleTest = ask "random ?X" ,answer "?q" answer "?X ?Y" answer "random ?Z" + hunk ./src/QuicQuid/BrokerTest.hs 68 - --,t2 "resource{content:?C,url:\"http://www.google.com\"}" ["{C:mime[\"XX\",\"\"]}"] hunk ./src/QuicQuid/BrokerTest.hs 70 - ,t2 "random ?R " ["{R:1234}","{R:ABC}"] + ,t2 "random ?R " ["{R:ABC}","{R:1234}"] hunk ./src/QuicQuid/BrokerTest.hs 73 - ,t2 "and[match[?X,IBM],type[?X,?T],name[?T,?TN],name[?X,?N]]" ["{N:\"Inter Bus Mach\",T:\"/en/company\",TN:\"Company\",X:\"/en/ibm\"}"] + ,t2 "and[match[?X,IBM],type[?X,?T]]" ["{T:\"/en/company\",X:\"/en/ibm\"}","{T:\"/en/thing\",X:\"/en/ibm\"}"] + -- TODO: fix test, add second possibility.
+ ,t2 "and[match[?X,IBM],type[?X,?T],name[?T,?TN],name[?X,?N]]" ["{N:\"Inter Bus Mach\",T:\"/en/company\",TN:\"Company\",X:\"/en/ibm\"}","{N:\"Inter Bus Mach\",T:\"/en/thing\",TN:\"A Thing\",X:\"/en/ibm\"}"] + -- don't care variables + -- ,t2 "random ?_R" ["{}","{}"] + -- ,t2 "random ?_" ["{}","{}"] hunk ./src/QuicQuid/BrokerTest.hs 81 + -- ask for all answers + -- ,t2 "context[\"{address:?adr,answer:?answer}]" ["{...}"] hunk ./src/QuicQuid/BrokerTest.hs 88 - -- We will also need this, match more then one predicate in the query: + -- We also need to match more than one predicate: hunk ./src/QuicQuid/BrokerTest.hs 94 - actual <- withTimeout op query 10000 -- Warn: if the query is slow to answer it will seem to fail. + actual <- withTimeout op query 100000 -- Warn: if the query is slow to answer it will seem to fail. hunk ./src/QuicQuid/BrokerTest.hs 98 +-- WARN: this should create a set of agents, each satisfying just one predicate +-- as db itself has complexity O(numPredicates) and skews the test results. hunk ./src/QuicQuid/BrokerTest.hs 106 - ,"random 1234" hunk ./src/QuicQuid/BrokerTest.hs 107 + ,"random 1234" hunk ./src/QuicQuid/BrokerTest.hs 109 + ,"hash abcd" + ,"hash dcba" hunk ./src/QuicQuid/BrokerTest.hs 132 --- TODO: fix/remake. +-- TODO: fix, should expect two results per query hunk ./src/QuicQuid/BrokerTest.hs 134 -t1 n = do +perf n = do hunk ./src/QuicQuid/BrokerTest.hs 139 - readN ch $ numReaders*2 + readN ch $ numReaders*1 hunk ./src/QuicQuid/BrokerTest.hs 148 - qch <- newQuery "and[match[?X,IBM],type[?X,?T],name[?T,?TN],name[?X,?N]]" -- http('http://happs.org' ?stuff)" + qch <- newQuery "and[match[?X,IBM],type[?X,?T],name[?T,?TN],name[?X,?N]]" hunk ./src/QuicQuid/BrokerTest.hs 169 +{- +root + Branch App [Branch (StrTerm,VarTerm) [Leaf (App (Str "random") (Var "R")]]] + +Answer: +random ?r + +random random +App + +data ..
= AppTerm | StrTerm + +["App","random","1234"] -> ["Arr","App","random","1234"] +random 1234 -> ["App","random","1234"] + +App + Str + "random" + Var (R) -> adr1 [R=abc,q] + Str "abc" -> adr2 [q] + "resource" + +Query: + random "abc" + random ?g + ?z 342 + ?x ?y + +-} + hunk ./src/QuicQuid/Logic.lhs 78 -> -- |Substitute a set of bindins in a term +> -- |Substitute a set of bindings in a term hunk ./src/QuicQuid/LogicTest.hs 37 - -testSubstitute = tests "substitute" chk [t3 [("x",Num 33)] "?x" "33",t3 [("X",Num 33)] "match[?X,IBM]" "match[33,IBM]"] +testSubstitute = tests "substitute" chk [t3 [("x",Num 33)] "?x" "33" + ,t3 [("X",Num 33)] "match[?X,IBM]" "match[33,IBM]" + ,t3 [("R",Str "ABC")] "integer ?R" "integer ABC" + ]