Open Source RPA | Intelligent Automation Software

*** Settings ***

Library RPA.HTTP

Library RPA.JSON

Library RPA.Robocorp.WorkItems

*** Variables ***

${WORK_ITEM_NAME}= traffic_data

Process traffic data

${payload}= Get Work Item Payload

${traffic_data}= Set Variable ${payload}[${WORK_ITEM_NAME}]

${valid}= Validate traffic data ${traffic_data}

IF ${valid}

Post traffic data to sales system ${traffic_data}

ELSE

Handle invalid traffic data ${traffic_data}

END

def _pop_item(self):

# Get the next input work item from the cloud queue.

url = url_join(

"runs",

self._process_run_id,

"robotRuns",

self._step_run_id,

"reserve-next-work-item",

)

logging.info("Reserving new input work item from: %s", url)

response = self._process_requests.post(url)

return response.json()["workItemId"]

developers

import copy
import email
import fnmatch
import json
import logging
import os
from abc import ABC, abstractmethod
from enum import Enum
from pathlib import Path
from shutil import copy2
from threading import Event
from typing import Callable, Type, Any, Optional, Union, Dict, List, Tuple

import yaml
from robot.api.deco import library, keyword
from robot.libraries.BuiltIn import BuiltIn

from RPA.Email.ImapSmtp import ImapSmtp
from RPA.FileSystem import FileSystem
from RPA.core.helpers import import_by_name, required_env
from RPA.core.logger import deprecation
from RPA.core.notebook import notebook_print
from .utils import (
    JSONType,
    Requests,
    is_json_equal,
    json_dumps,
    resolve_path,
    truncate,
    url_join,
)


UNDEFINED = object()  # Undefined default value


class State(Enum):
    """Work item state. (set when released)"""

    DONE = "COMPLETED"
    FAILED = "FAILED"


class Error(Enum):
    """Failed work item error type."""

    BUSINESS = "BUSINESS"  # wrong/missing data, shouldn't be retried
    APPLICATION = "APPLICATION"  # logic issue/timeout, can be retried


class EmptyQueue(IndexError):
    """Raised when trying to load an input item and none available."""


class BaseAdapter(ABC):
    """Abstract base class for work item adapters."""

    @abstractmethod
    def reserve_input(self) -> str:
        """Get next work item ID from the input queue and reserve it."""
        raise NotImplementedError

    @abstractmethod
    def release_input(
        self, item_id: str, state: State, exception: Optional[dict] = None
    ):
        """Release the lastly retrieved input work item and set state."""
        raise NotImplementedError

    @abstractmethod
    def create_output(self, parent_id: str, payload: Optional[JSONType] = None) -> str:
        """Create new output for work item, and return created ID."""
        raise NotImplementedError

    @abstractmethod
    def load_payload(self, item_id: str) -> JSONType:
        """Load JSON payload from work item."""
        raise NotImplementedError

    @abstractmethod
    def save_payload(self, item_id: str, payload: JSONType):
        """Save JSON payload to work item."""
        raise NotImplementedError

    @abstractmethod
    def list_files(self, item_id: str) -> List[str]:
        """List attached files in work item."""
        raise NotImplementedError

    @abstractmethod
    def get_file(self, item_id: str, name: str) -> bytes:
        """Read file's contents from work item."""
        raise NotImplementedError

    @abstractmethod
    def add_file(self, item_id: str, name: str, *, original_name: str, content: bytes):
        """Attach file to work item."""
        raise NotImplementedError

    @abstractmethod
    def remove_file(self, item_id: str, name: str):
        """Remove attached file from work item."""
        raise NotImplementedError


class RobocorpAdapter(BaseAdapter):
    """Adapter for saving/loading work items from Robocorp Control Room.

    Required environment variables:

    * RC_API_WORKITEM_HOST:     Work item API hostname
    * RC_API_WORKITEM_TOKEN:    Work item API access token

    * RC_API_PROCESS_HOST:      Process API hostname
    * RC_API_PROCESS_TOKEN:     Process API access token

    * RC_WORKSPACE_ID:          Control room workspace ID
    * RC_PROCESS_ID:            Control room process ID
    * RC_PROCESS_RUN_ID:        Control room process run ID
    * RC_ROBOT_RUN_ID:          Control room robot run ID

    * RC_WORKITEM_ID:           Control room work item ID (input)
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # IDs identifying the current robot run and its input.
        self._workspace_id = required_env("RC_WORKSPACE_ID")
        self._process_run_id = required_env("RC_PROCESS_RUN_ID")
        self._step_run_id = required_env("RC_ACTIVITY_RUN_ID")
        self._initial_item_id: Optional[str] = required_env("RC_WORKITEM_ID")

        self._workitem_requests = self._process_requests = None
        self._init_workitem_requests()
        self._init_process_requests()

    def _init_workitem_requests(self):
        # Endpoint for old work items API.
        workitem_host = required_env("RC_API_WORKITEM_HOST")
        workitem_token = required_env("RC_API_WORKITEM_TOKEN")
        route_prefix = (
            url_join(
                workitem_host, "json-v1", "workspaces", self._workspace_id, "workitems"
            )
            + "/"
        )
        default_headers = {
            "Authorization": f"Bearer {workitem_token}",
            "Content-Type": "application/json",
        }
        logging.info("Work item API route prefix: %s", route_prefix)
        self._workitem_requests = Requests(
            route_prefix, default_headers=default_headers
        )

    def _init_process_requests(self):
        # Endpoint for the new process API.
        process_host = required_env("RC_API_PROCESS_HOST")
        process_token = required_env("RC_API_PROCESS_TOKEN")
        process_id = required_env("RC_PROCESS_ID")
        route_prefix = (
            url_join(
                process_host,
                "process-v1",
                "workspaces",
                self._workspace_id,
                "processes",
                process_id,
            )
            + "/"
        )
        default_headers = {
            "Authorization": f"Bearer {process_token}",
            "Content-Type": "application/json",
        }
        logging.info("Process API route prefix: %s", route_prefix)
        self._process_requests = Requests(route_prefix, default_headers=default_headers)

    def _pop_item(self):
        # Get the next input work item from the cloud queue.
        url = url_join(
            "runs",
            self._process_run_id,
            "robotRuns",
            self._step_run_id,
            "reserve-next-work-item",
        )
        logging.info("Reserving new input work item from: %s", url)
        response = self._process_requests.post(url)
        return response.json()["workItemId"]

    def reserve_input(self) -> str:
        if self._initial_item_id:
            item_id = self._initial_item_id
            self._initial_item_id = None
            return item_id

        item_id = self._pop_item()
        if not item_id:
            raise EmptyQueue("No work items in the input queue")
        return item_id

    def release_input(
        self, item_id: str, state: State, exception: Optional[dict] = None
    ):
        # Release the current input work item in the cloud queue.
        url = url_join(
            "runs",
            self._process_run_id,
            "robotRuns",
            self._step_run_id,
            "release-work-item",
        )
        body = {"workItemId": item_id, "state": state.value}
        if exception:
            for key, value in list(exception.items()):
                if value is None:
                    del exception[key]
            body["exception"] = exception
        logging.info(
            "Releasing %s input work item %r into %r with exception: %s",
            state.value,
            item_id,
            url,
            exception,
        )
        self._process_requests.post(url, json=body)

Xổ số miền Bắc