|
|
|
|
|
import unittest |
|
|
|
import pytest |
|
|
|
from graphrag.index.config import PipelineWorkflowReference |
|
from graphrag.index.errors import UnknownWorkflowError |
|
from graphrag.index.workflows.load import create_workflow, load_workflows |
|
|
|
from .helpers import mock_verbs, mock_workflows |
|
|
|
|
|
class TestCreateWorkflow(unittest.TestCase): |
|
def test_workflow_with_steps_should_not_fail(self): |
|
create_workflow( |
|
"workflow_with_steps", |
|
[ |
|
{ |
|
"verb": "mock_verb", |
|
"args": { |
|
"column": "test", |
|
}, |
|
} |
|
], |
|
config=None, |
|
additional_verbs=mock_verbs, |
|
) |
|
|
|
def test_non_existent_workflow_without_steps_should_crash(self): |
|
|
|
|
|
with pytest.raises(UnknownWorkflowError): |
|
create_workflow("test", None, config=None, additional_verbs=mock_verbs) |
|
|
|
def test_existing_workflow_should_not_crash(self): |
|
create_workflow( |
|
"mock_workflow", |
|
None, |
|
config=None, |
|
additional_verbs=mock_verbs, |
|
additional_workflows=mock_workflows, |
|
) |
|
|
|
|
|
class TestLoadWorkflows(unittest.TestCase): |
|
def test_non_existent_workflow_should_crash(self): |
|
with pytest.raises(UnknownWorkflowError): |
|
load_workflows( |
|
[ |
|
PipelineWorkflowReference( |
|
name="some_workflow_that_does_not_exist", |
|
config=None, |
|
) |
|
], |
|
additional_workflows=mock_workflows, |
|
additional_verbs=mock_verbs, |
|
) |
|
|
|
def test_single_workflow_should_not_crash(self): |
|
load_workflows( |
|
[ |
|
PipelineWorkflowReference( |
|
name="mock_workflow", |
|
config=None, |
|
) |
|
], |
|
additional_workflows=mock_workflows, |
|
additional_verbs=mock_verbs, |
|
) |
|
|
|
def test_multiple_workflows_should_not_crash(self): |
|
load_workflows( |
|
[ |
|
PipelineWorkflowReference( |
|
name="mock_workflow", |
|
config=None, |
|
), |
|
PipelineWorkflowReference( |
|
name="mock_workflow_2", |
|
config=None, |
|
), |
|
], |
|
|
|
additional_workflows=mock_workflows, |
|
additional_verbs=mock_verbs, |
|
) |
|
|
|
def test_two_interdependent_workflows_should_provide_correct_order(self): |
|
ordered_workflows, _deps = load_workflows( |
|
[ |
|
PipelineWorkflowReference( |
|
name="interdependent_workflow_1", |
|
steps=[ |
|
{ |
|
"verb": "mock_verb", |
|
"args": { |
|
"column": "test", |
|
}, |
|
"input": { |
|
"source": "workflow:interdependent_workflow_2" |
|
}, |
|
} |
|
], |
|
), |
|
PipelineWorkflowReference( |
|
name="interdependent_workflow_2", |
|
steps=[ |
|
{ |
|
"verb": "mock_verb", |
|
"args": { |
|
"column": "test", |
|
}, |
|
} |
|
], |
|
), |
|
], |
|
|
|
additional_workflows=mock_workflows, |
|
additional_verbs=mock_verbs, |
|
) |
|
|
|
|
|
assert len(ordered_workflows) == 2 |
|
assert ordered_workflows[0].workflow.name == "interdependent_workflow_2" |
|
assert ordered_workflows[1].workflow.name == "interdependent_workflow_1" |
|
|
|
def test_three_interdependent_workflows_should_provide_correct_order(self): |
|
ordered_workflows, _deps = load_workflows( |
|
[ |
|
PipelineWorkflowReference( |
|
name="interdependent_workflow_3", |
|
steps=[ |
|
{ |
|
"verb": "mock_verb", |
|
"args": { |
|
"column": "test", |
|
}, |
|
} |
|
], |
|
), |
|
PipelineWorkflowReference( |
|
name="interdependent_workflow_1", |
|
steps=[ |
|
{ |
|
"verb": "mock_verb", |
|
"args": { |
|
"column": "test", |
|
}, |
|
"input": {"source": "workflow:interdependent_workflow_2"}, |
|
} |
|
], |
|
), |
|
PipelineWorkflowReference( |
|
name="interdependent_workflow_2", |
|
steps=[ |
|
{ |
|
"verb": "mock_verb", |
|
"args": { |
|
"column": "test", |
|
}, |
|
"input": {"source": "workflow:interdependent_workflow_3"}, |
|
} |
|
], |
|
), |
|
], |
|
|
|
additional_workflows=mock_workflows, |
|
additional_verbs=mock_verbs, |
|
) |
|
|
|
order = [ |
|
"interdependent_workflow_3", |
|
"interdependent_workflow_2", |
|
"interdependent_workflow_1", |
|
] |
|
assert [x.workflow.name for x in ordered_workflows] == order |
|
|
|
def test_two_workflows_dependent_on_another_single_workflow_should_provide_correct_order( |
|
self, |
|
): |
|
ordered_workflows, _deps = load_workflows( |
|
[ |
|
|
|
PipelineWorkflowReference( |
|
name="interdependent_workflow_3", |
|
steps=[ |
|
{ |
|
"verb": "mock_verb", |
|
"args": { |
|
"column": "test", |
|
}, |
|
} |
|
], |
|
), |
|
PipelineWorkflowReference( |
|
name="interdependent_workflow_1", |
|
steps=[ |
|
{ |
|
"verb": "mock_verb", |
|
"args": { |
|
"column": "test", |
|
}, |
|
"input": {"source": "workflow:interdependent_workflow_3"}, |
|
} |
|
], |
|
), |
|
PipelineWorkflowReference( |
|
name="interdependent_workflow_2", |
|
steps=[ |
|
{ |
|
"verb": "mock_verb", |
|
"args": { |
|
"column": "test", |
|
}, |
|
"input": {"source": "workflow:interdependent_workflow_3"}, |
|
} |
|
], |
|
), |
|
], |
|
|
|
additional_workflows=mock_workflows, |
|
additional_verbs=mock_verbs, |
|
) |
|
|
|
assert len(ordered_workflows) == 3 |
|
assert ordered_workflows[0].workflow.name == "interdependent_workflow_3" |
|
|
|
|
|
assert ordered_workflows[1].workflow.name in [ |
|
"interdependent_workflow_1", |
|
"interdependent_workflow_2", |
|
] |
|
assert ordered_workflows[2].workflow.name in [ |
|
"interdependent_workflow_1", |
|
"interdependent_workflow_2", |
|
] |
|
|