Open Source RPA | Intelligent Automation Software
*** Settings ***
Library RPA.HTTP
Library RPA.JSON
Library RPA.Robocorp.WorkItems
*** Variables ***
${WORK_ITEM_NAME}= traffic_data
Process traffic data
${payload}= Get Work Item Payload
${traffic_data}= Set Variable ${payload}[${WORK_ITEM_NAME}]
${valid}= Validate traffic data ${traffic_data}
IF ${valid}
Post traffic data to sales system ${traffic_data}
ELSE
Handle invalid traffic data ${traffic_data}
END
def _pop_item(self):
# Get the next input work item from the cloud queue.
url = url_join(
"runs",
self._process_run_id,
"robotRuns",
self._step_run_id,
"reserve-next-work-item",
)
logging.info("Reserving new input work item from: %s", url)
response = self._process_requests.post(url)
return response.json()["workItemId"]
0
developers
import copy import email import fnmatch import json import logging import os from abc import ABC, abstractmethod from enum import Enum from pathlib import Path from shutil import copy2 from threading import Event from typing import Callable, Type, Any, Optional, Union, Dict, List, Tuple import yaml from robot.api.deco import library, keyword from robot.libraries.BuiltIn import BuiltIn from RPA.Email.ImapSmtp import ImapSmtp from RPA.FileSystem import FileSystem from RPA.core.helpers import import_by_name, required_env from RPA.core.logger import deprecation from RPA.core.notebook import notebook_print from .utils import ( JSONType, Requests, is_json_equal, json_dumps, resolve_path, truncate, url_join, ) UNDEFINED = object() # Undefined default value class State(Enum): """Work item state. (set when released)""" DONE = "COMPLETED" FAILED = "FAILED" class Error(Enum): """Failed work item error type.""" BUSINESS = "BUSINESS" # wrong/missing data, shouldn't be retried APPLICATION = "APPLICATION" # logic issue/timeout, can be retried class EmptyQueue(IndexError): """Raised when trying to load an input item and none available.""" class BaseAdapter(ABC): """Abstract base class for work item adapters.""" @abstractmethod def reserve_input(self) -> str: """Get next work item ID from the input queue and reserve it.""" raise NotImplementedError @abstractmethod def release_input( self, item_id: str, state: State, exception: Optional[dict] = None ): """Release the lastly retrieved input work item and set state.""" raise NotImplementedError @abstractmethod def create_output(self, parent_id: str, payload: Optional[JSONType] = None) -> str: """Create new output for work item, and return created ID.""" raise NotImplementedError @abstractmethod def load_payload(self, item_id: str) -> JSONType: """Load JSON payload from work item.""" raise NotImplementedError @abstractmethod def save_payload(self, item_id: str, payload: JSONType): """Save JSON payload to work item.""" raise NotImplementedError @abstractmethod def list_files(self, item_id: str) -> List[str]: """List attached files in work item.""" raise NotImplementedError @abstractmethod def get_file(self, item_id: str, name: str) -> bytes: """Read file's contents from work item.""" raise NotImplementedError @abstractmethod def add_file(self, item_id: str, name: str, *, original_name: str, content: bytes): """Attach file to work item.""" raise NotImplementedError @abstractmethod def remove_file(self, item_id: str, name: str): """Remove attached file from work item.""" raise NotImplementedError class RobocorpAdapter(BaseAdapter): """Adapter for saving/loading work items from Robocorp Control Room. Required environment variables: * RC_API_WORKITEM_HOST: Work item API hostname * RC_API_WORKITEM_TOKEN: Work item API access token * RC_API_PROCESS_HOST: Process API hostname * RC_API_PROCESS_TOKEN: Process API access token * RC_WORKSPACE_ID: Control room workspace ID * RC_PROCESS_ID: Control room process ID * RC_PROCESS_RUN_ID: Control room process run ID * RC_ROBOT_RUN_ID: Control room robot run ID * RC_WORKITEM_ID: Control room work item ID (input) """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # IDs identifying the current robot run and its input. self._workspace_id = required_env("RC_WORKSPACE_ID") self._process_run_id = required_env("RC_PROCESS_RUN_ID") self._step_run_id = required_env("RC_ACTIVITY_RUN_ID") self._initial_item_id: Optional[str] = required_env("RC_WORKITEM_ID") self._workitem_requests = self._process_requests = None self._init_workitem_requests() self._init_process_requests() def _init_workitem_requests(self): # Endpoint for old work items API. workitem_host = required_env("RC_API_WORKITEM_HOST") workitem_token = required_env("RC_API_WORKITEM_TOKEN") route_prefix = ( url_join( workitem_host, "json-v1", "workspaces", self._workspace_id, "workitems" ) + "/" ) default_headers = { "Authorization": f"Bearer {workitem_token}", "Content-Type": "application/json", } logging.info("Work item API route prefix: %s", route_prefix) self._workitem_requests = Requests( route_prefix, default_headers=default_headers ) def _init_process_requests(self): # Endpoint for the new process API. process_host = required_env("RC_API_PROCESS_HOST") process_token = required_env("RC_API_PROCESS_TOKEN") process_id = required_env("RC_PROCESS_ID") route_prefix = ( url_join( process_host, "process-v1", "workspaces", self._workspace_id, "processes", process_id, ) + "/" ) default_headers = { "Authorization": f"Bearer {process_token}", "Content-Type": "application/json", } logging.info("Process API route prefix: %s", route_prefix) self._process_requests = Requests(route_prefix, default_headers=default_headers) def _pop_item(self): # Get the next input work item from the cloud queue. url = url_join( "runs", self._process_run_id, "robotRuns", self._step_run_id, "reserve-next-work-item", ) logging.info("Reserving new input work item from: %s", url) response = self._process_requests.post(url) return response.json()["workItemId"] def reserve_input(self) -> str: if self._initial_item_id: item_id = self._initial_item_id self._initial_item_id = None return item_id item_id = self._pop_item() if not item_id: raise EmptyQueue("No work items in the input queue") return item_id def release_input( self, item_id: str, state: State, exception: Optional[dict] = None ): # Release the current input work item in the cloud queue. url = url_join( "runs", self._process_run_id, "robotRuns", self._step_run_id, "release-work-item", ) body = {"workItemId": item_id, "state": state.value} if exception: for key, value in list(exception.items()): if value is None: del exception[key] body["exception"] = exception logging.info( "Releasing %s input work item %r into %r with exception: %s", state.value, item_id, url, exception, ) self._process_requests.post(url, json=body)