# coding=utf-8
"""Module to generate test data for sbi and pb client.
FIXME(BMo) Move this to a method on the SchedulingBlockInstance object?
"""
import datetime
import json
from random import choice, randint
from typing import List, Union
from .. import ConfigDb
from ..release import __pb_version__, __sbi_version__
from ..scheduling.processing_block import ProcessingBlock
from ..scheduling.scheduling_block_instance import SchedulingBlockInstance
# Module-level configuration database client; add_workflow_definitions()
# uses it to save workflow definitions.
DB = ConfigDb()
# Processing Block types that generate_pb_config() picks from at random
# when the caller does not specify one.
PB_TYPES = ['realtime', 'offline']

# Workflow ids chosen from for Processing Blocks that are not 'offline'.
REALTIME_WORKFLOWS = ['vis_ingest_test']

# Workflow ids chosen from for 'offline' Processing Blocks.
OFFLINE_WORKFLOWS = ['ical_test']
def add_workflow_definitions(sbi_config: dict):
    """Save a placeholder definition for each workflow used by an SBI config.

    This is a utility function used in testing. For every distinct
    ``id:version`` workflow referenced by the Processing Blocks of the
    specified SBI config, a mock workflow definition with no stages is
    saved to the configuration database.

    Note that the definition is saved unconditionally: this function does
    not check whether the database already holds a definition for the
    workflow, it only avoids saving the same workflow twice per call.

    Args:
        sbi_config (dict): SBI configuration dictionary. Must contain a
            'processing_blocks' list whose entries each have a 'workflow'
            dictionary with 'id' and 'version' keys.

    Raises:
        KeyError: If any of the required keys is missing.

    """
    # A set gives O(1) membership tests for the de-duplication below.
    registered_workflows = set()
    for pb_config in sbi_config['processing_blocks']:
        workflow_config = pb_config['workflow']
        workflow_name = '{}:{}'.format(workflow_config['id'],
                                       workflow_config['version'])
        if workflow_name in registered_workflows:
            continue
        workflow_definition = dict(
            id=workflow_config['id'],
            version=workflow_config['version'],
            stages=[]
        )
        # The database key is the workflow name under a fixed prefix.
        key = 'workflow_definitions:{}'.format(workflow_name)
        DB.save_dict(key, workflow_definition, hierarchical=False)
        registered_workflows.add(workflow_name)
def generate_version(max_major: int = 1, max_minor: int = 7,
                     max_patch: int = 15) -> str:
    """Select a random version.

    Each component is drawn independently and uniformly from the inclusive
    range ``[0, max_<component>]``.

    Args:
        max_major (int, optional): Maximum major version (inclusive).
        max_minor (int, optional): Maximum minor version (inclusive).
        max_patch (int, optional): Maximum patch version (inclusive).

    Returns:
        str, version string of the form 'major.minor.patch'.

    Raises:
        ValueError: If any maximum is negative (raised by ``randint``).

    """
    # Draw in major, minor, patch order so that seeded runs are reproducible.
    major = randint(0, max_major)
    minor = randint(0, max_minor)
    patch = randint(0, max_patch)
    return '{:d}.{:d}.{:d}'.format(major, minor, patch)
def generate_sb(date: datetime.datetime, project: str,
                programme_block: str) -> dict:
    """Generate a Scheduling Block data object.

    The Scheduling Block id has the form ``SB-<YYYYMMDD>-<project>-<NNNN>``
    where ``NNNN`` is a random, zero-padded instance number in the range
    0 to 9999.

    Args:
        date (datetime.datetime): Date used for the date part of the id.
        project (str): Project name.
        programme_block (str): Programme block.

    Returns:
        dict, Scheduling Block data with the keys 'id', 'project' and
        'programme_block'.

    """
    date_str = date.strftime('%Y%m%d')
    instance_id = randint(0, 9999)
    sb_id = 'SB-{}-{}-{:04d}'.format(date_str, project, instance_id)
    return dict(id=sb_id, project=project, programme_block=programme_block)
def generate_pb_config(pb_id: str,
                       pb_config: dict = None,
                       workflow_config: dict = None) -> dict:
    """Generate a PB configuration dictionary.

    Any value not supplied through ``pb_config`` or ``workflow_config`` is
    filled in with a randomly generated one.

    Args:
        pb_id (str): Processing Block Id.
        pb_config (dict, optional): PB configuration. Recognised keys are
            'type', 'priority', 'dependencies' and 'resources_required'.
        workflow_config (dict, optional): Workflow configuration.
            Recognised keys are 'id', 'version' and 'parameters'.

    Returns:
        dict, PB configuration dictionary with the keys 'id', 'version',
        'type', 'priority', 'dependencies', 'resources_required' and
        'workflow'.

    """
    if workflow_config is None:
        workflow_config = dict()
    if pb_config is None:
        pb_config = dict()

    # NOTE: the random fallbacks passed to .get() are evaluated even when
    # the key is present, so the number of random draws does not depend on
    # which of these values the caller supplied.
    pb_type = pb_config.get('type', choice(PB_TYPES))

    workflow_id = workflow_config.get('id')
    if workflow_id is None:
        # Pick a workflow matching the PB type; anything that is not
        # 'offline' is treated as realtime.
        if pb_type == 'offline':
            workflow_id = choice(OFFLINE_WORKFLOWS)
        else:
            workflow_id = choice(REALTIME_WORKFLOWS)
    workflow_version = workflow_config.get('version', generate_version())
    workflow_parameters = workflow_config.get('parameters', dict())

    pb_data = dict(
        id=pb_id,
        version=__pb_version__,
        type=pb_type,
        priority=pb_config.get('priority', randint(0, 10)),
        dependencies=pb_config.get('dependencies', []),
        resources_required=pb_config.get('resources_required', []),
        workflow=dict(
            id=workflow_id,
            version=workflow_version,
            parameters=workflow_parameters
        )
    )
    return pb_data
def generate_sbi_config(num_pbs: int = 3, project: str = 'sip',
                        programme_block: str = 'sip_demos',
                        pb_config: Union[dict, List[dict]] = None,
                        workflow_config:
                        Union[dict, List[dict]] = None,
                        register_workflows=False) -> dict:
    """Generate a SBI configuration dictionary.

    Args:
        num_pbs (int, optional): Number of Processing Blocks (default = 3).
        project (str, optional): Project to associate the SBI with.
        programme_block (str, optional): SBI programme block.
        pb_config (dict, List[dict], optional): PB configuration. A single
            dict is applied to every Processing Block; a list supplies one
            dict per Processing Block and is indexed by PB number.
        workflow_config (dict, List[dict], optional): Workflow
            configuration. A single dict is applied to every Processing
            Block; a list supplies one dict per Processing Block.
        register_workflows (bool, optional): If true also register
            placeholder definitions for the workflows used by the SBI.

    Returns:
        dict, SBI configuration dictionary with the keys 'id', 'version',
        'scheduling_block' and 'processing_blocks'.

    Raises:
        IndexError: If ``pb_config`` or ``workflow_config`` is a list with
            fewer than ``num_pbs`` entries.

    """
    # A single dict used to be wrapped in a one-element list, which raised
    # IndexError as soon as num_pbs exceeded 1. Repeat it so that the same
    # configuration is used for every Processing Block instead.
    if isinstance(workflow_config, dict):
        workflow_config = [workflow_config] * num_pbs
    if isinstance(pb_config, dict):
        pb_config = [pb_config] * num_pbs

    # One timestamp is shared by the SBI id, the SB id and all PB ids.
    utc_now = datetime.datetime.utcnow()
    pb_list = []
    for i in range(num_pbs):
        pb_id = ProcessingBlock.get_id(utc_now)
        if workflow_config is not None:
            _workflow_config = workflow_config[i]
        else:
            _workflow_config = None
        if pb_config is not None:
            _pb_config = pb_config[i]
        else:
            _pb_config = None
        pb_list.append(generate_pb_config(pb_id, _pb_config,
                                          _workflow_config))

    sbi_config = dict(
        id=SchedulingBlockInstance.get_id(utc_now, project),
        version=__sbi_version__,
        scheduling_block=generate_sb(utc_now, project, programme_block),
        processing_blocks=pb_list
    )
    if register_workflows:
        add_workflow_definitions(sbi_config)
    return sbi_config
def generate_sbi_json(num_pbs: int = 3, project: str = 'sip',
                      programme_block: str = 'sip_demos',
                      pb_config: Union[dict, List[dict]] = None,
                      workflow_config:
                      Union[dict, List[dict]] = None,
                      register_workflows=True) -> str:
    """Return a JSON string used to configure an SBI.

    Thin wrapper that serialises the dictionary produced by
    generate_sbi_config() with ``json.dumps``. All arguments are forwarded
    unchanged; note that ``register_workflows`` defaults to True here,
    whereas generate_sbi_config() defaults it to False.

    Args:
        num_pbs (int, optional): Number of Processing Blocks (default = 3).
        project (str, optional): Project to associate the SBI with.
        programme_block (str, optional): SBI programme block.
        pb_config (dict, List[dict], optional): PB configuration.
        workflow_config (dict, List[dict], optional): Workflow configuration.
        register_workflows (bool, optional): If true also register workflows.

    Returns:
        str, JSON encoded SBI configuration.

    """
    sbi_config = generate_sbi_config(
        num_pbs=num_pbs,
        project=project,
        programme_block=programme_block,
        pb_config=pb_config,
        workflow_config=workflow_config,
        register_workflows=register_workflows,
    )
    return json.dumps(sbi_config)