🎉 JSON parser from scratch chapter is now out!
Rule engine
Wrapping up the rule engine

Wrapping up

The rule engine is now complete with

  • AndRule & OrRule, including nested rules
  • Reason for the rule failures

Congratulations on implementing your own Rule engine from scratch! Maybe up next, we can implement a JSON based rule engine (where rules are in JSON instead of code).

Codebase

from __future__ import annotations
 
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Union
 
facts_type = dict[str, Union[str, int, float]]
 
 
@dataclass
class RuleEngineResult:
    result: bool
    reason: Optional[str]
 
 
class AbstractRule(ABC):
    @abstractmethod
    def run(self, facts: facts_type) -> RuleEngineResult:
        pass
 
 
class RuleOp(Enum):
    EQ = "EQ"
    GTE = "GTE"
 
 
@dataclass
class Rule(AbstractRule):
    fact_key: str | int
    operator: RuleOp
    value: str | int
    reason: Optional[str]
 
    def run(self, facts) -> RuleEngineResult:
        if self.operator == RuleOp.EQ:
            rule_res = facts.get(self.fact_key) == self.value
            return RuleEngineResult(
                result=rule_res,
                reason=self.reason if not rule_res else None,
            )
        elif self.operator == RuleOp.GTE:
            rule_res = facts.get(self.fact_key) >= self.value
            return RuleEngineResult(
                result=rule_res,
                reason=self.reason if not rule_res else None,
            )
        else:
            raise Exception(f"unknown operator: {self.operator}")
 
 
@dataclass
class AndRule(AbstractRule):
    rules: list[AbstractRule]
    reason: Optional[str]
 
    def run(self, facts) -> RuleEngineResult:
        for r in self.rules:
            r_res: RuleEngineResult = r.run(facts)
            if not r_res.result:
                # one condition failed, short circuit
                return RuleEngineResult(
                    result=r_res.result,
                    reason=r_res.reason if r_res.reason else self.reason,
                )
        return RuleEngineResult(result=True, reason=None)
 
 
@dataclass
class OrRule(AbstractRule):
    rules: list[AbstractRule]
    reason: Optional[str]
 
    def run(self, facts) -> RuleEngineResult:
        for r in self.rules:
            r_res = r.run(facts)
            if r_res.result:
                # one condition satisfied, short circuit
                return RuleEngineResult(
                    result=True, reason=r_res.reason if r_res.reason else self.reason
                )
        # nothing was true, or rule failed
        return RuleEngineResult(result=False, reason=self.reason)
 
 
def run_engine(rule: AbstractRule, facts: facts_type) -> RuleEngineResult:
    return rule.run(facts=facts)
 
 
if __name__ == "__main__":
    USER_UNDER_AGE_RULE = Rule(
        fact_key="age", operator=RuleOp.GTE, value=18, reason="user under min age"
    )
    USER_IN_US = Rule(fact_key="country", operator=RuleOp.EQ, value="US", reason=None)
    USER_IN_IN = OrRule(
        [
            Rule(fact_key="country", operator=RuleOp.EQ, value="IN", reason=None),
            Rule(fact_key="country", operator=RuleOp.EQ, value="india", reason=None),
        ],
        reason=None,
    )
    USER_IN_ALLOWED_COUNTRIES = OrRule(
        [USER_IN_IN, USER_IN_US], reason="user not in allowed countries"
    )
 
    PRODUCT_ELIGIBILITY = AndRule(
        [
            USER_UNDER_AGE_RULE,
            USER_IN_ALLOWED_COUNTRIES,
        ],
        reason="user ineligible",
    )
 
    print(run_engine(rule=PRODUCT_ELIGIBILITY, facts={"age": 18, "country": "RU"})) # RuleEngineResult(result=False, reason='user not in allowed countries')
    print(run_engine(rule=PRODUCT_ELIGIBILITY, facts={"age": 18, "country": "IN"})) # RuleEngineResult(result=True, reason=None)