Wrapping up
The rule engine is now complete with:
- `AndRule` & `OrRule`, including nested rules
- Reasons for rule failures
Congratulations on implementing your own rule engine from scratch! Maybe up next, we can implement a JSON-based rule engine (where rules are defined in JSON instead of code).
Codebase
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Union
# Type alias for the facts handed to a rule: a mapping from a string key to a
# scalar value (str, int or float).
facts_type = dict[str, Union[str, int, float]]
@dataclass
class RuleEngineResult:
    """Outcome of evaluating a rule against a set of facts.

    Attributes:
        result: ``True`` when the rule was satisfied, ``False`` otherwise.
        reason: Optional human-readable explanation, typically set when the
            rule failed; ``None`` when there is nothing to report.
    """

    result: bool
    # Defaults to None so a passing result can be built without spelling out
    # ``reason=None``; existing two-argument calls keep working unchanged.
    reason: Optional[str] = None
class AbstractRule(ABC):
    """Interface implemented by every rule that can be evaluated."""

    @abstractmethod
    def run(self, facts: facts_type) -> RuleEngineResult:
        """Evaluate this rule against ``facts`` and report the outcome.

        Concrete subclasses must override this method.
        """
class RuleOp(Enum):
    """Comparison operators a rule can apply between a fact and a value."""

    EQ = "EQ"  # fact == value
    GTE = "GTE"  # fact >= value
@dataclass
class Rule(AbstractRule):
    """Leaf rule that compares a single fact against a fixed value.

    Attributes:
        fact_key: Key looked up in the facts mapping.
        operator: Comparison applied between the fact and ``value``.
        value: Right-hand side of the comparison.
        reason: Explanation reported when the rule fails; may be ``None``.
    """

    fact_key: str | int
    operator: RuleOp
    value: str | int
    reason: Optional[str]

    def run(self, facts) -> RuleEngineResult:
        """Evaluate the rule against ``facts``.

        A fact that is missing, or that cannot be ordered against ``value``
        (for example a string fact compared with an integer threshold), makes
        the rule fail instead of raising, mirroring how ``EQ`` already treats
        a missing fact.

        Args:
            facts: Mapping of fact names to values.

        Returns:
            A result whose ``reason`` is this rule's ``reason`` on failure
            and ``None`` on success.

        Raises:
            ValueError: If ``operator`` is not a supported ``RuleOp``.
        """
        actual = facts.get(self.fact_key)

        if self.operator == RuleOp.EQ:
            passed = actual == self.value
        elif self.operator == RuleOp.GTE:
            try:
                passed = actual >= self.value
            except TypeError:
                # None (missing fact) or a mismatched type cannot be ordered
                # against the threshold: treat it as "condition not met".
                passed = False
        else:
            # ValueError is still caught by callers handling ``Exception``.
            raise ValueError(f"unknown operator: {self.operator}")

        return RuleEngineResult(
            result=passed,
            reason=None if passed else self.reason,
        )
@dataclass
class AndRule(AbstractRule):
    """Composite rule that passes only when every child rule passes.

    Attributes:
        rules: Child rules, evaluated in order.
        reason: Fallback explanation used when the failing child has none.
    """

    rules: list[AbstractRule]
    reason: Optional[str]

    def run(self, facts) -> RuleEngineResult:
        """Evaluate the children in order, stopping at the first failure.

        Returns:
            The first failing child's outcome (its own reason when it has
            one, otherwise this rule's ``reason``), or a passing result with
            no reason when every child passes.
        """
        # Lazy generator: children after the first failure are never run.
        outcomes = (child.run(facts) for child in self.rules)
        failure = next((outcome for outcome in outcomes if not outcome.result), None)

        if failure is None:
            return RuleEngineResult(result=True, reason=None)

        explanation = failure.reason or self.reason
        return RuleEngineResult(result=failure.result, reason=explanation)
@dataclass
class OrRule(AbstractRule):
    """Composite rule that passes as soon as any child rule passes.

    Attributes:
        rules: Child rules, evaluated in order.
        reason: Explanation reported when no child passes; may be ``None``.
    """

    rules: list[AbstractRule]
    reason: Optional[str]

    def run(self, facts) -> RuleEngineResult:
        """Evaluate the children in order, stopping at the first success.

        Returns:
            A passing result with ``reason=None`` when at least one child
            passes; otherwise a failing result carrying this rule's
            ``reason``.
        """
        # ``any`` short-circuits, so children after the first success are
        # never run.
        if any(child.run(facts).result for child in self.rules):
            # A satisfied rule has nothing to explain. Previously the failure
            # reason leaked into successful results.
            return RuleEngineResult(result=True, reason=None)

        # No child passed (this includes an empty rule list).
        return RuleEngineResult(result=False, reason=self.reason)
def run_engine(rule: AbstractRule, facts: facts_type) -> RuleEngineResult:
    """Evaluate ``rule`` against ``facts`` and return the rule's result."""
    outcome = rule.run(facts=facts)
    return outcome
if __name__ == "__main__":
    # Despite its name, this rule passes when the user is at least 18; the
    # reason describes the failure case.
    USER_UNDER_AGE_RULE = Rule(
        fact_key="age",
        operator=RuleOp.GTE,
        value=18,
        reason="user under min age",
    )
    USER_IN_US = Rule(
        fact_key="country",
        operator=RuleOp.EQ,
        value="US",
        reason=None,
    )
    # Either spelling of the country is accepted.
    USER_IN_IN = OrRule(
        rules=[
            Rule(fact_key="country", operator=RuleOp.EQ, value="IN", reason=None),
            Rule(fact_key="country", operator=RuleOp.EQ, value="india", reason=None),
        ],
        reason=None,
    )
    USER_IN_ALLOWED_COUNTRIES = OrRule(
        rules=[USER_IN_IN, USER_IN_US],
        reason="user not in allowed countries",
    )
    PRODUCT_ELIGIBILITY = AndRule(
        rules=[USER_UNDER_AGE_RULE, USER_IN_ALLOWED_COUNTRIES],
        reason="user ineligible",
    )

    # Expected output, one line per sample:
    #   RuleEngineResult(result=False, reason='user not in allowed countries')
    #   RuleEngineResult(result=True, reason=None)
    for sample_facts in (
        {"age": 18, "country": "RU"},
        {"age": 18, "country": "IN"},
    ):
        print(run_engine(rule=PRODUCT_ELIGIBILITY, facts=sample_facts))