quartic-rule-py/quartic_rule.py

112 lines
3.6 KiB
Python

import json
import datetime
class Rule(object):
"""docstring for Rule."""
def __init__(self, rule_line,lineno):
super(Rule, self).__init__()
self.pattern,constraint = [x.strip() for x in rule_line.split("->")]
space_sep = constraint.split()
(self.value_type,op),const = space_sep[:2]," ".join(space_sep[2:])
self.const = get_const_for_type(self.value_type,const)
self.neg = True if op[0] == "!" else False
self.op = op[1:] if self.neg else op
self.lineno = lineno
def __repr__(self):
neg_val = "not" if self.neg else ""
rule_val = [self.pattern,self.value_type,neg_val,self.op,self.const]
return "{}:{} {} {} {}".format(*rule_val)
def apply_neg(self,x):
return not x if self.neg else x
def apply(self,signal):
if signal.signal == self.pattern:
if signal.value_type != self.value_type:
return True
if self.op == ">":
return self.apply_neg(signal.value > self.const)
if self.op == "<":
return self.apply_neg(signal.value < self.const)
if self.op == "=":
return self.apply_neg(signal.value == self.const)
return True
@classmethod
def rule_engine_from(cls,rule_str):
return Rule.rule_engine_gen([cls(rule_str,1)])
@staticmethod
def rule_engine_gen(rule_list):
def check_rules(signal):
for rule in rule_list:
if not rule.apply(signal):
return (False,rule)
return (True,None)
return check_rules
@classmethod
def read_rules(cls,filename):
rule_list = []
with open(filename,"r") as rule_file:
lineno = 1
for line in rule_file.readlines():
rulestr = line.strip()
if rulestr != "" and not rulestr.startswith("#"):
rule_list.append(cls(rulestr,lineno))
lineno+=1
return Rule.rule_engine_gen(rule_list)
date_parse = lambda x : datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S")
def get_const_for_type(value_type,value_data):
if value_type == "Integer":
value = int(float(value_data))
elif value_type == "String":
value = str(value_data)
elif value_type == "Datetime":
if value_data == "now":
value = datetime.datetime.now()
else:
value = date_parse(value_data)
else:
value = "invalid"
return value
class SignalItem(object):
"""docstring for SignalItem."""
def __init__(self, sig_dict):
super(SignalItem, self).__init__()
self.signal = sig_dict["signal"]
self.value_type = sig_dict["value_type"]
self.value = get_const_for_type(self.value_type,sig_dict["value"])
def __repr__(self):
return self.signal+":"+str(self.value)
def load_sigs(filename):
with open(filename,"r") as sig_file:
return json.load(sig_file)
def validate_sigs(signals,rule_engine):
invalids = []
index = 0
for sig in signals:
sig_obj = SignalItem(sig)
index+=1
status,rule = rule_engine(sig_obj)
if not status:
invalids.append((sig_obj,rule,index))
return invalids
def main():
check_rules = Rule.read_rules("./rules.txt")
signals = load_sigs("./raw_data.json")
invalids = validate_sigs(signals,check_rules)
invalid_idx = [i[2] for i in invalids]
print invalid_idx
for sig_obj,rule,index in invalids:
print index,sig_obj," Cause Rule ",rule.lineno
if __name__ == '__main__':
main()