import json
import datetime
# import ijson
# import ijson.backends.yajl2_cffi as ijson


class Rule(object):
    """A single validation rule parsed from one line of rule text.

    A rule line has the form::

        <pattern> -> <value_type> <op> <constant>

    ``pattern`` is the signal name the rule applies to, ``value_type`` is
    passed to ``get_const_for_type`` to convert ``constant``, and ``op`` is
    one of ``>``, ``<`` or ``=``, optionally prefixed with ``!`` to negate
    the comparison.
    """

    def __init__(self, rule_line, lineno):
        """Parse ``rule_line``; ``lineno`` is kept for error reporting.

        Raises:
            ValueError: if the line is not of the expected shape, or if the
                constant cannot be converted to ``value_type``.
        """
        super(Rule, self).__init__()
        parts = [x.strip() for x in rule_line.split("->")]
        if len(parts) != 2:
            raise ValueError(
                "line {}: malformed rule {!r}; expected "
                "'<pattern> -> <type> <op> <constant>'".format(
                    lineno, rule_line))
        self.pattern, constraint = parts

        space_sep = constraint.split()
        if len(space_sep) < 2:
            raise ValueError(
                "line {}: malformed rule {!r}; expected "
                "'<pattern> -> <type> <op> <constant>'".format(
                    lineno, rule_line))
        self.value_type, op = space_sep[:2]
        # The constant may itself contain spaces (e.g. a date and a time).
        const = " ".join(space_sep[2:])
        self.const = get_const_for_type(self.value_type, const)

        # A leading "!" negates the comparison that follows it.
        self.neg = op[0] == "!"
        self.op = op[1:] if self.neg else op
        self.lineno = lineno

    def __repr__(self):
        neg_val = "not" if self.neg else ""
        rule_val = [self.pattern, self.value_type, neg_val, self.op,
                    self.const]
        return "{}:{} {} {} {}".format(*rule_val)

    def apply_neg(self, x):
        """Return ``x`` inverted when the rule is negated, else ``x``."""
        return not x if self.neg else x

    def apply(self, signal):
        """Return True when ``signal`` satisfies this rule.

        A signal whose ``value_type`` differs from the rule's, or a rule
        whose operator is not one of ``>``, ``<``, ``=``, always passes.
        """
        if signal.value_type != self.value_type:
            return True
        if self.op == ">":
            return self.apply_neg(signal.value > self.const)
        if self.op == "<":
            return self.apply_neg(signal.value < self.const)
        if self.op == "=":
            return self.apply_neg(signal.value == self.const)
        return True

    @classmethod
    def rule_engine_from(cls, rule_str):
        """Build a rule engine from a single rule string (line number 1)."""
        return Rule.rule_engine_gen([cls(rule_str, 1)])

    @staticmethod
    def rule_engine_gen(rule_list):
        """Return a ``check_rules(signal)`` callable for ``rule_list``.

        The callable returns ``(True, None)`` when every rule registered for
        the signal's name passes, or ``(False, rule)`` with the first rule
        that failed.
        """
        # Group rules by the signal name they apply to, preserving order.
        pattern_dict = {}
        for r in rule_list:
            pattern_dict.setdefault(r.pattern, []).append(r)

        def check_rules(signal):
            for rule in pattern_dict.get(signal.signal, ()):
                if not rule.apply(signal):
                    return (False, rule)
            return (True, None)

        return check_rules

    @classmethod
    def read_rules(cls, filename):
        """Read rules from ``filename`` and return a rule engine.

        Blank lines and lines starting with ``#`` are skipped. Each rule
        records its 1-based line number in the file; skipped lines still
        count towards that number.
        """
        rule_list = []
        with open(filename, "r") as rule_file:
            for lineno, line in enumerate(rule_file, 1):
                rulestr = line.strip()
                if rulestr != "" and not rulestr.startswith("#"):
                    rule_list.append(cls(rulestr, lineno))
        return Rule.rule_engine_gen(rule_list)


def date_parse(x):
    """Parse a ``YYYY-MM-DD HH:MM:SS`` string into a ``datetime``."""
    return datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S")


def get_const_for_type(value_type, value_data):
    """Convert ``value_data`` to the Python value for ``value_type``.

    * ``"Integer"``  -> ``int`` (parsed through ``float``, so "3.7" gives 3)
    * ``"String"``   -> ``str``
    * ``"Datetime"`` -> ``datetime``; the literal ``"now"`` yields the
      current local time at the moment of the call
    * anything else  -> the string ``"invalid"``
    """
    if value_type == "Integer":
        return int(float(value_data))
    if value_type == "String":
        return str(value_data)
    if value_type == "Datetime":
        if value_data == "now":
            return datetime.datetime.now()
        return date_parse(value_data)
    return "invalid"


class SignalItem(object):
    """A signal record with its value converted to a typed Python value.

    Built from a dict with the keys ``"signal"``, ``"value_type"`` and
    ``"value"``.
    """

    def __init__(self, sig_dict):
        super(SignalItem, self).__init__()
        self.signal = sig_dict["signal"]
        self.value_type = sig_dict["value_type"]
        self.value = get_const_for_type(self.value_type, sig_dict["value"])

    def __repr__(self):
        return self.signal + ":" + str(self.value)


def load_sigs(filename):
    """Load and return the JSON document stored in ``filename``."""
    with open(filename, "r") as sig_file:
        return json.load(sig_file)


# def load_isigs(filename):
#     with open(filename, 'r') as fd:
#         return [o for o in ijson.items(fd,'item')]


def validate_sigs(signals, rule_engine):
    """Check every signal dict in ``signals`` against ``rule_engine``.

    Returns a list of ``(SignalItem, failed_rule, index)`` tuples, one per
    signal that violated a rule; ``index`` is the 1-based position of the
    signal in ``signals``.
    """
    invalids = []
    for index, sig in enumerate(signals, 1):
        sig_obj = SignalItem(sig)
        status, rule = rule_engine(sig_obj)
        if not status:
            invalids.append((sig_obj, rule, index))
    return invalids


def main():
    """Validate ./raw_data.json against ./rules.txt and print violations."""
    check_rules = Rule.read_rules("./rules.txt")
    signals = load_sigs("./raw_data.json")
    invalids = validate_sigs(signals, check_rules)
    invalid_idx = [i[2] for i in invalids]
    # Single-argument print(...) produces the same text on Python 2 and 3.
    print(invalid_idx)
    for sig_obj, rule, index in invalids:
        print("{} {} {} {}".format(index, sig_obj, " Cause Rule ",
                                   rule.lineno))


if __name__ == '__main__':
    main()