> {-# OPTIONS -fglasgow-exts -fth -fno-monomorphism-restriction #-} > module QuicQuid.Broker( > broker > -- * The broker API. > ,newQuery,newQuery_,newAnswer,newAnswerAt > -- * Utility functions. > ,readAskChan,writeAskChan,writeQueryChan > --,bind2term,map2bind > ) where > import Data.List > import Control.Monad(forever) > import Data.Maybe > import qualified Data.Map as M > import Control.Exception > import QuicQuid.Log > import QuicQuid.Router > import QuicQuid.Term > import QuicQuid.Logic(Binding) > import QuicQuid.BrokerCore The broker is the agent that matches information providers and information seekers. It has no privileged place in the system, it's just one more agent. It lives at the well-known address # "/dns/org/quicquid/broker" TODO: > brokerAddr = "/dns/org/quicquid/broker" > -- |The broker agent. > broker :: BrokerCore core => core -- ^ The BrokerCore implementation to use > -> IO () > broker brk = do > (ch,_) <- newNamedEndPoint brokerAddr > forever $ handle (\e-> errorM "Broker.broker" $ show e) $ do > (App (Str cmd) (Arr [pattern,Str ach])) <- readBody ch > case cmd of > "ask" -> ask brk pattern ach >>= writeAll > "answer" -> answer brk pattern ach >>= writeAll > _ -> error $ "Unknown command " ++ cmd > writeAll = mapM_ (\(bindings,answerChan,queryChan) -> writeAskChan answerChan bindings queryChan) > -- |Subscribe to answer a specific predicate at a specific endpoint. > newAnswerAt agentName answerS = newNamedEndPoint agentName >>= newAnswer_ answerS > -- |Subscribe to answer a specific predicate. > newAnswer :: String -- ^The predicate pattern (in string format). > -> IO ReadChan -- ^The channel where the pattern matches are sent. > newAnswer answerS = newEndPoint "/tmp" >>= newAnswer_ answerS > newAnswer_ answerS (answerCh,chName) = do > let Just answerPat = parse answerS > write brokerAddr $ App (Str "answer") (Arr [answerPat,Str chName]) > return answerCh > -- |Subscribe to receive the results of a persistent query. > newQuery :: String -- ^The query to perform, either a single predicate or a predicate conjunction (an "and"). > -> IO ReadChan > newQuery queriesS = do > (ch,chName) <- newEndPoint "/tmp" > newQuery_ (fromJust $ parse queriesS) chName > return ch > newQuery_ qs chName = write brokerAddr $ App (Str "ask") (Arr [qs,Str chName]) > readAskChan :: ReadChan -> IO (Binding, Address) > readAskChan ch = do > Arr [Obj bs,Str replyCh] <- readBody ch > return (bs,replyCh) -- was: (map2bind bs) > writeAskChan :: Address -> Binding -> String -> IO () > writeAskChan ch bindings queryCh = write ch $ Arr [bind2term $ bindings,Str queryCh] > writeQueryChan :: Address -> Binding -> IO () > writeQueryChan ch bindings = write ch (bind2term $ bindings) > bind2term :: Binding -> Term > bind2term = Obj -- . M.mapKeys (\s -> Str s) map2bind :: M.Map Term a -> M.Map String a map2bind = M.mapKeys (\(Str s) -> s)