ワークフローを正規表現で制御する案
簡単なワークフローは状態遷移図で表すことができます(簡単でなくても気力があればできますが)。それならば、ワークフローの設定を有限オートマトンで行えば良いのではないかと思い、ちょっと書いてみました。下に載せたテストコードの最後のテストにあるように、Ticketオブジェクトに、チケットが誰によってどのようなActionで変更されてきたかを文字列で返すメソッドget_action_historyがあったとします。例えば、履歴に対応して"(user1, new) (user2, accept) (user2, resolve)"のような文字列を返すメソッドです。ワークフローはActionの正規表現(演算子をオペランドの後ろに書く後置記法)で定義します。この正規表現をcompile関数に通すと、対応するNFAが生成されます。そのNFAに履歴を渡せば、ワークフローに沿って次にとりうるActionの一覧が得られます。
def test_ticket_get_action_history(self):
    """Feed a ticket's recorded history to a compiled workflow.

    The history ends inside the repeatable "(user3, test)(user1, resolve)"
    group, so the next possible actions are either another round of that
    group or the closing action.
    """
    workflow = compile(
        "(user1, new) (user2, accept). (user3, test)(user1, resolve).+.(user4, close).")
    history = Ticket(1).get_action_history()
    expected = [Action('user3', 'test'), Action('user4', 'close')]
    self.assertEqual(expected, workflow.get_actions(history))
ただ、現在のtracに埋め込むには、ユーザーをパーミッションに変更しないといけないでしょうし、get_actionsのインプットもアウトプットも大幅に修正する必要があるでしょう。
ソースコード
テストコード
#!/usr/bin/env python2.6
"""Unit tests for the workflow package: actions, NFA states, lexer, compiler."""
import unittest
from workflow import *


class WorkflowTests(unittest.TestCase):
    """Exercises each building block and then whole compiled workflows."""

    def test_action(self):
        """An Action exposes user/name read-only and has a readable repr."""
        action = Action('user1', 'new')
        self.assertEqual('user1', action.user)
        self.assertEqual('new', action.name)
        # Both fields are properties without setters, so assignment must fail.
        self.assertRaises(AttributeError, setattr, action, 'name', 'newname')
        self.assertRaises(AttributeError, setattr, action, 'user', 'newuser')
        self.assertEqual("Action('user1', 'new')", repr(action))

    def test_action_equals(self):
        """Actions compare equal only when both user and name match."""
        base = Action('user1', 'new')
        self.assertEqual(base, Action('user1', 'new'))
        different = (
            Action('user1', 'accept'),
            Action('user2', 'new'),
            Action('user2', 'accept'),
        )
        for other in different:
            self.assertNotEqual(base, other)

    def test_ticketstate_connect(self):
        """connect() stores the successor in ``out``."""
        first = TicketState(Action('user1', 'new'))
        second = TicketState(Action('user2', 'accept'))
        self.assertTrue(first.out is None)
        first.connect(second)
        self.assertEqual(second, first.out)

    def test_ticketstate_transfer(self):
        """transfer() queues the successor only for the matching action."""
        first = TicketState(Action('user1', 'new'))
        second = TicketState(Action('user2', 'accept'))
        first.connect(second)
        reached = []
        first.transfer(Action('name', 'action'), reached)
        self.assertEqual([], reached)
        first.transfer(Action('user1', 'new'), reached)
        self.assertEqual([second], reached)

    def test_split_connect(self):
        """A split starts with only a left branch; connect() sets the right."""
        left = TicketState(Action('user1', 'new'))
        right = TicketState(Action('user2', 'accept'))
        split = SplitState(left)
        self.assertEqual(left, split.left)
        self.assertTrue(split.right is None)
        split.connect(right)
        self.assertEqual(right, split.right)

    def test_state_appendTo(self):
        """appendTo() adds a plain state itself and a split's two branches."""
        s1 = TicketState(Action('user1', 'new'))
        s2 = TicketState(Action('user2', 'accept'))
        s3 = TicketState(Action('user3', 'resolve'))
        s1.connect(s2)
        split = SplitState(s1)
        split.connect(s3)

        collected = []
        s1.appendTo(collected)
        self.assertEqual([s1], collected)

        collected = []
        split.appendTo(collected)
        self.assertEqual([s1, s3], collected)

    def test_flagment_connect(self):
        """Connecting a fragment patches every dangling state in ``out``."""
        head = TicketState(Action('user1', 'new'))
        tail = TicketState(Action('user2', 'accept'))
        head_fragment = Fragment(head, [head])
        tail_fragment = Fragment(tail, [tail])
        head_fragment.connect(tail_fragment.start)
        self.assertEqual(tail, head.out)

    def test_lexer_parse_action(self):
        """parse_action() consumes one "(user, action)" pair."""
        source = "(user, action)"
        lexer = Lexer(source)
        token = lexer.parse_action()
        self.assertEqual(Token('action', ('user', 'action')), token)
        self.assertEqual(len(source), lexer.pos)

    def test_token(self):
        """A Token exposes its type and value."""
        token = Token('action', ('user1', 'new'))
        self.assertEqual('action', token.type)
        self.assertEqual(('user1', 'new'), token.value)

    def test_tokenize(self):
        """tokenize() yields actions and operators in order, then stops."""
        source = "(user1, new) (user2, accept)."
        stream = Lexer(source).tokenize()
        expected_tokens = (
            Token('action', ('user1', 'new')),
            Token('action', ('user2', 'accept')),
            Token('operator', 'cat'),
        )
        for expected in expected_tokens:
            self.assertEqual(expected, next(stream))
        self.assertRaises(StopIteration, next, stream)

    def test_cat_get_actions(self):
        """Concatenation: the second action follows the first."""
        m = compile("(user1, new) (user2, accept).")
        self.assertEqual([], m.get_actions("(user1, accept)"))
        following = m.get_actions("(user1, new)")
        self.assertEqual([Action('user2', 'accept')], following)

    def test_plus_get_actions(self):
        """One-or-more: the repeated action is required at least once."""
        m = compile("(user2, accept)+ (user3, resolve). (user4, closed).")
        s = "(user2, accept) "
        self.assertEqual(None, m.get_actions(""))
        self.assertEqual([], m.get_actions("(user3, resolve)"))
        expected = [Action('user2', 'accept'), Action('user3', 'resolve')]
        for repeat in (1, 3, 8):
            self.assertEqual(expected, m.get_actions(s * repeat))

    def test_star_get_actions(self):
        """Zero-or-more: the repeated action may be skipped entirely."""
        m = compile("(user2, accept)* (user3, resolve). (user4, closed).")
        s = "(user2, accept) "
        self.assertEqual(None, m.get_actions(""))
        self.assertEqual([Action('user4', 'closed')],
                         m.get_actions("(user3, resolve)"))
        expected = [Action('user2', 'accept'), Action('user3', 'resolve')]
        for repeat in (1, 3, 8):
            self.assertEqual(expected, m.get_actions(s * repeat))

    def test_alt_get_actions(self):
        """Alternation: either branch leads to the same continuation."""
        m = compile("(user1, accept) (user2, accept)| (user3, resolve).")
        self.assertEqual([], m.get_actions("(user3, accept)"))
        expected = [Action('user3', 'resolve')]
        for history in ("(user1, accept)", "(user2, accept)"):
            self.assertEqual(expected, m.get_actions(history))

    def test_quest_get_actions(self):
        """Optional: the action may occur once or not at all."""
        m = compile("(user1, accept)? (user2, resolve). (user3, closed).")
        self.assertEqual([], m.get_actions("(user3, accept)"))
        self.assertEqual([Action('user2', 'resolve')],
                         m.get_actions("(user1, accept)"))
        self.assertEqual([], m.get_actions("(user1, accept) " * 2))
        self.assertEqual([Action('user3', 'closed')],
                         m.get_actions("(user2, resolve)"))

    def test_ticket_get_action_history(self):
        """A ticket's own history string drives the compiled workflow."""
        m = compile(
            "(user1, new) (user2, accept). (user3, test)(user1, resolve).+.(user4, close).")
        history = Ticket(1).get_action_history()
        expected = [Action('user3', 'test'), Action('user4', 'close')]
        self.assertEqual(expected, m.get_actions(history))


if __name__ == '__main__':
    unittest.main()
workflow/__init__.py
from workflow import * from lexer import *
workflow/workflow.py
import operator
from lexer import *


class Ticket(object):
    """Stand-in ticket that exposes a canned action history."""

    def __init__(self, id):
        self.id = id

    def get_action_history(self):
        """Return the history as a string of "(user, action)" pairs."""
        return "(user1, new) (user2, accept) (user3, test) (user1, resolve)"


class State(object):
    """Base NFA state: can be queued, and consumes no action by itself."""

    def appendTo(self, nlist):
        """Queue this state in ``nlist`` unless that same object is there."""
        # Identity check: the same state can be reached along several paths
        # and must be simulated (and reported) only once.
        if not any(queued is self for queued in nlist):
            nlist.append(self)

    def transfer(self, action, nlist):
        """Consume ``action``; the base state has no outgoing transition."""
        pass


# Terminal state that compile() wires to the end of every workflow.
End = State()


class TicketState(State):
    """State that advances to ``out`` when it sees its own action."""

    def __init__(self, action):
        self.action = action
        self.out = None

    def connect(self, out):
        """Set the state reached after this state's action."""
        self.out = out

    def transfer(self, action, nlist):
        """Queue the successor in ``nlist`` if ``action`` matches."""
        if self.action == action:
            self.out.appendTo(nlist)


class SplitState(State):
    """Branch point without an action of its own (``left`` and ``right``)."""

    def __init__(self, left):
        self.left = left
        self.right = None
        # Re-entrancy marker so that loops made only of splits terminate.
        self._expanding = False

    def connect(self, right):
        """Set the second branch."""
        self.right = right

    def appendTo(self, nlist):
        """Queue the states reachable through both branches.

        The split itself is never queued. Each branch is asked to queue
        itself, so a branch that is another split is followed through to the
        states behind it instead of ending up in ``nlist``.
        """
        if self._expanding:
            return
        self._expanding = True
        try:
            for branch in (self.left, self.right):
                if branch is not None:
                    branch.appendTo(nlist)
        finally:
            self._expanding = False


def compile(source):
    """Build an NFA from a postfix workflow expression.

    Actions push a one-state fragment; the operators ``cat``, ``plus``,
    ``star``, ``alt`` and ``quest`` pop their operands from the stack and
    push the combined fragment.

    Raises WorkflowError when an operator has too few operands, when the
    operator is unknown, or when the expression does not reduce to exactly
    one fragment.
    """
    stack = []

    def pop_operand(op):
        if not stack:
            raise WorkflowError('operator %r is missing an operand' % (op,))
        return stack.pop()

    for token in Lexer(source).tokenize():
        if token.type == 'action':
            # Store a real Action so get_actions() hands back objects with
            # .user and .name rather than bare tuples.
            s = TicketState(Action(*token.value))
            stack.append(Fragment(s, [s]))
        elif token.type == 'operator':
            op = token.value
            if op == 'cat':
                f2 = pop_operand(op)
                f1 = pop_operand(op)
                f1.connect(f2.start)
                stack.append(Fragment(f1.start, f2.out))
            elif op == 'plus':
                f1 = pop_operand(op)
                s = SplitState(f1.start)
                f1.connect(s)
                # Entry is the operand itself: it must match at least once.
                stack.append(Fragment(f1.start, [s]))
            elif op == 'star':
                f1 = pop_operand(op)
                s = SplitState(f1.start)
                f1.connect(s)
                # Entry is the split: the operand may be skipped.
                stack.append(Fragment(s, [s]))
            elif op == 'alt':
                f2 = pop_operand(op)
                f1 = pop_operand(op)
                s = SplitState(f1.start)
                s.connect(f2.start)
                stack.append(Fragment(s, list(f1.out) + list(f2.out)))
            elif op == 'quest':
                f1 = pop_operand(op)
                s = SplitState(f1.start)
                stack.append(Fragment(s, [s] + list(f1.out)))
            else:
                raise WorkflowError('unknown operator: %r' % (op,))

    if len(stack) != 1:
        raise WorkflowError(
            'expression must reduce to one fragment, got %d' % len(stack))
    f = stack.pop()
    f.connect(End)
    return NFA(f.start)


class NFA(object):
    """Compiled workflow; simulated by tracking the set of live states."""

    def __init__(self, start):
        self.start = start

    def get_actions(self, source):
        """Return the actions that may follow the history in ``source``.

        ``source`` is a string of "(user, action)" pairs. Returns None for
        the empty string, otherwise a list of the actions of the states that
        are live after the whole history was consumed (empty when the
        history does not fit the workflow).

        Raises WorkflowError if the history contains an operator.
        """
        if source == "":
            return None
        clist = []
        self.start.appendTo(clist)
        for token in Lexer(source).tokenize():
            if token.type != 'action':
                raise WorkflowError(
                    'history may only contain actions, got %r' % (token.value,))
            action = Action(*token.value)
            nlist = []
            for s in clist:
                s.transfer(action, nlist)
            clist = nlist
        return [x.action for x in clist if isinstance(x, TicketState)]


class Action(tuple):
    """Immutable (user, name) pair with read-only named fields."""

    user, name = (property(operator.itemgetter(x)) for x in range(2))

    def __new__(cls, user, name):
        return tuple.__new__(cls, (user, name))

    def __repr__(self):
        return "%s(%r, %r)" % (self.__class__.__name__, self.user, self.name)


class Fragment(object):
    """Partial NFA: an entry state plus the states still awaiting a successor."""

    def __init__(self, start, out):
        self.start = start
        self.out = out

    def connect(self, start):
        """Point every dangling state in ``out`` at ``start``."""
        for s in self.out:
            s.connect(start)
workflow/lexer.py
import operator
import re

# One "(user, name)" pair. Each identifier starts with a letter and continues
# with letters, digits, '_' or '-'. Raw string so that \( and \s reach the
# regex engine without going through Python's own escape processing.
action_re = re.compile(
    r'\(\s*([a-zA-Z][a-zA-Z0-9_-]*)\s*,\s*([a-zA-Z][a-zA-Z0-9_-]*)\s*\)')


class WorkflowError(Exception):
    """Raised when workflow source text cannot be tokenized."""
    pass


class Lexer(object):
    """Splits workflow source into action and operator tokens."""

    # Single-character operators and the token value each one produces.
    _OPERATORS = {
        '.': 'cat',
        '|': 'alt',
        '+': 'plus',
        '*': 'star',
        '?': 'quest',
    }

    def __init__(self, source):
        self.source = source
        self.pos = 0
        self.len = len(source)

    def tokenize(self):
        """Yield Token objects until the source is exhausted.

        Spaces, tabs and newlines are skipped. Raises WorkflowError on a
        character that starts neither an action nor an operator, or on a
        malformed action.
        """
        while self.len > self.pos:
            c = self.source[self.pos]
            if c in " \t\n":
                self.pos += 1
            elif c == '(':
                yield self.parse_action()
            elif c in self._OPERATORS:
                self.pos += 1
                yield Token('operator', self._OPERATORS[c])
            else:
                raise WorkflowError('%s is unknown operator' % c)

    def parse_action(self):
        """Consume one "(user, name)" pair at the current position.

        Returns Token('action', (user, name)) and advances ``pos`` past the
        closing parenthesis. Raises WorkflowError if the text at ``pos`` is
        not a well-formed pair.
        """
        m = action_re.match(self.source, self.pos)
        if m is None:
            raise WorkflowError(
                'malformed action at position %d: %r'
                % (self.pos, self.source[self.pos:self.pos + 20]))
        self.pos = m.end()
        return Token('action', m.groups())


class Token(tuple):
    """Immutable (type, value) pair with read-only named fields."""

    type, value = (property(operator.itemgetter(x)) for x in range(2))

    def __new__(cls, type, value):
        return tuple.__new__(cls, (type, value))

    def __repr__(self):
        return "%s(%r, %r)" % (self.__class__.__name__, self.type, self.value)