#!/usr/bin/python
"""
SquishQL Parser
Sean B. Palmer, , 2003-06
GPL 2. Share and Enjoy!
"""

import re
import sys
from contextlib import closing


# Errors

class SquishQLError(Exception):
    """Base class for every error raised by this module."""


class ParseError(SquishQLError, AssertionError):
    """Raised when the token stream does not match the SquishQL grammar.

    It derives from AssertionError because the parser used to signal these
    conditions with ``assert``; callers catching AssertionError keep working.
    """


class UnexpectedEnd(ParseError, IndexError):
    """Raised when a token is required but the stream is exhausted.

    Also an IndexError, which is what reading past the end used to raise.
    """


class ExpectedOperator(ParseError):
    """Raised when a constraint variable is not followed by an operator."""


class OddTerm(SquishQLError):
    """Raised when a term is not a variable, URI, number or text literal."""


# Tokenization stuff

NumOps = ('==', '=', '!=', '<=', '>=', '>', '<')
StringOps = ('like', 'ne', 'eq', '~')

keywords = ['SELECT', 'FROM', 'WHERE', 'AND', 'USING', 'FOR']
keywords += NumOps
keywords += StringOps

Identifier = r'[A-Za-z][A-Za-z0-9]*'
Var = r'\?' + Identifier
NumericLiteral = r'(?:[-+]?[0-9]+)(?:\.[0-9]+)?(?:e[-+]?[0-9]+)?'
UriLiteral = r'[A-Za-z][A-Za-z0-9+.-]*:[^\r\n\f\t <>")]+'
TextLiteral = r"'[^'\\]*(?:\\.[^'\\]*)*'"
Literal = '|'.join([TextLiteral, UriLiteral, NumericLiteral])
VarOrLiteral = '|'.join([Var, Literal])

# One alternation that recognises every token.  The trailing \Z alternative
# makes findall() emit a final '' which the parser uses as its EOF marker.
# Input that matches no alternative (whitespace, stray characters) is skipped.
# NOTE(review): keywords are tried first and carry no word-boundary check, so
# a bare identifier that merely begins with a keyword (e.g. "format") is split
# into two tokens ("FOR", "mat").  Left unchanged here.
Token = re.compile('(%s)' % '|'.join([
    '|'.join(keywords), VarOrLiteral, Identifier, ',', r'\(', r'\)', r'\Z'
]), re.IGNORECASE)


# Datatyping of tokens

def compile(pattern):
    """Return a case-insensitive regex matching *pattern* as a whole token.

    The pattern is wrapped in a non-capturing group so that both anchors
    apply to every alternative of patterns such as ``Literal``; without the
    group ``^`` and ``$`` would bind to the first and last alternative only.
    The ignore-case flag is passed as an argument because an inline ``(?i)``
    that is not at the very start of a pattern is rejected by newer Pythons.
    """
    return re.compile(r'^(?:%s)$' % pattern, re.IGNORECASE)


SELECT = compile('SELECT')
FROM = compile('FROM')
WHERE = compile('WHERE')
AND = compile('AND')
USING = compile('USING')
FOR = compile('FOR')
IDENTIFIER = compile(Identifier)
VAR = compile(Var)
NUMERIC_LITERAL = compile(NumericLiteral)
URI_LITERAL = compile(UriLiteral)
TEXT_LITERAL = compile(TextLiteral)
LITERAL = compile(Literal)
VAR_OR_LITERAL = compile(VarOrLiteral)


def _isNumOp(token):
    """Tell whether *token* is one of the numeric comparison operators."""
    return token in NumOps


def _isStringOp(token):
    """Tell whether *token* is a string operator, ignoring letter case.

    The tokenizer is case-insensitive, so ``LIKE`` must be accepted too.
    """
    return token.lower() in StringOps


# The parser class

class SquishQLParser(dict):
    """Recursive-descent parser for SquishQL queries.

    The parser is a dict holding the parse result:

    * ``self['SELECT']`` -- list of variable tokens
    * ``self['FROM']``   -- list of URI tokens
    * ``self['WHERE']``  -- ``[triples, constraints]`` where *triples* is a
      list of 3-tuples of tokens and *constraints* maps a variable token to
      an ``(operator, literal)`` tuple
    * ``self['USING']``  -- dict mapping prefix to URI

    ``self.tokens`` is the token list and ``self.pos`` the read position.
    """

    def __init__(self, s=None):
        """Create an empty parser, optionally tokenizing the string *s*."""
        dict.__init__(self)
        self.tokens = []
        self.pos = 0
        self['SELECT'] = []  # vars
        self['FROM'] = []  # URIs
        self['WHERE'] = [[], {}]  # triples and constraints
        self['USING'] = {}  # prefix to URI mappings
        if s:
            self.feed(s)

    # Token stream helpers

    def _peek(self):
        """Return the current token without consuming it ('' at the end)."""
        if self.pos < len(self.tokens):
            return self.tokens[self.pos]
        return ''

    def readToken(self):
        """Consume and return the current token.

        Raises UnexpectedEnd when there is no token left.
        """
        if self.pos >= len(self.tokens):
            raise UnexpectedEnd('Unexpected end of input')
        token = self.tokens[self.pos]
        self.pos += 1
        return token

    def _readMatching(self, pattern, what):
        """Consume a token, requiring it to match the regex *pattern*.

        *what* names the expected token in the ParseError message.
        """
        token = self.readToken()
        if not pattern.match(token):
            raise ParseError('Expected %s, got %r' % (what, token))
        return token

    def _readExact(self, text):
        """Consume a token, requiring it to equal the string *text*."""
        token = self.readToken()
        if token != text:
            raise ParseError('Expected %r, got %r' % (text, token))
        return token

    def _tokenList(self, pattern, what):
        """Read one or more tokens matching *pattern*.

        Commas between (or after) the items are consumed and ignored, which
        is the leniency the separate var/URI list readers always had.
        """
        items = [self._readMatching(pattern, what)]
        while True:
            token = self._peek()
            if token == ',':
                self.readToken()
            elif pattern.match(token):
                items.append(self.readToken())
            else:
                return items

    def feed(self, s):
        """Tokenize *s* and append its tokens to the stream.

        Every tokenization ends with an '' EOF marker; a marker left by an
        earlier feed() is dropped first so it cannot end up mid-stream.
        """
        if self.tokens and self.tokens[-1] == '':
            self.tokens.pop()
        self.tokens.extend(Token.findall(s))

    # Grammar

    def document(self):
        """Parse a whole query and require that all input was consumed."""
        self.query()
        if self.readToken() != '':
            raise ParseError("No EOF? How marvellously odd")

    def query(self):
        """SELECT clause, optional FROM, WHERE, optional AND, optional USING."""
        self.selectClause()
        if FROM.match(self._peek()):
            self.fromClause()
        self.triplePatternClause()
        if AND.match(self._peek()):
            self.constraintClause()
        if USING.match(self._peek()):
            self.usingClause()

    def selectClause(self):
        """Parse ``SELECT`` followed by a variable list."""
        self._readMatching(SELECT, 'SELECT')
        self['SELECT'] += self.varList()

    def varList(self):
        """Return a list of one or more variable tokens."""
        return self._tokenList(VAR, 'a variable')

    def fromClause(self):
        """Parse ``FROM`` followed by a URI list."""
        self._readMatching(FROM, 'FROM')
        self['FROM'] += self.uriList()

    def uriList(self):
        """Return a list of one or more URI tokens."""
        return self._tokenList(URI_LITERAL, 'a URI')

    def triplePatternClause(self):
        """Parse ``WHERE`` followed by one or more triple patterns."""
        self._readMatching(WHERE, 'WHERE')
        self['WHERE'][0] += self.triplePatternList()

    def triplePatternList(self):
        """Return a list of one or more parenthesised triples."""
        triples = [self.triplePattern()]
        while self._peek() == '(':
            triples.append(self.triplePattern())
        return triples

    def triplePattern(self):
        """Parse ``( term term term )`` and return the terms as a 3-tuple."""
        self._readExact('(')
        triple = (self.varOrLiteral(), self.varOrLiteral(),
                  self.varOrLiteral())
        self._readExact(')')
        return triple

    def varOrLiteral(self):
        """Consume and return one variable or literal token."""
        return self._readMatching(VAR_OR_LITERAL, 'a variable or literal')

    def constraintClause(self):
        """Parse ``AND`` followed by a constraint list."""
        self._readMatching(AND, 'AND')
        self.constraintList()

    def constraintList(self):
        """Parse one or more constraint expressions separated by ``AND``."""
        self.expression()
        while AND.match(self._peek()):
            self.readToken()
            self.expression()

    def expression(self):
        """Parse ``?var operator literal`` and record it as a constraint.

        A later constraint on the same variable replaces the earlier one,
        because constraints are stored in a dict keyed by the variable.
        """
        var = self._readMatching(VAR, 'a variable')
        self['WHERE'][1][var] = self.someFunction()

    def someFunction(self):
        """Dispatch on the operator kind; return ``(operator, literal)``.

        Raises ExpectedOperator if the current token is not an operator.
        """
        token = self._peek()
        if _isNumOp(token):
            return self.numExpression()
        if _isStringOp(token):
            return self.stringExpression()
        raise ExpectedOperator(token)

    def numExpression(self):
        """Parse a numeric operator followed by a numeric literal."""
        operator = self.readToken()
        if not _isNumOp(operator):
            raise ParseError('Expected a numeric operator, got %r' % operator)
        numlit = self._readMatching(NUMERIC_LITERAL, 'a numeric literal')
        return (operator, numlit)

    def stringExpression(self):
        """Parse a string operator followed by any literal."""
        operator = self.readToken()
        if not _isStringOp(operator):
            raise ParseError('Expected a string operator, got %r' % operator)
        literal = self._readMatching(LITERAL, 'a literal')
        return (operator, literal)

    def usingClause(self):
        """Parse ``USING`` followed by one or more ``prefix FOR uri`` pairs."""
        self._readMatching(USING, 'USING')
        prefix, uri = self.forList()
        self['USING'][prefix] = uri
        while IDENTIFIER.match(self._peek()):
            prefix, uri = self.forList()
            self['USING'][prefix] = uri

    def forList(self):
        """Parse one ``prefix FOR uri`` pair and return ``(prefix, uri)``."""
        prefix = self._readMatching(IDENTIFIER, 'a prefix')
        self._readMatching(FOR, 'FOR')
        uri = self._readMatching(URI_LITERAL, 'a URI')
        return (prefix, uri)


def qnameToURI(qname, using):
    """Expand *qname* with the prefix table *using*.

    A term shaped like ``prefix:name`` whose prefix is a key of *using* is
    returned as ``using[prefix] + name``.  Anything else (a full URI with an
    unknown scheme, a variable, a literal) is returned unchanged.
    """
    if URI_LITERAL.match(qname):
        prefix, _, name = qname.partition(':')
        if prefix in using:
            return using[prefix] + name
    return qname  # a plain URI, or neither a qname nor a URI


def normalizeTriples(triples, using):
    """Return *triples* as a list of tuples with every qname expanded."""
    return [tuple([qnameToURI(term, using) for term in triple])
            for triple in triples]


def toNTripleTerm(term):
    """Return the N-Triples style spelling of a single token.

    Variables are passed through, URIs are wrapped in ``<>``, numbers and
    text literals are wrapped in double quotes.  Raises OddTerm for a token
    that is none of these.
    """
    if VAR.match(term):
        return term
    if URI_LITERAL.match(term):
        return '<%s>' % term
    if NUMERIC_LITERAL.match(term):
        return '"%s"' % term
    if TEXT_LITERAL.match(term):
        term = term.replace("\\'", "'")  # unescape apostrophes
        # Escape quote marks that are not already escaped.  The lookbehind
        # (rather than consuming the preceding character) also catches two
        # quote marks in a row.
        term = re.sub(r'(?<!\\)"', r'\\"', term)
        # NOTE(review): the token still carries its surrounding apostrophes,
        # so they end up inside the double quotes -- confirm that is intended.
        return '"%s"' % term
    raise OddTerm(term)


def toNTriple(triple):
    """Convert one parsed triple to a tuple of N-Triples terms.

    The first two members are swapped on the way out: the term parsed second
    is emitted first and the term parsed first is emitted second.
    """
    assert len(triple) == 3, "a triple must have exactly three terms"
    triple = [triple[1], triple[0], triple[2]]
    return tuple([toNTripleTerm(term) for term in triple])


def toNTriples(triples, using):
    """Expand qnames in *triples*, then convert each with toNTriple()."""
    norm = normalizeTriples(triples, using)
    return [toNTriple(triple) for triple in norm]


def parse(s):
    """Parse the query string *s* and return the populated parser."""
    p = SquishQLParser()
    p.feed(s)
    p.document()
    return p


def parseURI(uri):
    """Fetch *uri* and parse the response body as a query.

    Works with both the Python 2 and Python 3 urllib layouts and closes the
    response.  A bytes body is decoded as UTF-8 (assumed encoding).
    """
    try:
        from urllib.request import urlopen  # Python 3
    except ImportError:
        from urllib import urlopen  # Python 2
    with closing(urlopen(uri)) as response:
        data = response.read()
    if not isinstance(data, str):
        data = data.decode('utf-8')
    return parse(data)


def test(s):
    """Parse *s* and print the tokens and every part of the result."""
    sys.stderr.write("input length: %s bytes\n" % len(s))
    p = parse(s)
    print('Tokens: %s' % (p.tokens,))
    print('SELECT %s' % (p['SELECT'],))
    print('FROM %s' % (p['FROM'],))
    print('WHERE (triples) %s' % (p['WHERE'][0],))
    print('Normalized: %s' % (normalizeTriples(p['WHERE'][0], p['USING']),))
    print('WHERE (constraints) %s' % (p['WHERE'][1],))
    print('USING %s' % (p['USING'],))


# This is a demo helper, not a unit test: keep test collectors away from it.
test.__test__ = False


def main(argv):
    """Run test() on every file named in *argv*; with none, print the help."""
    if argv:
        for fn in argv:
            sys.stderr.write("Test parsing file %s\n" % fn)
            with open(fn) as f:
                test(f.read())
    else:
        print(__doc__)


if __name__ == "__main__":
    main(sys.argv[1:])