#!/usr/bin/python """A CWM Clone using the new Semantic Web API.""" __author__ = 'Sean B. Palmer' __license__ = 'Copyright (C) 2002 Sean B. Palmer. GNU GPL 2' import sys from eep3 import api, query, ntuples import n3 daml_first = '' daml_rest = '' log_implies = '' SUBJECT, PREDICATE, OBJECT, FORMULA = 0, 1, 2, 3 def printe(*args): sys.stderr.write(' '.join(['%s' % arg for arg in args])+'\n') # General formula munging def getRules(store, formula=None): """Find all { ?x log:implies ?y } in the formula given by formula, or the root formula.""" if formula is None: return store[None].query(ntuples.parsent('?x %s ?y .' % log_implies)[0]) else: q = '?x %s ?y .' % log_implies return store[formula].query(ntuples.parsent(q)[0], 0) def makeRule(store, anteID, descID, formula=None): """Make a rule from ante and formula IDs, spewing into formula.""" ante, desc = store[anteID], store[descID] if formula is None: return (ante, desc) else: ante = api.TripleStore([api.Triple(x[:3]+[formula]) for x in ante]) desc = api.TripleStore([api.Triple(x[:3]+[formula]) for x in desc]) return (ante, desc) # List handling stuff def findLists(store, formula=None, debug=0): fq = ntuples.parsent('?x %s ?y .' % daml_first)[0] firsts = store[formula].query(fq) result = [] for triple in firsts: if isList(store, triple[0], formula=formula): if debug: print triple[0], 'is a list' result.append(triple[0]) return result def isList(store, id, formula=None): try: x = len(store[formula]._triples[daml_first][`id`]) except KeyError: x = 0 try: y = len(store[formula]._triples[daml_rest][`id`]) except KeyError: y = 0 rrq = ntuples.parsent('?x %s %s .' % (daml_rest, id))[0] # reverse rest q z = len(store[formula].query(rrq, 0)) return ((x > 0) and (y > 0) and (z == 0)) def getList(store, id, formula=None, all=0): first, lengthy = id, 1 result = ([], api.TripleStore())[all] while (lengthy > 0): x, y = [], [] # firsts and rests try: for obj in store[formula]._triples[daml_first][`first`]: x.append(api.Triple( (api.Article(first), api.Article(daml_first), api.Article(obj)) )) except KeyError: x = [] try: for obj in store[formula]._triples[daml_rest][`first`]: y.append(api.Triple( (api.Article(first), api.Article(daml_rest), api.Article(obj)) )) except KeyError: y = [] lengthx = len(x) lengthy = len(y) if not all: if (lengthx > 0): result.append(x[0][2]) if (lengthy > 0): first = y[0][2] else: if (lengthx > 0): result.append(x[0]) if (lengthy > 0): first = y[0][2] result.append(y[0]) return result def getLists(store, formula=None, debug=0): lists = findLists(store, formula, debug) if debug: print 'lists', lists result = [] for list in lists: result.append(getList(store, list, formula)) return result def genID(self): import random as r x = ''.join([r.choice('abcdefghijklmnopqrstuvwxyz') for x in ' '*5]) return api.Article('_:'+x) def makeList(bits): """Turn a Python list into a DAML-list-in-TripleStore.""" result = api.TripleStore() if len(bits) == 1: result.append(api.Triple((list, DAML_first, bits[0]))) result.append(list, DAML_rest, DAML_nil) if len(bits) > 0: first = list for i in range(len(bits)): result.append(api.Triple((first, DAML_first, bits[i]))) if i != (len(bits)-1): rest = genID() result.append(api.Triple((first, DAML_rest, rest))) first = rest result.append(api.Triple((rest, DAML_rest, DAML_nil))) return result # Specific inference stuff def ifilter(store, rule): """Give the results only - don't add to the store.""" ANTE, DESC = 0, 1 store = store[None] qres, result = query.query(store, rule[ANTE]), [] for res in qres: int = [] for t in rule[DESC]: t = list(t[:]) for p in (0, 1, 2): if (t[p].type == 'Univar') and (`t[p]` in res[1].keys()): t[p] = api.Article(res[1][`t[p]`]) int.append(t) result.append(int) return result def iapply(store, rule, formula=None): # formula is the target formula results = ifilter(store, rule) for result in results: for t in result: store.append(api.Triple(t), formula) return store def filter(store, rstore=None, DEBUG=1): # data.n3 -filter=q.n3 store=data.n3, rstore=q.n3 if rstore is None: rstore = store res = api.Store() rules = getRules(rstore) # if DEBUG: printe(rules) import bi for rule in rules: # for each rule, make a proper rule bag rule = makeRule(rstore, `rule[SUBJECT]`, `rule[OBJECT]`) results = bi.bfilter(store, rule) for result in results: for t in result: res.append(api.Triple(t), None) if DEBUG: printe('Found', len(res), 'new statements') for triple in res[None]: for article in triple: if `article` in store.formulae(): res[`article`] = store[`article`] # print ntuples.serialize(res) return res # n3.serialize(res) def apply(fn, rstore=None, DEBUG=1): store = n3.parse(open(fn, 'r').read(), '') # fn) if rstore is None: rstore = store oldlen = len(store) rules = getRules(rstore) # if DEBUG: printe(rules) for rule in rules: # for each rule, make a proper rule bag rule = makeRule(rstore, `rule[SUBJECT]`, `rule[OBJECT]`) results = ifilter(store, rule) for result in results: for t in result: store.append(api.Triple(t), None) if DEBUG: printe('Found', (len(store)-oldlen), 'new statements') print ntuples.serialize(store) def lists(fn, DEBUG=1): store = n3.parse(open(fn, 'r').read(), '') # fn) return getLists(store) def think(fn, DEBUG=1): store = n3.parse(open(fn, 'r').read(), '') # fn) oldlen, newlen = len(store), len(store) + 1 if DEBUG: printe('Length of store is:', len(store)) while (newlen > oldlen): oldlen = len(store) rules = getRules(store) for rule in rules: # for each rule, make a proper rule bag rule = makeRule(store, `rule[SUBJECT]`, `rule[OBJECT]`) store = iapply(store, rule) newlen = len(store) if DEBUG: printe('Found', (newlen-oldlen), 'new statements') print ntuples.serialize(store) if __name__=="__main__": if len(sys.argv) > 2: if sys.argv[1] in ('-think',): think(sys.argv[2]) elif sys.argv[1] in ('-filter',): store = n3.parse(open(sys.argv[2], 'r').read(), '') print n3.serialize(filter(store)) elif sys.argv[1] in ('-apply',): # store = n3.parse(open(sys.argv[2], 'r').read(), '') apply(sys.argv[2]) elif sys.argv[1] == '-test': store = n3.parse(open('data.n3', 'r').read(), '') rstore = n3.parse(open('q.n3', 'r').read(), '') filter(store, rstore) elif sys.argv[1] == '-ntriples': store = n3.parse(open(sys.argv[2], 'r').read(), '') print ntuples.serialize(store) elif sys.argv[1] == '-lists': for x in lists(sys.argv[2]): print x elif len(sys.argv) == 2: print n3.serialize(n3.parse(open(sys.argv[1], 'r').read(), '')) else: print __doc__