#!/usr/bin/python
"""Eep Notation3 Parser

Tokenizes a Notation3 (N3) document, expands prefixes and the 'a' keyword,
flattens bracketed groups into literals, and produces a list of
[subject, predicate, object] triples.  The triples can be loaded into an
Eep store (parse) or serialized as NTriples (n3tont).

Command line: pass a file name or an http(s) URL as the first argument to
print the document as NTriples.
"""

__author__ = 'Sean B. Palmer'
__license__ = 'Copyright (C) 2002-02 Sean B. Palmer. GNU GPL 2'

import re

# The eep store is only needed by parse() and n3tont(); tokenizing and
# n3tolist() work without it, so a missing module is reported lazily.
try:
    import eep
except ImportError as _err:
    eep = None
    _EEP_IMPORT_ERROR = _err
else:
    _EEP_IMPORT_ERROR = None


class N3Error(ValueError):
    """Raised when a Notation3 document cannot be tokenized or parsed."""


def group(*n):
    """Join regex alternatives into one capturing group: group('a', 'b') -> '(a|b)'."""
    return '(%s)' % '|'.join(n)


Name = r'[A-Za-z0-9_]+'
URI = r'<[^ >]*>'
bNode = r'_:' + Name
Univar = r'\?' + Name
QName = r'(?:[A-Za-z][A-Za-z0-9_]*)?:' + Name
Literal = r'"[^"\\]*(?:\\.[^"\\]*)*"'  # earlier form: r'"(?:\\"|[^"])*"'
# The closing quotes must not be followed by a fourth quote.  A lookahead is
# used (rather than consuming the next character) so that a '.', ';' or ','
# directly after the literal survives as its own token, and so that a long
# literal at the very end of the input still matches.
LLiteral = r'"""[^"\\]*(?:(?:\\.|"(?!""))[^"\\]*)*"""(?!")'
Prefix = r'(?:[A-Za-z][A-Za-z0-9_]*)?:'
PrefixDecl = r'@prefix'
WS = r'[ \t]'

RDF_NS = 'http://www.w3.org/1999/02/22-rdf-syntax-ns#'  # for 'a' keyword
DAML_NS = 'http://www.daml.org/2001/03/daml+oil#'  # for '=' keyword

# Order matters: earlier alternatives win, so long literals are tried before
# plain literals and QNames before bare prefixes.
# NOTE(review): '=' and 'this' are handled by applyStuff() but are not
# alternatives here, so the tokenizer never emits them -- confirm intent.
Tokens = group(LLiteral, URI, Literal, PrefixDecl, QName, bNode, Prefix,
               Univar, 'a', r'\{', r'\}', r'\(', r'\)', r'\[', r'\]',
               ',', ';', r'\.', WS, '\n')
Token = re.compile(Tokens, re.S)

_WHITESPACE_TOKENS = (' ', '\t', '\n', '\r')
_N3_COMMENT = re.compile(r'([ \t]*\#[^\n]*)', re.S)
_UNESCAPED_QUOTE = re.compile(r'(?<!\\)"')
# First characters of tokens that are never treated as qualified names.
_NON_QNAME_START = '<_"?.;,{}[]()'
_KEYWORDS = {
    'a': '<%stype>' % RDF_NS,
    '=': '<%sequivalentTo>' % DAML_NS,
    'this': '',  # ugh
}


def _escapeLiteral(text):
    """Make text safe inside a one-line "..." literal.

    Newlines become the two characters backslash-n, and every double quote
    that is not already preceded by a backslash is escaped.
    """
    return _UNESCAPED_QUOTE.sub(r'\\"', text.replace('\n', '\\n'))


# General list processing functions

def filterWs(list):
    """Return a new list without the whitespace tokens ' ', tab, newline, CR."""
    return [token for token in list if token not in _WHITESPACE_TOKENS]


def getSpan(list, start, end):
    """Split list into (list[start:end], everything else).

    e.g. getSpan(['p', 'q', 'r'], 1, 2) gives (['q'], ['p', 'r'])
    """
    rest = []
    rest.extend(list[:start])
    rest.extend(list[end:])
    return list[start:end], rest


def listify(list, start, end):
    """Replace tokens list[start:end] by one literal holding their text.

    The tokens are joined with single spaces, escaped, and wrapped in double
    quotes.  Returns a new list; the argument is not modified.
    """
    part, list = getSpan(list, start, end)
    list.insert(start, '"' + _escapeLiteral(' '.join(part)) + '"')
    return list


def posns(list, item):
    """Gets all positions of an item in a list, plus the total length."""
    p = [i for i, candidate in enumerate(list) if candidate == item]
    p.append(len(list))
    return p


# N3 Related Functions

def notComment(s):
    """Return '' if the line s is a comment (optional blanks then '#'), else s."""
    if _N3_COMMENT.match(s):
        return ''
    return s


def toke(s):
    """Notation3 tokenizer.

    Takes in a string, returns a raw token list (whitespace tokens included).
    Line endings are normalized and whole-line comments are blanked first;
    characters that match no token pattern are skipped.

    Raises N3Error if the document is empty.
    """
    if not s:
        raise N3Error('Document has no content')
    lines = s.replace('\r\n', '\n').replace('\r', '\n').split('\n')
    s = '\n'.join([notComment(line) for line in lines]).strip()
    return Token.findall(s)


def getPrefixes(list):
    """Extract '@prefix ns: <uri> .' directives from a token list.

    Returns (prefixes, rest): a dict mapping each prefix (without its colon)
    to the namespace URI (without angle brackets), and the tokens left over.

    Raises N3Error if a directive is cut short.
    """
    prefixes = {}
    while '@prefix' in list:
        pos = list.index('@prefix')
        binding, list = getSpan(list, pos, pos + 4)
        if len(binding) < 3:
            raise N3Error('Incomplete @prefix directive: ' + ' '.join(binding))
        # binding is ['@prefix', 'ns:', '<uri>', '.']
        prefixes[binding[1][:-1]] = binding[2][1:-1]
    return prefixes, list


def applyStuff(prefixes, list):
    """Expand keywords and QNames and normalize literals, in place.

    'a' and '=' become the rdf:type and daml:equivalentTo URIs, QNames are
    expanded using prefixes, and long literals are rewritten as one-line
    escaped literals.  Returns the same (mutated) list.

    Raises N3Error for an undeclared prefix, a malformed QName, a malformed
    long literal, or a newline inside a short literal.
    """
    for i, token in enumerate(list):
        if token in _KEYWORDS:
            list[i] = _KEYWORDS[token]
            continue
        if not token:
            continue  # nothing to expand; avoids indexing an empty string
        first = token[0]
        if first == '"':  # Congratulations - it's a literal!
            if token[:3] == '"""':  # A big literal...
                if len(token) < 6 or token[-3:] != '"""':
                    raise N3Error('Incorrect string formatting: %r' % token[-3:])
                list[i] = '"' + _escapeLiteral(token[3:-3]) + '"'
            elif '\n' in token:
                raise N3Error('Newline in literal ' + token)
        elif first not in _NON_QNAME_START:
            if token.count(':') != 1:
                raise N3Error('Not a qualified name: %r' % token)
            ns, name = token.split(':')
            if ns not in prefixes:
                raise N3Error('Prefix not declared: %s' % ns)
            list[i] = '<' + prefixes[ns] + name + '>'
    return list


def getStatements(list):
    """Split a token list on '.' into statements (terminators dropped).

    Tokens after the last '.' are discarded.
    """
    statements, current = [], []
    for token in list:
        if token == '.':
            statements.append(current)
            current = []
        else:
            current.append(token)
    return statements


def getPovs(list):
    """Split a statement on ';'.

    Returns (head, povs): the tokens before the first ';' and a list of the
    token groups that follow each ';'.
    """
    segments = [[]]
    for token in list:
        if token == ';':
            segments.append([])
        else:
            segments[-1].append(token)
    return segments[0], segments[1:]


def getObjs(list):
    """Pull ', object' pairs out of a token list.

    Returns (rest, objs): the tokens with each comma and the token after it
    removed, and the list of those removed objects.

    Raises N3Error if a comma is the last token.
    """
    rest, objs = [], []
    i = 0
    while i < len(list):
        if list[i] == ',':
            if i + 1 >= len(list):
                raise N3Error('Expected an object after ","')
            objs.append(list[i + 1])
            i += 2
        else:
            rest.append(list[i])
            i += 1
    return rest, objs


def statementize(list):
    """Expand one statement (with ';' and ',' shorthand) into triples.

    Triples from the ';' groups come first, followed by the triples of the
    leading subject-predicate-object part.  An empty group (a trailing ';')
    is ignored.

    Raises N3Error if the statement or one of its groups is too short.
    """
    if len(list) < 3:
        raise N3Error('Error: statement too short!')
    if len(list) == 3:
        return [list]
    head, groups = getPovs(list)
    head, headObjs = getObjs(head)
    if len(head) < 3:
        raise N3Error('Error: statement too short!')
    subject = head[0]
    triples = []
    for tokens in groups:
        if not tokens:
            continue  # trailing ';' before the '.'
        pair, moreObjs = getObjs(tokens)
        if len(pair) < 2:
            raise N3Error('Error: predicate without an object!')
        predicate = pair[0]
        triples.append([subject, predicate, pair[1]])
        triples.extend([subject, predicate, obj] for obj in moreObjs)
    predicate = head[1]
    triples.append([subject, predicate, head[2]])
    triples.extend([subject, predicate, obj] for obj in headObjs)
    return triples


def _deepestSpan(tokens, schar, echar):
    """Return (start, end) indexes of the first most deeply nested schar..echar pair."""
    opened, best = [], None
    for i, token in enumerate(tokens):
        if token == schar:
            opened.append(i)
        elif token == echar:
            if not opened:
                raise N3Error('Unbalanced %r' % echar)
            depth = len(opened)
            start = opened.pop()
            if best is None or depth > best[0]:
                best = (depth, start, i)
    if opened:
        raise N3Error('Unbalanced %r' % schar)
    return best[1], best[2]


def doLists(list, schar, echar):
    """Get lists from an N3 token stream, and convert them into literals.

    Works innermost-first until no schar token is left, so nested groups end
    up escaped inside the literal of their parent.

    Raises N3Error if the delimiters are unbalanced.
    """
    while schar in list:
        start, end = _deepestSpan(list, schar, echar)
        list = listify(list, start, end + 1)
    return list


def listStuff(list):
    """Convert [...] groups, then {...}, then (...) into literals."""
    list = doLists(list, '[', ']')
    list = doLists(list, '{', '}')
    return doLists(list, '(', ')')


def n3tolist(s):
    """Convert an N3 string into a list of triples as strings."""
    t = filterWs(toke(s))  # tokenize the stream, and filter whitespace tokens
    prefixes, t = getPrefixes(t)  # get the prefix directives, and add to a dict
    t = applyStuff(prefixes, t)  # apply prefixes, keywords, and string formatting
    t = listStuff(t)  # apply list stuff: todo
    result = []
    for stat in getStatements(t):  # get all of the "statements" from the stream
        result.extend(statementize(stat))
    return result


def parse(s):
    """Get a string, tokenize, create list, convert to Eep store.

    Raises ImportError if the eep module could not be imported.
    """
    if eep is None:
        raise ImportError('the eep module is required: %s' % _EEP_IMPORT_ERROR)
    return [[eep.Article(t[0]), eep.Article(t[1]), eep.Article(t[2])]
            for t in n3tolist(s)]


def n3tont(s):
    """Convert Notation3 into NTriples."""
    triples = parse(s)
    return eep.serialize(triples)


def _readUrl(url):
    """Fetch url and return its body as text (works on Python 2 and 3)."""
    try:
        from urllib2 import urlopen  # Python 2
    except ImportError:
        from urllib.request import urlopen  # Python 3
    response = urlopen(url)
    try:
        data = response.read()
    finally:
        response.close()
    if not isinstance(data, str):  # bytes on Python 3
        data = data.decode('utf-8')
    return data


def test():
    """Network smoke test: fetch a published N3 document and print NTriples."""
    print(n3tont(_readUrl('http://www.w3.org/2001/03/earl/0.95.n3')))


# Keep test runners from collecting the network smoke test when this module
# is star-imported into a test file.
test.__test__ = False


def main(argv=None):
    """Command-line entry point; argv defaults to sys.argv."""
    import sys
    if argv is None:
        argv = sys.argv
    if len(argv) < 2:
        print(__doc__)
    elif argv[1].startswith(('http:', 'https:')):
        print(n3tont(_readUrl(argv[1])))
    else:
        with open(argv[1], 'r') as source:
            print(n3tont(source.read()))


if __name__ == "__main__":
    main()