#!/usr/bin/env python
"""
rdfe.py - An RDF Editor That's Schema Aware.
Sean B. Palmer, Date: 2003-10; License: GNU GPL 2

This editor is most useful for creating new RDF files from scratch in
a console. The general idea is best illustrated with an example. If I
enter:

   Person name "Sean B. Palmer"

I expect to generate:

   [ rdf:type foaf:Person; foaf:name "Sean B. Palmer" ] .

Whereas if I enter:

   Person type Class

I expect to generate:

   foaf:Person rdf:type rdfs:Class .

The role of each term is flexible, and depends upon the stuff being used.

@@
* cardinality stuff
"""

import os
import re
import sys

try:
    # Imported only for its side effect: once loaded, raw_input() gains
    # line editing and history.
    import readline  # @@
except ImportError:
    # Only a missing module is tolerated; any other failure should surface.
    readline = None

import pyrple
from pyrple import Graph
from pyrple.namespaces import RDF, RDFS, OWL

# NOTE(review): the "~" is not expanded here; presumably pyrple expands it
# itself -- confirm against pyrple.parse.
pyrple.parse.cache = '~/.pyrple/cache/'

# This is only a warning. generateSchema() reads rdfs.nt unconditionally, so
# when neither schema.nt nor rdfs.nt exists the open() there still fails.
if not os.path.exists('rdfs.nt'):
    print >> sys.stderr, "Need rdfs.nt to be schema aware."

# The following should be externally configurable...
namespaces = pyrple.namespaces.all
extra = (
    ('rdfe', 'http://infomesh.net/2003/rdfe#'),
)
for (prefix, ns) in extra:
    namespaces[prefix] = pyrple.namespaces.Namespace(ns)

# Prefixes searched, in this order, when an unqualified name is entered.
path = 'rdf:rdfs:owl:foaf:dc:rdfe'
path = path.split(':')
# @@ assert each path prefix is in namespaces.keys()


def _readFile(filename):
    """Return the whole content of filename, always closing the file."""
    f = open(filename)
    try:
        return f.read()
    finally:
        f.close()


def _writeFile(filename, data):
    """Write data to filename, always closing (and so flushing) the file."""
    f = open(filename, 'w')
    try:
        f.write(data)
    finally:
        f.close()


def generateSchema():
    """Build the schema graph from scratch and return it.

    Every namespace URI in ``namespaces`` is fetched with feedURI(); a
    namespace that fails to load is reported on stderr and skipped. The
    optional local.nt and the required rdfs.nt are then fed in as
    N-Triples, and think() is run on the result (presumably applying the
    rules from rdfs.nt -- confirm against pyrple). The graph size is
    reported on stderr after each stage.

    Raises IOError if rdfs.nt cannot be read.
    """
    graph = Graph()

    # Load each of the schemata from the namespaces mapping
    for prefix in namespaces.keys():
        uri = str(namespaces[prefix])
        try:
            graph.feedURI(uri)
        except Exception:
            # Deliberately broad: one unreachable schema must not stop the
            # others from loading. sys.exc_info() is used instead of a
            # version-specific "except ..., e" / "except ... as e" form.
            e = sys.exc_info()[1]
            print >> sys.stderr, "*** Unable to load from %s" % uri
            print >> sys.stderr, " Reason: %s: %s" % (type(e), e)
    print >> sys.stderr, "*** Schema length:", len(graph)

    if os.path.exists('local.nt'):
        graph.feedNTriples(_readFile('local.nt'))
        print >> sys.stderr, "*** Schema length:", len(graph)

    graph.feedNTriples(_readFile('rdfs.nt'))
    print >> sys.stderr, "*** Schema length:", len(graph)

    graph.think()
    print >> sys.stderr, "*** Schema length:", len(graph)
    return graph


# schema.nt is a cache of the generated schema: reuse it when present,
# otherwise generate the schema once and store it for the next run.
if os.path.exists('schema.nt'):
    schema = Graph(ntriples=_readFile('schema.nt'))
else:
    schema = generateSchema()
    _writeFile('schema.nt', schema.serialize())


def isProperty(term):
    """Return whether the schema states that term is an rdf:Property."""
    return pyrple.triple.Triple(term, RDF.type, RDF.Property) in schema


def isClass(term):
    """Return whether the schema states that term is an rdfs:Class."""
    return pyrple.triple.Triple(term, RDF.type, RDFS.Class) in schema


def _objectsOf(term, predicate):
    """Return every ?x for which the schema matches `term predicate ?x`."""
    x = pyrple.Var('x')
    query = Graph(triples=[pyrple.triple.Triple(term, predicate, x)])
    return [result[x] for result in schema.query(query)]


def getDomains(term):
    """Return the rdfs:domain values of term, then its rdfe:domain values."""
    rdfs = _objectsOf(term, RDFS.domain)
    rdfe = _objectsOf(term, namespaces['rdfe'].domain)
    return rdfs + rdfe


def getRanges(term):
    """Return the rdfs:range values of term, then its rdfe:range values."""
    rdfs = _objectsOf(term, RDFS.range)
    rdfe = _objectsOf(term, namespaces['rdfe'].range)
    return rdfs + rdfe


def getSubProperties(term, G=None):
    """Get subproperties of term.

    G is the graph to query; it defaults to the module-level schema.
    Only direct `?x rdfs:subPropertyOf term` matches in G are returned.
    """
    if G is None:
        G = schema
    x = pyrple.Var('x')
    query = Graph(triples=[
        pyrple.triple.Triple(x, RDFS.subPropertyOf, term)
    ])
    return [result[x] for result in G.query(query)]


def disjoint(p, q):
    """Return True if the schema declares p and q disjoint.

    Both owl:disjointWith and rdfe:disjointWith are checked, each in
    both directions.
    """
    for predicate in (OWL.disjointWith, namespaces['rdfe'].disjointWith):
        if pyrple.Triple(p, predicate, q) in schema:
            return True
        if pyrple.Triple(q, predicate, p) in schema:
            return True
    return False


def cardinality(cls, prop):
    """Return the owl:cardinality the schema gives prop on cls, or None.

    Looks for a restriction R with `cls rdfs:subClassOf R`,
    `R owl:onProperty prop` and `R owl:cardinality N`. When several
    restrictions match, the value of the last one yielded by the query
    is returned.
    """
    R = pyrple.Var('R')
    N = pyrple.Var('N')
    Q = Graph(triples=[
        # @@ check OWL properties
        pyrple.triple.Triple(cls, RDFS.subClassOf, R),
        pyrple.triple.Triple(R, OWL.onProperty, prop),
        pyrple.triple.Triple(R, OWL.cardinality, N)
    ])

    count = None
    # The loop variable is kept separate from the return value so that an
    # empty result set plainly yields None.
    for binding in schema.query(Q):
        count = int(binding[N].value)
    return count


# Characters allowed in a URI typed at the prompt: anything except spaces,
# tabs, double quotes and angle brackets.
r_uri = re.compile(r'[^ \t"<>]+')
# @@ "this" document? <>?
class PropertyNotFound(Exception):
    """Raised by RDFEditor.handleProperty when input names no property."""


class UnknownSubjectType(Exception):
    """Raised by RDFEditor.makePrompt for a subject that is neither a
    URI nor a bNode."""


class RDFEditor(object):
    """Line-oriented console editor that builds an RDF graph.

    Each call to do() reads one line and fills in the next missing part
    of the current statement: first the subject, then the predicate,
    then the object. An empty line steps back out one level; an empty
    line at subject level prints the graph and exits.

    States: (is expecting...)
       RES: a Resource (URI, bNode, or QName)
       LIT: a Literal (Literal)
       PROP: a Property (URI, or QName)
    """

    def __init__(self, uris=None):
        """Create an editor whose graph is built with Graph(uri=uris)."""
        self.graph = pyrple.Graph(uri=uris)
        # Subjects put aside while a nested (anonymous) node is described.
        self.stack = []
        self.subject = None
        self.predicate = None
        # True while the current predicate takes several objects ("p*").
        self.multipleObjects = False
        # Existing value that the next object replaces, or False for none.
        self.replaceValue = False
        # One of 'RES', 'LIT', 'PROP'; shown in the prompt.
        self.expecting = None

    def do(self):
        """Read one line and use it for the next missing statement part."""
        # print (self.subject, self.predicate)
        if not self.subject:
            self.getSubject()
        elif not self.predicate:
            self.getPredicate()
        else:
            self.getObject()

    def getSubject(self):
        """Read a subject; on an empty line serialize the graph and exit."""
        self.expecting = 'RES'
        s = self.getInput()
        if not s:
            sys.stdout.write(self.graph.serialize())
            sys.exit(0)
        if s.startswith('@'):
            self.doCommand(s)
            return
        # handleResource returns None for bad input; the subject then
        # stays unset and do() simply asks again.
        self.subject = self.handleResource(s)

    def getPredicate(self):
        """Read a predicate for the current subject.

        An empty line leaves the current subject, returning to the one
        saved on the stack if there is one. A trailing "*" on the
        property marks it as taking several objects. Input that does not
        name a property is reported on stderr and the state is left
        unchanged, so the prompt is repeated.
        """
        self.expecting = 'PROP'
        s = self.getInput()
        if not s:
            if self.stack:
                self.subject = self.stack.pop()
            else:
                self.subject = None
            return
        if s.startswith('@'):
            self.doCommand(s)
            return

        multiple = s.endswith('*')
        if multiple:
            s = s[:-1]

        try:
            prop = self.handleProperty(s)
        except PropertyNotFound:
            # A typo must not end the session and lose the graph.
            print >> sys.stderr, "*** Not a property: %s" % s
            return

        self.predicate = prop
        if multiple:
            self.multipleObjects = True
        # A replacement noted for an earlier predicate no longer applies.
        self.replaceValue = False

        # get the domain of the predicate, and munge
        for domain in getDomains(self.predicate):
            if ((self.subject == domain) and
                    disjoint(self.subject, RDFS.Class)):
                # The subject names the class the property applies to, so
                # describe a fresh anonymous instance of it instead.
                subj = pyrple.bNode(pyrple.util.label())
                self.gotTriple(subj, RDF.type, self.subject)
                self.subject = subj

            # cardinality stuff...
            # @@ should also check for types of the subject
            if cardinality(domain, self.predicate) == 1:
                # find whether it's already set
                x = pyrple.Var('x')
                Q = Graph(triples=[pyrple.Triple(
                    self.subject, self.predicate, x
                )])
                existing = [r[x] for r in self.graph.query(Q)]
                if len(existing) > 1:
                    p = self.predicate
                    print >> sys.stderr, "*** Inconsistent! %s Used >1" % p
                elif len(existing) == 1:
                    # this is what we want!
                    p = self.uriToQName(self.predicate) or self.predicate
                    msg = "*** %s currently set to: %s" % (p, existing[0])
                    print >> sys.stderr, msg
                    self.replaceValue = existing[0]

    def getObject(self):
        """Read an object for the current subject and predicate.

        A literal is expected when rdfs:Literal is among the predicate's
        ranges, otherwise a resource. While a resource is expected,
        input containing a space or a double quote is taken as a literal
        instead, and "[Name]" (or "[]") starts a nested anonymous node of
        that type. An empty line abandons the predicate.
        """
        # get the ranges of the current property
        ranges = getRanges(self.predicate)
        if RDFS.Literal in ranges:
            self.expecting = 'LIT'
        else:
            self.expecting = 'RES'
        s = self.getInput()
        makeAnon = False

        if not s:
            self.predicate = None
            self.multipleObjects = False
            # Nothing was entered, so nothing is left to replace.
            self.replaceValue = False
            return
        elif s.startswith('@'):
            s = self.doCommand(s)
            if s is None:
                return
        elif self.expecting == 'RES' and (' ' in s or '"' in s):
            if s.startswith('"'):
                s = s[1:]
            if s.endswith('"'):
                s = s[:-1]
            self.expecting = 'LIT'

        if (self.expecting == 'RES' and
                s.startswith('[') and s.endswith(']')):
            makeAnon = True
            s = s[1:-1]
            if not s:
                s = RDFS.Resource.value

        if self.expecting == 'LIT':
            object = self.handleLiteral(s)
        else:
            object = self.handleResource(s)

        if object:
            # might need to make a new subject
            if (object in ranges) or makeAnon:
                subj = pyrple.bNode(pyrple.util.label())
                self.foundObject(subj)
                self.stack.append(self.subject)
                self.subject = subj
                self.gotTriple(self.subject, RDF.type, object)
            else:
                self.foundObject(object)

            if not self.multipleObjects:
                self.predicate = None

    def foundObject(self, objt):
        """Add (subject, predicate, objt), first removing the value noted
        in replaceValue, if any."""
        if self.replaceValue:
            t = pyrple.Triple(self.subject, self.predicate,
                              self.replaceValue)
            self.graph.remove(t)
            self.replaceValue = False
        self.gotTriple(self.subject, self.predicate, objt)

    def doCommand(self, s):
        """Run an "@command [text]" line.

        @s/@subj/@subject and @p/@pred/@predicate print the current
        subject or predicate on stderr and return None. @lit and @res
        (also LIT/Literal, RES/Resource) switch the expected state and
        return the text after the command, "" if there is none. Any
        other command does nothing and returns None.
        """
        assert s.startswith('@')
        s = s[1:]
        if ' ' in s:
            i = s.index(' ')
            command, rest = s[:i], s[i+1:]
        else:
            command, rest = s, ''

        if command in ('s', 'subj', 'subject'):
            print >> sys.stderr, '*** Current subject: %r' % self.subject
        elif command in ('p', 'pred', 'predicate'):
            print >> sys.stderr, '*** Current predicate: %r' % self.predicate
        elif command in ('LIT', 'lit', 'Literal'):
            self.expecting = 'LIT'
            return rest
        elif command in ('RES', 'res', 'Resource'):
            self.expecting = 'RES'
            return rest
        return None

    def resolveName(self, name):
        """Return the first term called name, searching the prefixes in
        ``path``, that the schema knows as a property or class.

        Prints a message on stderr and returns None when there is none.
        """
        for prefix in path:
            term = namespaces[prefix][name]
            if isProperty(term) or isClass(term):
                return term
        print >> sys.stderr, "*** Not found: %s" % name
        return None

    def resolveQName(self, qname):
        """Return the term for "prefix:name", or None when the prefix is
        not in ``namespaces``. qname must contain a colon."""
        i = qname.index(':')
        prefix, name = qname[:i], qname[i+1:]
        if namespaces.has_key(prefix):
            return namespaces[prefix][name]
        return None

    def _isURI(self, s):
        """Return True when the whole of s consists of characters that
        r_uri allows."""
        # match() only anchors at the start, so also require that the
        # match reaches the end of the string.
        m = r_uri.match(s)
        return m is not None and m.end() == len(s)

    def handleResource(self, s):
        """Turn input s (a URI, bNode, or QName) into a term.

        "_" makes a fresh bNode and "_:label" a bNode with that label.
        Returns None, after a message on stderr, when s cannot be
        resolved.
        """
        if s == '_':
            return pyrple.bNode(pyrple.util.label())
        if s.startswith('_:'):
            return pyrple.bNode(s[2:])

        if ':' not in s:
            return self.resolveName(s)

        n = self.resolveQName(s)
        if n:
            return n
        if self._isURI(s):
            return pyrple.URI(s)
        print >> sys.stderr, "*** Invalid URI: %s" % s
        return None

    def handleProperty(self, s):
        """Turn input s (a URI or a QName) into a property term.

        Raises PropertyNotFound when s cannot be resolved or the schema
        does not know the resulting term as a property.
        """
        if ':' in s:
            term = self.resolveQName(s)
            if not term:
                if self._isURI(s):
                    term = pyrple.URI(s)
                else:
                    print >> sys.stderr, "*** Invalid URI: %s" % s
        else:
            term = self.resolveName(s)

        # Unresolved input leaves term as None; do not query the schema
        # with it.
        if term and isProperty(term):
            return term
        raise PropertyNotFound(term)

    def handleLiteral(self, s):
        """Wrap the string s in a pyrple Literal."""
        return pyrple.Literal(s)

    def getInput(self):
        """Show the prompt on stderr and return one line from stdin.

        End of input (Ctrl-D, or piped input running out) is returned as
        an empty line, so the editor steps back out level by level and
        finally prints the graph instead of dying on EOFError.
        """
        sys.stderr.write(self.makePrompt())
        try:
            return raw_input()
        except EOFError:
            return ''

    def uriToQName(self, uri):
        """Return "prefix:name" for uri using the first namespace in
        ``namespaces`` that uri.value starts with, or False if none."""
        for prefix in namespaces.keys():
            ns = namespaces[prefix]
            if uri.value.startswith(ns):
                return '%s:%s' % (prefix, uri.value[len(ns):])
        return False

    def makePrompt(self):
        """Build the prompt for the current state.

        The prompt is indented by the depth of the subject stack, then
        names the subject (its first label, else its QName, else "+" for
        other URIs and "_" for bNodes), then the predicate (QName or
        "+"), and ends with the expected state followed by "> ".

        Raises UnknownSubjectType for an unlabelled subject that is
        neither a URI nor a bNode.
        """
        result = ''
        if self.stack:
            result += ' ' * (len(self.stack) * 3)

        if self.subject:
            # find subProperties of rdfs:label...
            labelProps = [RDFS.label]
            labelProps += getSubProperties(
                RDFS.label, G=(schema + self.graph))

            x = pyrple.Var('x')
            labels = []
            for labelProp in labelProps:
                Q = pyrple.Graph(triples=[pyrple.Triple(
                    self.subject, labelProp, x
                )])
                labels.extend([r[x] for r in schema.query(Q)])
                labels.extend([r[x] for r in self.graph.query(Q)])

            if labels:
                result += '%s ' % labels[0]
            elif type(self.subject) is pyrple.URI:
                qname = self.uriToQName(self.subject)
                if qname:
                    result += qname + ' '
                else:
                    result += '+ '
            elif type(self.subject) is pyrple.bNode:
                result += '_ '
            else:
                raise UnknownSubjectType(repr(self.subject))

        if self.predicate:
            qname = self.uriToQName(self.predicate)
            if qname:
                result += qname + ' '
            else:
                print >> sys.stderr, "*** Odd..."
                result += '+ '

        result += '%s> ' % self.expecting
        return result

    def gotTriple(self, subj, pred, objt):
        """Append the triple (subj, pred, objt) to the graph."""
        # print >> sys.stderr, '* ADDED %s %s %s *' % (subj, pred, objt)
        triple = pyrple.Triple(subj, pred, objt)
        self.graph.append(triple)

    def run(self):
        """Process input lines forever; getSubject() exits the process."""
        while True:
            self.do()


def main(argv=None):
    """Run the editor; argv (default sys.argv[1:]) is passed on as the
    editor's ``uris``."""
    if argv is None:
        argv = sys.argv[1:]
    editor = RDFEditor(uris=argv)
    editor.run()


if __name__ == "__main__":
    main()