Commit f9b222a3 authored by Ulrich Wohlfeil's avatar Ulrich Wohlfeil 💬
Browse files

- add admin_get_process_list

- add admin_get_workflow_list
parent df16cc23
{
"python.pythonPath": "/home/rk/Repos/ecmind_blue_client_workflow/env/bin/python3.9",
"python.pythonPath": "./env/bin/python",
"python.testing.unittestArgs": [
"-v",
"-s",
".",
"-p",
"test*.py"
"*test*.py"
],
"python.testing.pytestEnabled": false,
"python.testing.nosetestsEnabled": false,
......
import datetime
from enum import Enum
from ecmind_blue_client import Job
from ecmind_blue_client import Client
from XmlElement import XmlElement
class WfState(Enum):
INIT = 1
RUNNING = 2
SUSPENDED = 4
ACTIVE = 8
TERMINATED = 16
COMPLETED = 32
SYSSUSPENDED = 64
def get_organisations(client:Client, only_active:bool=True):
result_get_organisations = client.execute(Job(jobname='wfm.GetOrganisations'))
......@@ -19,6 +31,51 @@ def get_organisations(client:Client, only_active:bool=True):
return { o['@Id']: o['@Name'] for o in organisations }
def admin_get_workflow_list(client: Client, organisation_id: str) -> dict[str, dict[str, object]]:
job = Job('wfm.AdminGetWorkflowList', OrganisationId=organisation_id)
get_workflow_list = client.execute(job)
if not get_workflow_list.return_code == 0:
raise RuntimeError(get_workflow_list.error_message)
workflows = XmlElement.from_string(get_workflow_list.values['Workflows']).to_dict()['Workflow']
workflows = [workflows] if type(workflows) is dict else workflows
return { w['@Id']: {'name': w['@Name'], 'process_count': w['@ProcessCount']} for w in workflows }
def admin_get_process_list(client: Client, organisation_id: str, workflow_id: str) -> dict[str, dict[str, object]]:
job = Job('wfm.AdminGetProcessList', OrganisationId=organisation_id, WorkflowId=workflow_id)
admin_get_process_list_result = client.execute(job)
if not admin_get_process_list_result.return_code == 0:
raise RuntimeError(admin_get_process_list_result.error_message)
processes = XmlElement.from_string(admin_get_process_list_result.values['Processes']).to_dict()['Process']
processes = [processes] if type(processes) is dict else processes
return {
p['@Id']: {
'name': p['@Name'],
'subject': p['@Subject'],
'state': WfState(int(p['@State'])),
'suspended_activity': p['@SuspendedActivity'],
'creation':{
'user_id': p['Creation']['@UserId'],
'user_name': p['Creation']['@UserName'],
'time': datetime.datetime.fromtimestamp(p['Creation']['@Time']),
},
'last_activity':{
'id': p['LastActivity']['@Id'],
'name': p['LastActivity']['@Name'],
'user_id': p['LastActivity']['@UserId'],
'user_name': p['LastActivity']['@UserName'],
'exec_time': datetime.datetime.fromtimestamp(p['LastActivity']['@ExecTime']),
}
} for p in processes
}
def get_roles(client:Client, organisation_id:str):
result_get_organisations_objects = client.execute(Job(
jobname='wfm.GetOrganisationObjects',
......
......@@ -10,6 +10,20 @@ class TestMethods(unittest.TestCase):
orgs = workflow.get_organisations(self.client, only_active=True)
self.assertTrue('Organisation' in orgs.values())
def test_admin_get_workflow_list(self):
orgs = workflow.get_organisations(self.client, only_active=True)
for org_id in orgs:
workflows = workflow.admin_get_workflow_list(self.client, org_id)
self.assertTrue('Adhoc' in [w['name'] for w in workflows.values()])
def test_admin_get_process_list(self):
orgs = workflow.get_organisations(self.client, only_active=True)
for org_id in orgs:
workflows = workflow.admin_get_workflow_list(self.client, org_id)
for workflow_id in workflows:
processes = workflow.admin_get_process_list(self.client, org_id, workflow_id=workflow_id)
self.assertTrue('Adhoc 2' in [w['name'] for w in processes.values()])
def test_get_roles(self):
orgs = workflow.get_organisations(self.client, only_active=True)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment