"""
:mod:`transformer.task` -- HTTP requests and related processing
===============================================================
Each HTTP request from a HAR file is seen by Transformer as a separate
:term:`task` to be eventually converted into a :any:`locust.core.task` function:
.. figure:: _static/basic-conversion.*
:align: center
*Transformer converts HAR requests into Locust tasks.*
:class:`~transformer.request.Request` only represents an HTTP request, not the
potential pre- and post-processing that could be desired in the same Locust task
(e.g. before or after the ``requests.get`` call).
Transformer's :term:`task` is an object encapsulating a
:class:`~transformer.request.Request` *and* that additional processing code,
in a one-to-one relationship with :any:`locust.core.task`-decorated functions.
However, Transformer's tasks have no notion of :ref:`weight <specifying-weights>`
or :ref:`grouping <hierarchical-scenarios>`: these come with
:term:`scenarios <scenario>`.
"""
import json
from collections import OrderedDict
from json import JSONDecodeError
from types import MappingProxyType
from typing import (
Iterable,
NamedTuple,
Iterator,
Sequence,
Optional,
Mapping,
Dict,
List,
Tuple,
cast,
)
import dataclasses
from dataclasses import dataclass
from requests.structures import CaseInsensitiveDict
import transformer.python as py
from transformer.blacklist import on_blacklist, Blacklist, get_empty
from transformer.request import HttpMethod, Request, QueryPair
# Shared read-only empty mapping; used below as the default value of
# Task2.global_code_blocks so that no mutable default is shared.
IMMUTABLE_EMPTY_DICT = MappingProxyType({})
# Value of the "timeout" keyword argument of every generated client call
# (presumably seconds, as in the requests library -- TODO confirm).
TIMEOUT = 30
# NOTE(review): not referenced anywhere in this module; presumably consumed
# by the code that renders locustfiles -- confirm before changing/removing.
ACTION_INDENTATION_LEVEL = 12
# MIME type for which the HAR postData "text" field is parsed as JSON.
JSON_MIME_TYPE = "application/json"
class LocustRequest(NamedTuple):
    """
    Bundle of every argument passed to the Locust client object when it
    performs a request.

    Note that :meth:`from_request` stores *url* and *name* as the ``repr()``
    of the corresponding strings (i.e. as Python source text), not as the
    plain strings themselves.

    .. deprecated:: 1.0.2
       Only used by :class:`Task`, which is itself deprecated.
       Use :class:`Task2` instead of :class:`Task`.
    """

    method: HttpMethod
    url: str
    headers: CaseInsensitiveDict = MappingProxyType({})
    post_data: dict = MappingProxyType({})
    query: Sequence[QueryPair] = ()
    name: Optional[str] = None

    @classmethod
    def from_request(cls, r: Request) -> "LocustRequest":
        """
        Creates the LocustRequest equivalent of the HTTP request *r*.

        The request's name falls back to its full URL when *r* has no name.
        """
        full_url = r.url.geturl()
        quoted_url = repr(full_url)
        quoted_name = repr(r.name or full_url)
        return LocustRequest(
            r.method,
            quoted_url,
            headers=r.headers,
            post_data=r.post_data,
            query=r.query,
            name=quoted_name,
        )
@dataclass
class Task2:
    """
    Represents a :term:`task`, i.e. an HTTP request along with some optional
    pre- and post-processing code.

    .. attribute:: name

        :any:`str` --
        Name of the corresponding :any:`locust.core.task` function in
        the locustfile.

    .. attribute:: request

        :any:`transformer.request.Request` --
        HTTP request executed by this task.

    .. attribute:: statements

        :any:`Sequence <typing.Sequence>` of |Statement| --
        Body of the corresponding :any:`locust.core.task` function in the
        locustfile.

        One of these statements contains an |ExpressionView| pointing to
        :attr:`request`.
        The other statements (if any) represent pre- or post-processing code
        for that request, depending on whether they appear before or after the
        statement containing the |ExpressionView|.

        .. warning::

            Plugins should be careful if they replace the |ExpressionView|
            object found in :attr:`statements`.
            Other plugins should still be able to change :attr:`request` and
            expect to see these changes reflected in :attr:`statements` via
            |ExpressionView|.

    .. attribute:: global_code_blocks

        :any:`Mapping <typing.Mapping>` of :any:`str` to
        :any:`Sequence <typing.Sequence>` of :any:`str`

        .. deprecated:: 1.0.2
            This attribute is only kept for backward compatibility purposes.

        It exists because Transformer's first plugin system didn't have
        :term:`OnPythonProgram`, so plugins had to specify the top-level
        locustfile code blocks they needed (e.g. imports, global variables)
        at the :class:`Task` level and let the plugin system percolate these
        code blocks through the scenario tree.
        This explains why scenarios have the similar
        :any:`transformer.scenario.Scenario.global_code_blocks` field.
    """

    name: str
    request: Request
    statements: Sequence[py.Statement] = ()
    # TODO: Replace me with a plugin framework that accesses the full tree.
    #   See https://github.com/zalando-incubator/Transformer/issues/11.
    global_code_blocks: Mapping[str, Sequence[str]] = IMMUTABLE_EMPTY_DICT

    def __post_init__(self) -> None:
        # Copy the provided containers into fresh mutable ones, so that each
        # task owns its statements and code blocks and the (immutable)
        # defaults are never shared between instances.
        self.statements = list(self.statements)
        self.global_code_blocks = {
            k: list(v) for k, v in self.global_code_blocks.items()
        }

    @classmethod
    def from_requests(
        cls, requests: Iterable[Request], blacklist: Optional[Blacklist] = None
    ) -> Iterator["Task2"]:
        """
        Generates a set of tasks from a given set of HTTP requests.

        Each request will be turned into an unevaluated function call
        (:class:`transformer.python.FunctionCall`)
        making the actual request.

        The returned tasks are ordered by increasing :any:`timestamp
        <transformer.request.Request.timestamp>` of the corresponding request.

        :param requests: HTTP requests to convert.
        :param blacklist: requests whose network location is matched by this
            blacklist are skipped; defaults to an empty blacklist.
        """
        if blacklist is None:
            blacklist = get_empty()
        for req in sorted(requests, key=lambda r: r.timestamp):
            if on_blacklist(blacklist, req.url.netloc):
                continue
            yield cls._from_request(req)

    @classmethod
    def _from_request(cls, req: Request) -> "Task2":
        """
        Builds the task executing *req*, whose only statement assigns the
        result of the request to ``response``.

        This lives in its own function (rather than in the loop of
        :meth:`from_requests`) so that each ExpressionView's closure refers to
        its own task instead of to a loop variable rebound at each iteration.
        """
        t = cls(name=req.task_name(), request=req)
        t.statements = [
            py.Assignment(
                "response",
                py.ExpressionView(
                    name="this task's request field",
                    # Resolved lazily: later changes of t.request are seen.
                    target=lambda: t.request,
                    converter=req_to_expr,
                ),
            )
        ]
        return t

    @classmethod
    def from_task(cls, task: "Task") -> "Task2":
        """
        Converts a deprecated :class:`Task` into its :class:`Task2` equivalent.

        The resulting statements are the task's pre-processing blocks, the
        assignment of the request's result to ``response``, then the task's
        post-processing blocks.
        """
        # TODO: Remove me as soon as the old Task is no longer used and Task2 is
        #   renamed to Task.
        #   See https://github.com/zalando-incubator/Transformer/issues/11.
        # NOTE(review): task.global_code_blocks is not carried over to the new
        #   Task2 -- confirm that callers propagate these blocks elsewhere.
        t = cls(name=task.name, request=task.request)
        if task.locust_request:
            # A LocustRequest overrides the original request.
            expr_view = py.ExpressionView(
                name="this task's request field",
                target=lambda: task.locust_request,
                converter=lreq_to_expr,
            )
        else:
            expr_view = py.ExpressionView(
                name="this task's request field",
                target=lambda: t.request,
                converter=req_to_expr,
            )
        t.statements = [
            *[py.OpaqueBlock(x) for x in task.locust_preprocessing],
            py.Assignment("response", expr_view),
            *[py.OpaqueBlock(x) for x in task.locust_postprocessing],
        ]
        return t
# HTTP methods for which req_to_expr and lreq_to_expr add neither body nor
# "params" arguments to the generated client call.
NOOP_HTTP_METHODS = {HttpMethod.GET, HttpMethod.OPTIONS, HttpMethod.DELETE}
def req_to_expr(r: Request) -> py.FunctionCall:
    """
    Converts the HTTP request *r* into the ``self.client.<method>(...)``
    function call performing that request.

    The call always receives the ``url``, ``name``, ``timeout`` and
    ``allow_redirects`` arguments, ``headers`` if *r* has any, and:

    - for POST: the arguments derived from the request's post data (if any);
    - for PUT and PATCH: the same, plus the request's query pairs appended
      to ``params``;
    - for the methods in NOOP_HTTP_METHODS: nothing else.

    :raise ValueError: if the HTTP method of *r* is none of the above, or if
        its post data is invalid.
    """
    target_url = py.Literal(str(r.url.geturl()))
    display_name = py.Literal(r.name) if r.name else target_url
    call_kwargs: Dict[str, py.Expression] = OrderedDict(
        url=target_url,
        name=display_name,
        timeout=py.Literal(TIMEOUT),
        allow_redirects=py.Literal(False),
    )
    if r.headers:
        call_kwargs["headers"] = py.Literal(r.headers)

    is_post = r.method is HttpMethod.POST
    is_put_or_patch = not is_post and r.method in (HttpMethod.PUT, HttpMethod.PATCH)
    carries_body = is_post or is_put_or_patch

    if not carries_body and r.method not in NOOP_HTTP_METHODS:
        raise ValueError(f"unsupported HTTP method: {r.method!r}")

    if carries_body and r.post_data:
        body = RequestsPostData.from_har_post_data(r.post_data)
        call_kwargs.update(body.as_kwargs())

    if is_put_or_patch:
        # The query pairs are appended to the "params" possibly provided by
        # the post data.
        query_dicts = [dataclasses.asdict(pair) for pair in r.query]
        params_literal = cast(
            py.Literal, call_kwargs.setdefault("params", py.Literal([]))
        )
        params_literal.value.extend(_params_from_name_value_dicts(query_dicts))

    verb = r.method.name.lower()
    return py.FunctionCall(name=f"self.client.{verb}", named_args=call_kwargs)
def lreq_to_expr(lr: LocustRequest) -> py.FunctionCall:
    """
    Converts the LocustRequest *lr* into the ``self.client.<method>(...)``
    function call performing that request.

    Same conversion as req_to_expr, except that the ``url`` and ``name``
    fields of *lr* contain Python source text, which is first evaluated by
    _peel_off_repr.

    :raise ValueError: if the HTTP method of *lr* is not supported, or if its
        post data is invalid.
    """
    # TODO: Remove me once LocustRequest no longer exists.
    #   See https://github.com/zalando-incubator/Transformer/issues/11.
    url_expr = _peel_off_repr(lr.url)
    if lr.name:
        name_expr = _peel_off_repr(lr.name)
    else:
        name_expr = url_expr

    kwargs: Dict[str, py.Expression] = OrderedDict(
        url=url_expr,
        name=name_expr,
        timeout=py.Literal(TIMEOUT),
        allow_redirects=py.Literal(False),
    )
    if lr.headers:
        kwargs["headers"] = py.Literal(lr.headers)

    verb = lr.method
    if verb is HttpMethod.POST or verb in (HttpMethod.PUT, HttpMethod.PATCH):
        if lr.post_data:
            kwargs.update(
                RequestsPostData.from_har_post_data(lr.post_data).as_kwargs()
            )
        if verb is not HttpMethod.POST:
            # PUT and PATCH also send the query pairs, appended to the
            # "params" possibly provided by the post data.
            kwargs.setdefault("params", py.Literal([]))
            query_params = _params_from_name_value_dicts(
                [dataclasses.asdict(pair) for pair in lr.query]
            )
            cast(py.Literal, kwargs["params"]).value.extend(query_params)
    elif verb not in NOOP_HTTP_METHODS:
        raise ValueError(f"unsupported HTTP method: {lr.method!r}")

    return py.FunctionCall(
        name=f"self.client.{verb.name.lower()}", named_args=kwargs
    )
def _peel_off_repr(s: str) -> py.Literal:
    """
    Undoes the repr() calls applied by LocustRequest to its url and name.

    *s* is Python source text: it is evaluated back into a value, wrapped in
    a py.FString if *s* starts with the ``f`` prefix, in a py.Literal
    otherwise.
    """
    # NOTE(security): eval() runs arbitrary expressions, and passing empty
    # globals does not remove the builtins. This is only acceptable as long
    # as *s* comes from trusted code (e.g. plugins), never from external data.
    is_fstring = s.startswith("f")
    source = s[1:] if is_fstring else s
    value = eval(source, {}, {})
    if is_fstring:
        return py.FString(value)
    return py.Literal(value)
class Task(NamedTuple):
    """
    A single step of "doing something" on a website, i.e. the equivalent of
    a @task in Locust-speak.

    .. deprecated:: 1.0.2
       Use :class:`Task2` instead. :class:`Task` is kept for backward
       compatibility with existing plugins that have not yet migrated to
       :class:`Task2`.
       Transformer will automatically convert :class:`Task` objects into
       :class:`Task2` objects using :meth:`Task2.from_task`.
    """

    name: str
    request: Request
    locust_request: Optional[LocustRequest] = None
    locust_preprocessing: Sequence[str] = ()
    locust_postprocessing: Sequence[str] = ()
    global_code_blocks: Mapping[str, Sequence[str]] = MappingProxyType({})

    @classmethod
    def from_requests(
        cls, requests: Iterable[Request], blacklist: Optional[Blacklist] = None
    ) -> Iterator["Task"]:
        """
        Generates one Task per request of *requests* not on the *blacklist*
        (empty by default), by increasing request timestamp.
        """
        active_blacklist = get_empty() if blacklist is None else blacklist
        chronological = sorted(requests, key=lambda r: r.timestamp)
        for req in chronological:
            if not on_blacklist(active_blacklist, req.url.netloc):
                yield cls(name=req.task_name(), request=req)

    def inject_headers(self, headers: dict):
        """
        Returns a copy of this task whose LocustRequest has *headers* merged
        into its own headers; *headers* wins over existing entries.

        The LocustRequest is derived from the task's request if the task has
        none yet.
        """
        base = self.locust_request
        if base is None:
            base = LocustRequest.from_request(self.request)
        merged_headers = CaseInsensitiveDict({**base.headers, **headers})
        return self._replace(locust_request=base._replace(headers=merged_headers))

    def replace_url(self, url: str):
        """
        Returns a copy of this task whose LocustRequest targets *url*.

        The LocustRequest is derived from the task's request if the task has
        none yet.
        """
        base = self.locust_request
        if base is None:
            base = LocustRequest.from_request(self.request)
        return self._replace(locust_request=base._replace(url=url))
@dataclass
class RequestsPostData:
    """
    Payload of an HTTP request, stored under the name of the requests
    library's keyword argument that should carry it (``data``, ``params`` or
    ``json``).
    """

    data: Optional[py.Literal] = None
    params: Optional[py.Literal] = None
    json: Optional[py.Literal] = None

    def as_kwargs(self) -> Dict[str, py.Expression]:
        """
        Returns the fields that are set (i.e. not None) as a dictionary of
        keyword arguments, built from ``dataclasses.asdict``.
        """
        populated = {}
        for field_name, field_value in dataclasses.asdict(self).items():
            if field_value is None:
                continue
            populated[field_name] = field_value
        return populated

    @classmethod
    def from_har_post_data(cls, post_data: dict) -> "RequestsPostData":
        """
        Converts a HAR postData object into a RequestsPostData instance.

        :param post_data: a HAR "postData" object,
            see http://www.softwareishard.com/blog/har-12-spec/#postData.
        :raise ValueError: if *post_data* is invalid; the original error is
            attached as the cause.
        """
        try:
            converted = _from_har_post_data(post_data)
        except ValueError as cause:
            raise ValueError(f"invalid HAR postData object: {post_data!r}") from cause
        return converted
def _from_har_post_data(post_data: dict) -> RequestsPostData:
    """
    Builds a RequestsPostData from the "mimeType", "text" and "params"
    fields of the HAR postData object *post_data*.

    :raise ValueError: if "mimeType" is missing, if both "text" and "params"
        are missing, or if one of these fields cannot be read.
    """
    mime_field, text_field, params_field = "mimeType", "text", "params"

    try:
        mime_type: str = post_data[mime_field]
    except KeyError:
        raise ValueError(f"missing {mime_field!r} field") from None

    # The "text" and "params" fields are supposed to be mutually
    # exclusive (according to the HAR spec) but nobody respects that.
    # Often, both text and params are provided for x-www-form-urlencoded.
    if not (text_field in post_data or params_field in post_data):
        raise ValueError(f"should contain {text_field!r} or {params_field!r}")

    result = RequestsPostData()
    _extract_text(mime_type, post_data, text_field, result)

    try:
        parsed_params = _params_from_post_data(params_field, post_data)
        if parsed_params is not None:
            result.params = py.Literal(parsed_params)
    except (KeyError, UnicodeEncodeError, TypeError) as cause:
        raise ValueError("unreadable params field") from cause

    return result
def _extract_text(
    mime: str, post_data: dict, text_k: str, rpd: RequestsPostData
) -> None:
    """
    Stores the *text_k* field of *post_data* into *rpd*: as parsed JSON in
    ``rpd.json`` if *mime* is the JSON MIME type, as encoded bytes in
    ``rpd.data`` otherwise.

    A missing text is only an error for JSON content; otherwise *rpd* is
    left untouched.

    :raise ValueError: if the text is missing for JSON content, is not valid
        JSON, or cannot be encoded.
    """
    body = post_data.get(text_k)

    if mime != JSON_MIME_TYPE:
        # Probably application/x-www-form-urlencoded.
        if body is None:
            return
        try:
            rpd.data = py.Literal(body.encode())
        except UnicodeEncodeError as cause:
            raise ValueError(f"cannot encode the {text_k!r} field in UTF-8") from cause
        return

    if body is None:
        raise ValueError(f"missing {text_k!r} field for {JSON_MIME_TYPE} content")
    try:
        rpd.json = py.Literal(json.loads(body))
    except JSONDecodeError as cause:
        raise ValueError(f"unreadable JSON from field {text_k!r}") from cause
def _params_from_post_data(
key: str, post_data: dict
) -> Optional[List[Tuple[bytes, bytes]]]:
"""
Extracts the *key* list from *post_data* and calls
_params_from_name_value_dicts with that list.
:raise TypeError: if the object at *key* is built using unexpected data types.
"""
params = post_data.get(key)
if params is None:
return None
if not isinstance(params, list):
raise TypeError(f"the {key!r} field should be a list")
return _params_from_name_value_dicts(params)
def _params_from_name_value_dicts(
dicts: Iterable[Mapping[str, str]]
) -> List[Tuple[bytes, bytes]]:
"""
Converts a HAR "params" element [0] into a list of tuples that can be used
as value for requests' "params" keyword-argument.
[0]: http://www.softwareishard.com/blog/har-12-spec/#params
[1]: http://docs.python-requests.org/en/master/user/quickstart/
#more-complicated-post-requests
:raise KeyError: if one of the elements doesn't contain a "name" or "value" field.
:raise UnicodeEncodeError: if an element's "name" or "value" string cannot
be encoded in UTF-8.
"""
return [(d["name"].encode(), d["value"].encode()) for d in dicts]