読者です 読者をやめる 読者になる 読者になる

ワークフローを正規表現で制御する案

簡単なワークフローは状態遷移図で表すことができます(簡単でなくても気力があればできるけど)。それならば、ワークフローの設定を有限オートマトンでやれば良いんじゃないのかなと思って、ちょっと書いてみました。すぐ下にあるテストコードの一番下にあるのですが、次のようにTicketオブジェクトにチケットが誰によって、どのようなActionで変更を受けてきたか文字列を返すメソッドget_action_historyがあったとします。例としては、履歴に対応して"(user1, new) (user2, accept) (user2, resolve)"みたいな文字列を返す関数です。ワークフローはActionの正規表現で定義されています。compile関数を通して正規表現に対応したNFAが生成されます。NFAに履歴を渡してあげれば、次にとりうるワークフローに沿った状態が得られます。

    def test_ticket_get_action_history(self):
        ticket = Ticket(1)
        nfa = compile("(user1, new) (user2, accept). (user3, test)(user1, resolve).+.(user4, close).")
        self.assertEqual([Action('user3', 'test'), Action('user4', 'close')], 
        nfa.get_actions(ticket.get_action_history()))

ただ、現在のtracに埋め込むには、ユーザーをパーミッションに変更しないといけないでしょうし、get_actionsのインプットもアウトプットも大幅に修正する必要があるでしょう。

ソースコード


テストコード

#!/usr/bin/env python2.6
import unittest
from workflow import *
class WorkflowTests(unittest.TestCase):
    def test_action(self):
        a1 = Action('user1', 'new')
        self.assertEqual('user1', a1.user)
        self.assertEqual('new', a1.name)
        def mod_name(act):
            act.name = 'newname'
        def mod_user(act):
            act.user = 'newuser'
        self.assertRaises(AttributeError, mod_name, a1)
        self.assertRaises(AttributeError, mod_user, a1)
        self.assertEqual("Action('user1', 'new')", repr(a1))
    def test_action_equals(self):
        self.assertEqual(Action('user1', 'new'), Action('user1', 'new'))
        self.assertNotEqual(Action('user1', 'new'), Action('user1', 'accept'))
        self.assertNotEqual(Action('user1', 'new'), Action('user2', 'new'))
        self.assertNotEqual(Action('user1', 'new'), Action('user2', 'accept'))
    def test_ticketstate_connect(self):        
        s1 = TicketState(Action('user1', 'new'))
        s2 = TicketState(Action('user2', 'accept'))
        self.assertTrue(s1.out is None)
        s1.connect(s2)
        self.assertEqual(s2, s1.out)
    def test_ticketstate_transfer(self):
        s1 = TicketState(Action('user1', 'new'))
        s2 = TicketState(Action('user2', 'accept'))
        s1.connect(s2)
        nlist = []
        s1.transfer(Action('name', 'action'), nlist)
        self.assertEqual([], nlist)
        s1.transfer(Action('user1', 'new'), nlist)
        self.assertEqual([s2], nlist)
    def test_split_connect(self):
        s1 = TicketState(Action('user1', 'new'))
        s2 = TicketState(Action('user2', 'accept'))
        s = SplitState(s1)
        self.assertEqual(s1, s.left)
        self.assertTrue(s.right is None)
        s.connect(s2)
        self.assertEqual(s2, s.right)
    def test_state_appendTo(self):
        s1 = TicketState(Action('user1', 'new'))
        s2 = TicketState(Action('user2', 'accept'))
        s3 = TicketState(Action('user3', 'resolve'))
        s1.connect(s2)
        s = SplitState(s1)
        s.connect(s3)
        clist = []
        s1.appendTo(clist)
        self.assertEqual([s1], clist)
        clist = []
        s.appendTo(clist)
        self.assertEqual([s1, s3], clist)
    def test_flagment_connect(self):
        s1 = TicketState(Action('user1', 'new'))
        s2 = TicketState(Action('user2', 'accept'))
        f1 = Fragment(s1, [s1])
        f2 = Fragment(s2, [s2])
        f1.connect(f2.start)
        self.assertEqual(s2, s1.out)
    def test_lexer_parse_action(self):
        source = "(user, action)"
        lexer = Lexer(source)
        result = lexer.parse_action()
        self.assertEqual(Token('action', ('user', 'action')), result)
        self.assertEqual(len(source), lexer.pos)
    def test_token(self):
        token = Token('action', ('user1', 'new'))
        self.assertEqual('action', token.type)
        self.assertEqual(('user1', 'new'), token.value)
    def test_tokenize(self):
        source = "(user1, new) (user2, accept)."
        result = Lexer(source).tokenize()
        self.assertEqual(Token('action', ('user1', 'new')), next(result))
        self.assertEqual(Token('action', ('user2', 'accept')), next(result))
        self.assertEqual(Token('operator', 'cat'), next(result))
        self.assertRaises(StopIteration, next, result)
    def test_cat_get_actions(self):
        m = compile("(user1, new) (user2, accept).")
        self.assertEqual([], m.get_actions("(user1, accept)"))
        result = m.get_actions("(user1, new)")
        self.assertEqual([Action('user2', 'accept')], result)
    def test_plus_get_actions(self):
        m = compile("(user2, accept)+ (user3, resolve). (user4, closed).")
        s = "(user2, accept) "
        self.assertEqual(None, m.get_actions(""))
        self.assertEqual([], m.get_actions("(user3, resolve)"))
        self.assertEqual([Action('user2', 'accept'), Action('user3', 'resolve')], m.get_actions(s*1))
        self.assertEqual([Action('user2', 'accept'), Action('user3', 'resolve')], m.get_actions(s*3))
        self.assertEqual([Action('user2', 'accept'), Action('user3', 'resolve')], m.get_actions(s*8))
    def test_star_get_actions(self):
        m = compile("(user2, accept)* (user3, resolve). (user4, closed).")
        s = "(user2, accept) "
        self.assertEqual(None, m.get_actions(""))
        self.assertEqual([Action('user4', 'closed')], m.get_actions("(user3, resolve)"))
        self.assertEqual([Action('user2', 'accept'), Action('user3', 'resolve')], m.get_actions(s*1))
        self.assertEqual([Action('user2', 'accept'), Action('user3', 'resolve')], m.get_actions(s*3))
        self.assertEqual([Action('user2', 'accept'), Action('user3', 'resolve')], m.get_actions(s*8)) 
    def test_alt_get_actions(self):
        m = compile("(user1, accept) (user2, accept)| (user3, resolve).")
        self.assertEqual([], m.get_actions("(user3, accept)"))
        self.assertEqual([Action('user3', 'resolve')], m.get_actions("(user1, accept)"))
        self.assertEqual([Action('user3', 'resolve')], m.get_actions("(user2, accept)")) 
    def test_quest_get_actions(self):
        m = compile("(user1, accept)? (user2, resolve). (user3, closed).")
        self.assertEqual([], m.get_actions("(user3, accept)"))
        self.assertEqual([Action('user2', 'resolve')], m.get_actions("(user1, accept)"))
        self.assertEqual([], m.get_actions("(user1, accept) "*2))
        self.assertEqual([Action('user3', 'closed')], m.get_actions("(user2, resolve)")) 
    def test_ticket_get_action_history(self):
        ticket = Ticket(1)
        m = compile("(user1, new) (user2, accept). (user3, test)(user1, resolve).+.(user4, close).")
        self.assertEqual([Action('user3', 'test'), Action('user4', 'close')], 
        m.get_actions(ticket.get_action_history()))
if __name__ == '__main__':
    unittest.main()

workflow/__init__.py

from workflow import *
from lexer import *

workflow/workflow.py

import operator
from lexer import *
class Ticket(object):
    def __init__(self,id):
        self.id = id
    def get_action_history(self):
        return "(user1, new) (user2, accept) (user3, test) (user1, resolve)"
class State(object):
    def appendTo(self, nlist):
        nlist.append(self)
    def transfer(self, action, nlist):
        pass
End = State()
class TicketState(object):
    def __init__(self, action):
        self.action = action
        self.out = None
    def connect(self, out):
        self.out = out
    def transfer(self, action, nlist):
        if self.action == action:
            self.out.appendTo(nlist)
    def appendTo(self, nlist):
        nlist.append(self)
class SplitState(object):
    def __init__(self, left):
        self.left = left
        self.right = None
    def connect(self, right):
        self.right = right
    def appendTo(self, nlist):
        nlist.append(self.left)
        if self.right is not None:
            nlist.append(self.right)

def compile(source):
    stack = []
    for token in Lexer(source).tokenize():
        if token.type == 'action':
            s = TicketState(token.value)
            stack.append(Fragment(s, [s]))
        elif token.type == 'operator':
            if token.value == 'cat':
                f2 = stack.pop()
                f1 = stack.pop()
                f1.connect(f2.start)           
                stack.append(Fragment(f1.start, f2.out))
            elif token.value == 'plus':
                f1 = stack.pop()
                s = SplitState(f1.start)
                f1.connect(s)
                stack.append(Fragment(f1.start, [s])) 
            elif token.value == 'star':
                f1 = stack.pop()
                s = SplitState(f1.start)
                f1.connect(s)
                stack.append(Fragment(s, [s]))
            elif token.value == 'alt':
                f2 = stack.pop()
                f1 = stack.pop()
                s = SplitState(f1.start)
                s.connect(f2.start)
                out = []
                out.extend(f1.out)
                out.extend(f2.out)
                stack.append(Fragment(s, out))
            elif token.value == 'quest':
                f1 = stack.pop()
                s = SplitState(f1.start)
                out = [s]
                out.extend(f1.out)
                stack.append(Fragment(s, out))
        else:
            raise Exception(repr(operator))
    f = stack.pop()
    f.connect(End)
    assert [] == stack
    return NFA(f.start)
class NFA(object):
    def __init__(self, start):
        self.start = start
    def get_actions(self, source):
        if source == "":
            return
        clist = []
        nlist = []
        self.start.appendTo(clist)
        for token in Lexer(source).tokenize():
            assert token.type == 'action'
            for s in clist:
                s.transfer(Action(*token.value), nlist)
            clist = nlist
            nlist = []
        return [x.action for x in clist if isinstance(x, TicketState)]
class Action(tuple):
    user, name = (property(operator.itemgetter(x)) for x in range(2))
    def __new__(cls, user, name):
        return tuple.__new__(cls, (user, name))
    def __repr__(self):
        return "%s(%r, %r)" %(self.__class__.__name__, self.user, self.name)
class Fragment(object):
    def __init__(self, start, out):
        self.start = start
        self.out = out
    def connect(self, start):
        for s in self.out:
            s.connect(start)

workflow/lexer.py

import operator
import re
action_re = re.compile('\(\s*([a-zA-Z][a-zA-Z0-9-_]*)\s*,\s*([a-zA-Z][a-zA-Z0-9-_]*)\s*\)') 
class WorkflowError(Exception):
    pass
class Lexer(object):
    def __init__(self, source):
        self.source = source
        self.pos = 0
        self.len = len(source)
    def tokenize(self):
        while self.len > self.pos:
            c = self.source[self.pos]
            if c in " \t\n":
                self.pos += 1
                continue
            elif c == '(':
                yield self.parse_action()
            elif c == '.':
                self.pos += 1
                yield Token('operator', 'cat')
            elif c == '|':
                self.pos += 1
                yield Token('operator', 'alt')
            elif c == '+':
                self.pos += 1
                yield Token('operator', 'plus') 
            elif c == '*':
                self.pos += 1
                yield Token('operator', 'star')
            elif c == '?':
                self.pos += 1
                yield Token('operator', 'quest')
            else:
                raise WorkflowError('%s is unknown operator' % c) 
    def parse_action(self):
        # '(' user ',' name ')'
        assert '(' == self.source[self.pos]
        m = action_re.match(self.source, self.pos)
        self.pos = m.end()
        return Token('action', m.groups())
class Token(tuple):
    type, value = (property(operator.itemgetter(x)) for x in range(2))
    def __new__(cls, type, value):
        return tuple.__new__(cls, (type, value))