You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

2865 lines
115 KiB
Python

8 months ago
# -*- coding: utf-8 -*-
"""
The module :mod:`odoo.tests.common` provides unittest test cases and a few
helpers and classes to write tests.
"""
import base64
import collections
import concurrent.futures
import contextlib
import difflib
import functools
import importlib
import inspect
import itertools
import json
import logging
import operator
import os
import pathlib
import platform
import pprint
import re
import shutil
import signal
import subprocess
import sys
import tempfile
import threading
import time
import unittest
from . import case
import warnings
from collections import defaultdict
from concurrent.futures import Future, CancelledError, wait
try:
from concurrent.futures import InvalidStateError
except ImportError:
InvalidStateError = NotImplementedError
from contextlib import contextmanager, ExitStack
from datetime import datetime, date
from dateutil.relativedelta import relativedelta
from itertools import zip_longest as izip_longest
from unittest.mock import patch
from xmlrpc import client as xmlrpclib
import requests
import werkzeug.urls
from lxml import etree, html
import odoo
from odoo import api
from odoo.models import BaseModel
from odoo.exceptions import AccessError
from odoo.modules.registry import Registry
from odoo.osv import expression
from odoo.osv.expression import normalize_domain, TRUE_LEAF, FALSE_LEAF
from odoo.service import security
from odoo.sql_db import BaseCursor, Cursor
from odoo.tools import float_compare, single_email_re, profiler, lower_logging
from odoo.tools.misc import find_in_path
from odoo.tools.safe_eval import safe_eval
try:
# the behaviour of decorator changed in 5.0.5 changing the structure of the traceback when
# an error is raised inside a method using a decorator.
# this is not a hudge problem for test execution but this makes error message
# more difficult to read and breaks test_with_decorators
# This also changes the error format making runbot error matching fail
# This also breaks the first frame meaning that the module detection will also fail on runbot
# In 5.1 decoratorx was introduced and it looks like it has the same behaviour of old decorator
from decorator import decoratorx as decorator
except ImportError:
from decorator import decorator
try:
import websocket
except ImportError:
# chrome headless tests will be skipped
websocket = None
_logger = logging.getLogger(__name__)
# The odoo library is supposed already configured.
ADDONS_PATH = odoo.tools.config['addons_path']
HOST = '127.0.0.1'
# Useless constant, tests are aware of the content of demo data
ADMIN_USER_ID = odoo.SUPERUSER_ID
CHECK_BROWSER_SLEEP = 0.1 # seconds
CHECK_BROWSER_ITERATIONS = 100
BROWSER_WAIT = CHECK_BROWSER_SLEEP * CHECK_BROWSER_ITERATIONS # seconds
def get_db_name():
db = odoo.tools.config['db_name']
# If the database name is not provided on the command-line,
# use the one on the thread (which means if it is provided on
# the command-line, this will break when installing another
# database from XML-RPC).
if not db and hasattr(threading.current_thread(), 'dbname'):
return threading.current_thread().dbname
return db
standalone_tests = defaultdict(list)
def standalone(*tags):
""" Decorator for standalone test functions. This is somewhat dedicated to
tests that install, upgrade or uninstall some modules, which is currently
forbidden in regular test cases. The function is registered under the given
``tags`` and the corresponding Odoo module name.
"""
def register(func):
# register func by odoo module name
if func.__module__.startswith('odoo.addons.'):
module = func.__module__.split('.')[2]
standalone_tests[module].append(func)
# register func with aribitrary name, if any
for tag in tags:
standalone_tests[tag].append(func)
standalone_tests['all'].append(func)
return func
return register
# For backwards-compatibility - get_db_name() should be used instead
DB = get_db_name()
def new_test_user(env, login='', groups='base.group_user', context=None, **kwargs):
""" Helper function to create a new test user. It allows to quickly create
users given its login and groups (being a comma separated list of xml ids).
Kwargs are directly propagated to the create to further customize the
created user.
User creation uses a potentially customized environment using the context
parameter allowing to specify a custom context. It can be used to force a
specific behavior and/or simplify record creation. An example is to use
mail-related context keys in mail tests to speedup record creation.
Some specific fields are automatically filled to avoid issues
* groups_id: it is filled using groups function parameter;
* name: "login (groups)" by default as it is required;
* email: it is either the login (if it is a valid email) or a generated
string 'x.x@example.com' (x being the first login letter). This is due
to email being required for most odoo operations;
"""
if not login:
raise ValueError('New users require at least a login')
if not groups:
raise ValueError('New users require at least user groups')
if context is None:
context = {}
groups_id = [(6, 0, [env.ref(g.strip()).id for g in groups.split(',')])]
create_values = dict(kwargs, login=login, groups_id=groups_id)
# automatically generate a name as "Login (groups)" to ease user comprehension
if not create_values.get('name'):
create_values['name'] = '%s (%s)' % (login, groups)
# automatically give a password equal to login
if not create_values.get('password'):
create_values['password'] = login + 'x' * (8 - len(login))
# generate email if not given as most test require an email
if 'email' not in create_values:
if single_email_re.match(login):
create_values['email'] = login
else:
create_values['email'] = '%s.%s@example.com' % (login[0], login[0])
# ensure company_id + allowed company constraint works if not given at create
if 'company_id' in create_values and 'company_ids' not in create_values:
create_values['company_ids'] = [(4, create_values['company_id'])]
return env['res.users'].with_context(**context).create(create_values)
class RecordCapturer:
def __init__(self, model, domain):
self._model = model
self._domain = domain
def __enter__(self):
self._before = self._model.search(self._domain)
self._after = None
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
if exc_type is None:
self._after = self._model.search(self._domain) - self._before
@property
def records(self):
if self._after is None:
return self._model.search(self._domain) - self._before
return self._after
class MetaCase(type):
""" Metaclass of test case classes to assign default 'test_tags':
'standard', 'at_install' and the name of the module.
"""
def __init__(cls, name, bases, attrs):
super(MetaCase, cls).__init__(name, bases, attrs)
# assign default test tags
if cls.__module__.startswith('odoo.addons.'):
if getattr(cls, 'test_tags', None) is None:
cls.test_tags = {'standard', 'at_install'}
cls.test_module = cls.__module__.split('.')[2]
cls.test_class = cls.__name__
cls.test_sequence = 0
def _normalize_arch_for_assert(arch_string, parser_method="xml"):
"""Takes some xml and normalize it to make it comparable to other xml
in particular, blank text is removed, and the output is pretty-printed
:param str arch_string: the string representing an XML arch
:param str parser_method: an string representing which lxml.Parser class to use
when normalizing both archs. Takes either "xml" or "html"
:return: the normalized arch
:rtype str:
"""
Parser = None
if parser_method == 'xml':
Parser = etree.XMLParser
elif parser_method == 'html':
Parser = etree.HTMLParser
parser = Parser(remove_blank_text=True)
arch_string = etree.fromstring(arch_string, parser=parser)
return etree.tostring(arch_string, pretty_print=True, encoding='unicode')
class BaseCase(case.TestCase, metaclass=MetaCase):
""" Subclass of TestCase for Odoo-specific code. This class is abstract and
expects self.registry, self.cr and self.uid to be initialized by subclasses.
"""
longMessage = True # more verbose error message by default: https://www.odoo.com/r/Vmh
warm = True # False during warm-up phase (see :func:`warmup`)
_python_version = sys.version_info
def __init__(self, methodName='runTest'):
super().__init__(methodName)
self.addTypeEqualityFunc(etree._Element, self.assertTreesEqual)
self.addTypeEqualityFunc(html.HtmlElement, self.assertTreesEqual)
def run(self, result):
testMethod = getattr(self, self._testMethodName)
if getattr(testMethod, '_retry', True) and getattr(self, '_retry', True):
tests_run_count = int(os.environ.get('ODOO_TEST_FAILURE_RETRIES', 0)) + 1
else:
tests_run_count = 1
_logger.info('Auto retry disabled for %s', self)
failure = False
for retry in range(tests_run_count):
if retry:
_logger.runbot(f'Retrying a failed test: {self}')
if retry < tests_run_count-1:
with warnings.catch_warnings(), \
result.soft_fail(), \
lower_logging(25, logging.INFO) as quiet_log:
super().run(result)
failure = result.had_failure or quiet_log.had_error_log
else: # last try
super().run(result)
if not failure:
break
def cursor(self):
return self.registry.cursor()
@property
def uid(self):
""" Get the current uid. """
return self.env.uid
@uid.setter
def uid(self, user):
""" Set the uid by changing the test's environment. """
self.env = self.env(user=user)
def ref(self, xid):
""" Returns database ID for the provided :term:`external identifier`,
shortcut for ``_xmlid_lookup``
:param xid: fully-qualified :term:`external identifier`, in the form
:samp:`{module}.{identifier}`
:raise: ValueError if not found
:returns: registered id
"""
return self.browse_ref(xid).id
def browse_ref(self, xid):
""" Returns a record object for the provided
:term:`external identifier`
:param xid: fully-qualified :term:`external identifier`, in the form
:samp:`{module}.{identifier}`
:raise: ValueError if not found
:returns: :class:`~odoo.models.BaseModel`
"""
assert "." in xid, "this method requires a fully qualified parameter, in the following form: 'module.identifier'"
return self.env.ref(xid)
def patch(self, obj, key, val):
""" Do the patch ``setattr(obj, key, val)``, and prepare cleanup. """
patcher = patch.object(obj, key, val) # this is unittest.mock.patch
patcher.start()
self.addCleanup(patcher.stop)
@classmethod
def classPatch(cls, obj, key, val):
""" Do the patch ``setattr(obj, key, val)``, and prepare cleanup. """
patcher = patch.object(obj, key, val) # this is unittest.mock.patch
patcher.start()
cls.addClassCleanup(patcher.stop)
def startPatcher(self, patcher):
mock = patcher.start()
self.addCleanup(patcher.stop)
return mock
@classmethod
def startClassPatcher(cls, patcher):
mock = patcher.start()
cls.addClassCleanup(patcher.stop)
return mock
@contextmanager
def with_user(self, login):
""" Change user for a given test, like with self.with_user() ... """
old_uid = self.uid
try:
user = self.env['res.users'].sudo().search([('login', '=', login)])
assert user, "Login %s not found" % login
# switch user
self.uid = user.id
self.env = self.env(user=self.uid)
yield
finally:
# back
self.uid = old_uid
self.env = self.env(user=self.uid)
@contextmanager
def debug_mode(self):
""" Enable the effects of group 'base.group_no_one'; mainly useful with :class:`Form`. """
origin_user_has_groups = BaseModel.user_has_groups
def user_has_groups(self, groups):
group_set = set(groups.split(','))
if '!base.group_no_one' in group_set:
return False
elif 'base.group_no_one' in group_set:
group_set.remove('base.group_no_one')
return not group_set or origin_user_has_groups(self, ','.join(group_set))
return origin_user_has_groups(self, groups)
with patch('odoo.models.BaseModel.user_has_groups', user_has_groups):
yield
@contextmanager
def _assertRaises(self, exception, *, msg=None):
""" Context manager that clears the environment upon failure. """
with ExitStack() as init:
if hasattr(self, 'env'):
init.enter_context(self.env.cr.savepoint())
if issubclass(exception, AccessError):
# The savepoint() above calls flush(), which leaves the
# record cache with lots of data. This can prevent
# access errors to be detected. In order to avoid this
# issue, we clear the cache before proceeding.
self.env.cr.clear()
with ExitStack() as inner:
cm = inner.enter_context(super().assertRaises(exception, msg=msg))
# *moves* the cleanups from init to inner, this ensures the
# savepoint gets rolled back when `yield` raises `exception`,
# but still allows the initialisation to be protected *and* not
# interfered with by `assertRaises`.
inner.push(init.pop_all())
yield cm
def assertRaises(self, exception, func=None, *args, **kwargs):
if func:
with self._assertRaises(exception):
func(*args, **kwargs)
else:
return self._assertRaises(exception, **kwargs)
if sys.version_info < (3, 10):
# simplified backport of assertNoLogs()
@contextmanager
def assertNoLogs(self, logger: str, level: str):
# assertLogs ensures there is at least one log record when
# exiting the context manager. We insert one dummy record just
# so we pass that silly test while still capturing the logs.
with self.assertLogs(logger, level) as capture:
logging.getLogger(logger).log(getattr(logging, level), "Dummy log record")
yield
if len(capture.output) > 1:
raise self.failureException(f"Unexpected logs found: {capture.output[1:]}")
@contextmanager
def assertQueries(self, expected, flush=True):
""" Check the queries made by the current cursor. ``expected`` is a list
of strings representing the expected queries being made. Query strings
are matched against each other, ignoring case and whitespaces.
"""
Cursor_execute = Cursor.execute
actual_queries = []
def execute(self, query, params=None, log_exceptions=None):
actual_queries.append(query)
return Cursor_execute(self, query, params, log_exceptions)
def get_unaccent_wrapper(cr):
return lambda x: x
if flush:
self.env.flush_all()
self.env.cr.flush()
with patch('odoo.sql_db.Cursor.execute', execute):
with patch('odoo.osv.expression.get_unaccent_wrapper', get_unaccent_wrapper):
yield actual_queries
if flush:
self.env.flush_all()
self.env.cr.flush()
self.assertEqual(
len(actual_queries), len(expected),
"\n---- actual queries:\n%s\n---- expected queries:\n%s" % (
"\n".join(actual_queries), "\n".join(expected),
)
)
for actual_query, expect_query in zip(actual_queries, expected):
self.assertEqual(
"".join(actual_query.lower().split()),
"".join(expect_query.lower().split()),
"\n---- actual query:\n%s\n---- not like:\n%s" % (actual_query, expect_query),
)
@contextmanager
def assertQueryCount(self, default=0, flush=True, **counters):
""" Context manager that counts queries. It may be invoked either with
one value, or with a set of named arguments like ``login=value``::
with self.assertQueryCount(42):
...
with self.assertQueryCount(admin=3, demo=5):
...
The second form is convenient when used with :func:`users`.
"""
if self.warm:
# mock random in order to avoid random bus gc
with patch('random.random', lambda: 1):
login = self.env.user.login
expected = counters.get(login, default)
if flush:
self.env.flush_all()
self.env.cr.flush()
count0 = self.cr.sql_log_count
yield
if flush:
self.env.flush_all()
self.env.cr.flush()
count = self.cr.sql_log_count - count0
if count != expected:
# add some info on caller to allow semi-automatic update of query count
frame, filename, linenum, funcname, lines, index = inspect.stack()[2]
if "/odoo/addons/" in filename:
filename = filename.rsplit("/odoo/addons/", 1)[1]
if count > expected:
msg = "Query count more than expected for user %s: %d > %d in %s at %s:%s"
# add a subtest in order to continue the test_method in case of failures
with self.subTest():
self.fail(msg % (login, count, expected, funcname, filename, linenum))
else:
logger = logging.getLogger(type(self).__module__)
msg = "Query count less than expected for user %s: %d < %d in %s at %s:%s"
logger.info(msg, login, count, expected, funcname, filename, linenum)
else:
# flush before and after during warmup, in order to reproduce the
# same operations, otherwise the caches might not be ready!
if flush:
self.env.flush_all()
self.env.cr.flush()
yield
if flush:
self.env.flush_all()
self.env.cr.flush()
def assertRecordValues(self, records, expected_values):
''' Compare a recordset with a list of dictionaries representing the expected results.
This method performs a comparison element by element based on their index.
Then, the order of the expected values is extremely important.
Note that:
- Comparison between falsy values is supported: False match with None.
- Comparison between monetary field is also treated according the currency's rounding.
- Comparison between x2many field is done by ids. Then, empty expected ids must be [].
- Comparison between many2one field id done by id. Empty comparison can be done using any falsy value.
:param records: The records to compare.
:param expected_values: List of dicts expected to be exactly matched in records
'''
def _compare_candidate(record, candidate, field_names):
''' Compare all the values in `candidate` with a record.
:param record: record being compared
:param candidate: dict of values to compare
:return: A dictionary will encountered difference in values.
'''
diff = {}
for field_name in field_names:
record_value = record[field_name]
field = record._fields[field_name]
field_type = field.type
if field_type == 'monetary':
# Compare monetary field.
currency_field_name = record._fields[field_name].get_currency_field(record)
record_currency = record[currency_field_name]
if field_name not in candidate:
diff[field_name] = (record_value, None)
elif record_currency:
if record_currency.compare_amounts(candidate[field_name], record_value):
diff[field_name] = (record_value, record_currency.round(candidate[field_name]))
elif candidate[field_name] != record_value:
diff[field_name] = (record_value, candidate[field_name])
elif field_type == 'float' and field.get_digits(record.env):
prec = field.get_digits(record.env)[1]
if float_compare(candidate[field_name], record_value, precision_digits=prec) != 0:
diff[field_name] = (record_value, candidate[field_name])
elif field_type in ('one2many', 'many2many'):
# Compare x2many relational fields.
# Empty comparison must be an empty list to be True.
if field_name not in candidate:
diff[field_name] = (sorted(record_value.ids), None)
elif set(record_value.ids) != set(candidate[field_name]):
diff[field_name] = (sorted(record_value.ids), sorted(candidate[field_name]))
elif field_type == 'many2one':
# Compare many2one relational fields.
# Every falsy value is allowed to compare with an empty record.
if field_name not in candidate:
diff[field_name] = (record_value.id, None)
elif (record_value or candidate[field_name]) and record_value.id != candidate[field_name]:
diff[field_name] = (record_value.id, candidate[field_name])
else:
# Compare others fields if not both interpreted as falsy values.
if field_name not in candidate:
diff[field_name] = (record_value, None)
elif (candidate[field_name] or record_value) and record_value != candidate[field_name]:
diff[field_name] = (record_value, candidate[field_name])
return diff
# Compare records with candidates.
different_values = []
field_names = list(expected_values[0].keys())
for index, record in enumerate(records):
is_additional_record = index >= len(expected_values)
candidate = {} if is_additional_record else expected_values[index]
diff = _compare_candidate(record, candidate, field_names)
if diff:
different_values.append((index, 'additional_record' if is_additional_record else 'regular_diff', diff))
for index in range(len(records), len(expected_values)):
diff = {}
for field_name in field_names:
diff[field_name] = (None, expected_values[index][field_name])
different_values.append((index, 'missing_record', diff))
# Build error message.
if not different_values:
return
errors = ['The records and expected_values do not match.']
if len(records) != len(expected_values):
errors.append('Wrong number of records to compare: %d records versus %d expected values.' % (len(records), len(expected_values)))
for index, diff_type, diff in different_values:
if diff_type == 'regular_diff':
errors.append('\n==== Differences at index %s ====' % index)
record_diff = ['%s:%s' % (k, v[0]) for k, v in diff.items()]
candidate_diff = ['%s:%s' % (k, v[1]) for k, v in diff.items()]
errors.append('\n'.join(difflib.unified_diff(record_diff, candidate_diff)))
elif diff_type == 'additional_record':
errors += [
'\n==== Additional record ====',
pprint.pformat(dict((k, v[0]) for k, v in diff.items())),
]
elif diff_type == 'missing_record':
errors += [
'\n==== Missing record ====',
pprint.pformat(dict((k, v[1]) for k, v in diff.items())),
]
self.fail('\n'.join(errors))
# turns out this thing may not be quite as useful as we thought...
def assertItemsEqual(self, a, b, msg=None):
self.assertCountEqual(a, b, msg=None)
def assertTreesEqual(self, n1, n2, msg=None):
self.assertIsNotNone(n1, msg)
self.assertIsNotNone(n2, msg)
self.assertEqual(n1.tag, n2.tag, msg)
# Because lxml.attrib is an ordereddict for which order is important
# to equality, even though *we* don't care
self.assertEqual(dict(n1.attrib), dict(n2.attrib), msg)
self.assertEqual((n1.text or u'').strip(), (n2.text or u'').strip(), msg)
self.assertEqual((n1.tail or u'').strip(), (n2.tail or u'').strip(), msg)
for c1, c2 in izip_longest(n1, n2):
self.assertTreesEqual(c1, c2, msg)
def _assertXMLEqual(self, original, expected, parser="xml"):
"""Asserts that two xmls archs are equal
:param original: the xml arch to test
:type original: str
:param expected: the xml arch of reference
:type expected: str
:param parser: an string representing which lxml.Parser class to use
when normalizing both archs. Takes either "xml" or "html"
:type parser: str
"""
if original:
original = _normalize_arch_for_assert(original, parser)
if expected:
expected = _normalize_arch_for_assert(expected, parser)
self.assertEqual(original, expected)
def assertXMLEqual(self, original, expected):
return self._assertXMLEqual(original, expected)
def assertHTMLEqual(self, original, expected):
return self._assertXMLEqual(original, expected, 'html')
def profile(self, description='', **kwargs):
test_method = getattr(self, '_testMethodName', 'Unknown test method')
if not hasattr(self, 'profile_session'):
self.profile_session = profiler.make_session(test_method)
return profiler.Profiler(
description='%s uid:%s %s %s' % (test_method, self.env.user.id, 'warm' if self.warm else 'cold', description),
db=self.env.cr.dbname,
profile_session=self.profile_session,
**kwargs)
savepoint_seq = itertools.count()
class TransactionCase(BaseCase):
""" Test class in which all test methods are run in a single transaction,
but each test method is run in a sub-transaction managed by a savepoint.
The transaction's cursor is always closed without committing.
The data setup common to all methods should be done in the class method
`setUpClass`, so that it is done once for all test methods. This is useful
for test cases containing fast tests but with significant database setup
common to all cases (complex in-db test data).
After being run, each test method cleans up the record cache and the
registry cache. However, there is no cleanup of the registry models and
fields. If a test modifies the registry (custom models and/or fields), it
should prepare the necessary cleanup (`self.registry.reset_changes()`).
"""
registry: Registry = None
env: api.Environment = None
cr: Cursor = None
@classmethod
def _gc_filestore(cls):
# attachment can be created or unlink during the tests.
# they can addup during test and take some disc space.
# since cron are not running during tests, we need to gc manually
# We need to check the status of the file system outside of the test cursor
with odoo.registry(get_db_name()).cursor() as cr:
gc_env = api.Environment(cr, odoo.SUPERUSER_ID, {})
gc_env['ir.attachment']._gc_file_store_unsafe()
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.addClassCleanup(cls._gc_filestore)
cls.registry = odoo.registry(get_db_name())
cls.addClassCleanup(cls.registry.reset_changes)
cls.addClassCleanup(cls.registry.clear_caches)
cls.cr = cls.registry.cursor()
cls.addClassCleanup(cls.cr.close)
cls.env = api.Environment(cls.cr, odoo.SUPERUSER_ID, {})
def setUp(self):
super().setUp()
# restore environments after the test to avoid invoking flush() with an
# invalid environment (inexistent user id) from another test
envs = self.env.all.envs
for env in list(envs):
self.addCleanup(env.clear)
# restore the set of known environments as it was at setUp
self.addCleanup(envs.update, list(envs))
self.addCleanup(envs.clear)
self.addCleanup(self.registry.clear_caches)
# This prevents precommit functions and data from piling up
# until cr.flush is called in 'assertRaises' clauses
# (these are not cleared in self.env.clear or envs.clear)
cr = self.env.cr
def _reset(cb, funcs, data):
cb._funcs = funcs
cb.data = data
for callback in [cr.precommit, cr.postcommit, cr.prerollback, cr.postrollback]:
self.addCleanup(_reset, callback, collections.deque(callback._funcs), dict(callback.data))
# flush everything in setUpClass before introducing a savepoint
self.env.flush_all()
self._savepoint_id = next(savepoint_seq)
self.cr.execute('SAVEPOINT test_%d' % self._savepoint_id)
self.addCleanup(self.cr.execute, 'ROLLBACK TO SAVEPOINT test_%d' % self._savepoint_id)
self.patch(self.registry['res.partner'], '_get_gravatar_image', lambda *a: False)
class SavepointCase(TransactionCase):
@classmethod
def __init_subclass__(cls):
super().__init_subclass__()
warnings.warn(
"Deprecated class SavepointCase has been merged into TransactionCase",
DeprecationWarning, stacklevel=2,
)
class SingleTransactionCase(BaseCase):
""" TestCase in which all test methods are run in the same transaction,
the transaction is started with the first test method and rolled back at
the end of the last.
"""
@classmethod
def __init_subclass__(cls):
super().__init_subclass__()
if issubclass(cls, TransactionCase):
_logger.warning("%s inherits from both TransactionCase and SingleTransactionCase")
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.registry = odoo.registry(get_db_name())
cls.addClassCleanup(cls.registry.reset_changes)
cls.addClassCleanup(cls.registry.clear_caches)
cls.cr = cls.registry.cursor()
cls.addClassCleanup(cls.cr.close)
cls.env = api.Environment(cls.cr, odoo.SUPERUSER_ID, {})
def setUp(self):
super(SingleTransactionCase, self).setUp()
self.env.flush_all()
class ChromeBrowserException(Exception):
pass
def fmap(future, map_fun):
"""Maps a future's result through a callback.
Resolves to the application of ``map_fun`` to the result of ``future``.
.. warning:: this does *not* recursively resolve futures, if that's what
you need see :func:`fchain`
"""
fmap_future = Future()
@future.add_done_callback
def _(f):
try:
fmap_future.set_result(map_fun(f.result()))
except Exception as e:
fmap_future.set_exception(e)
return fmap_future
def fchain(future, next_callback):
"""Chains a future's result to a new future through a callback.
Corresponds to the ``bind`` monadic operation (aka flatmap aka then...
kinda).
"""
new_future = Future()
@future.add_done_callback
def _(f):
try:
n = next_callback(f.result())
@n.add_done_callback
def _(f):
try:
new_future.set_result(f.result())
except Exception as e:
new_future.set_exception(e)
except Exception as e:
new_future.set_exception(e)
return new_future
class ChromeBrowser:
""" Helper object to control a Chrome headless process. """
remote_debugging_port = 0 # 9222, change it in a non-git-tracked file
def __init__(self, test_class):
self._logger = test_class._logger
self.test_class = test_class
if websocket is None:
self._logger.warning("websocket-client module is not installed")
raise unittest.SkipTest("websocket-client module is not installed")
self.devtools_port = None
self.ws_url = '' # WebSocketUrl
self.ws = None # websocket
self.user_data_dir = tempfile.mkdtemp(suffix='_chrome_odoo')
self.chrome_pid = None
otc = odoo.tools.config
self.screenshots_dir = os.path.join(otc['screenshots'], get_db_name(), 'screenshots')
self.screencasts_dir = None
self.screencasts_frames_dir = None
if otc['screencasts']:
self.screencasts_dir = os.path.join(otc['screencasts'], get_db_name(), 'screencasts')
self.screencasts_frames_dir = os.path.join(self.screencasts_dir, 'frames')
os.makedirs(self.screencasts_frames_dir, exist_ok=True)
self.screencast_frames = []
os.makedirs(self.screenshots_dir, exist_ok=True)
self.window_size = test_class.browser_size
self.touch_enabled = test_class.touch_enabled
self.sigxcpu_handler = None
self._chrome_start()
self._find_websocket()
self._logger.info('Websocket url found: %s', self.ws_url)
self._open_websocket()
self._request_id = itertools.count()
self._result = Future()
self.error_checker = None
self.had_failure = False
# maps request_id to Futures
self._responses = {}
# maps frame ids to callbacks
self._frames = {}
self._handlers = {
'Runtime.consoleAPICalled': self._handle_console,
'Runtime.exceptionThrown': self._handle_exception,
'Page.frameStoppedLoading': self._handle_frame_stopped_loading,
'Page.screencastFrame': self._handle_screencast_frame,
}
self._receiver = threading.Thread(
target=self._receive,
name="WebSocket events consumer",
args=(get_db_name(),)
)
self._receiver.start()
self._logger.info('Enable chrome headless console log notification')
self._websocket_send('Runtime.enable')
self._logger.info('Chrome headless enable page notifications')
self._websocket_send('Page.enable')
if os.name == 'posix':
self.sigxcpu_handler = signal.getsignal(signal.SIGXCPU)
signal.signal(signal.SIGXCPU, self.signal_handler)
def signal_handler(self, sig, frame):
if sig == signal.SIGXCPU:
_logger.info('CPU time limit reached, stopping Chrome and shutting down')
self.stop()
os._exit(0)
def stop(self):
if self.chrome_pid is not None:
self._logger.info("Closing chrome headless with pid %s", self.chrome_pid)
self._websocket_send('Browser.close')
self._logger.info("Closing websocket connection")
self.ws.close()
self._logger.info("Terminating chrome headless with pid %s", self.chrome_pid)
os.kill(self.chrome_pid, signal.SIGTERM)
if self.user_data_dir and os.path.isdir(self.user_data_dir) and self.user_data_dir != '/':
self._logger.info('Removing chrome user profile "%s"', self.user_data_dir)
shutil.rmtree(self.user_data_dir, ignore_errors=True)
# Restore previous signal handler
if self.sigxcpu_handler and os.name == 'posix':
signal.signal(signal.SIGXCPU, self.sigxcpu_handler)
@property
def executable(self):
system = platform.system()
if system == 'Linux':
for bin_ in ['google-chrome', 'chromium', 'chromium-browser', 'google-chrome-stable']:
try:
return find_in_path(bin_)
except IOError:
continue
elif system == 'Darwin':
bins = [
'/Applications/Google Chrome.app/Contents/MacOS/Google Chrome',
'/Applications/Chromium.app/Contents/MacOS/Chromium',
]
for bin_ in bins:
if os.path.exists(bin_):
return bin_
elif system == 'Windows':
bins = [
'%ProgramFiles%\\Google\\Chrome\\Application\\chrome.exe',
'%ProgramFiles(x86)%\\Google\\Chrome\\Application\\chrome.exe',
'%LocalAppData%\\Google\\Chrome\\Application\\chrome.exe',
]
for bin_ in bins:
bin_ = os.path.expandvars(bin_)
if os.path.exists(bin_):
return bin_
raise unittest.SkipTest("Chrome executable not found")
def _chrome_without_limit(self, cmd):
if os.name == 'posix' and platform.system() != 'Darwin':
# since the introduction of pointer compression in Chrome 80 (v8 v8.0),
# the memory reservation algorithm requires more than 8GiB of
# virtual mem for alignment this exceeds our default memory limits.
def preexec():
import resource
resource.setrlimit(resource.RLIMIT_AS, (resource.RLIM_INFINITY, resource.RLIM_INFINITY))
else:
preexec = None
# pylint: disable=subprocess-popen-preexec-fn
return subprocess.Popen(cmd, stderr=subprocess.DEVNULL, preexec_fn=preexec)
def _spawn_chrome(self, cmd):
proc = self._chrome_without_limit(cmd)
port_file = pathlib.Path(self.user_data_dir, 'DevToolsActivePort')
for _ in range(CHECK_BROWSER_ITERATIONS):
time.sleep(CHECK_BROWSER_SLEEP)
if port_file.is_file() and port_file.stat().st_size > 5:
with port_file.open('r', encoding='utf-8') as f:
self.devtools_port = int(f.readline())
return proc.pid
raise unittest.SkipTest(f'Failed to detect chrome devtools port after {BROWSER_WAIT :.1f}s.')
def _chrome_start(self):
if self.chrome_pid is not None:
return
switches = {
'--headless': '',
'--no-default-browser-check': '',
'--no-first-run': '',
'--disable-extensions': '',
'--disable-background-networking' : '',
'--disable-background-timer-throttling' : '',
'--disable-backgrounding-occluded-windows': '',
'--disable-renderer-backgrounding' : '',
'--disable-breakpad': '',
'--disable-client-side-phishing-detection': '',
'--disable-crash-reporter': '',
'--disable-default-apps': '',
'--disable-dev-shm-usage': '',
'--disable-device-discovery-notifications': '',
'--disable-namespace-sandbox': '',
'--user-data-dir': self.user_data_dir,
'--disable-translate': '',
# required for tours that use Youtube autoplay conditions (namely website_slides' "course_tour")
'--autoplay-policy': 'no-user-gesture-required',
'--window-size': self.window_size,
'--remote-debugging-address': HOST,
'--remote-debugging-port': str(self.remote_debugging_port),
'--no-sandbox': '',
'--disable-gpu': '',
'--remote-allow-origins': '*',
# '--enable-precise-memory-info': '', # uncomment to debug memory leaks in qunit suite
# '--js-flags': '--expose-gc', # uncomment to debug memory leaks in qunit suite
}
if self.touch_enabled:
# enable Chrome's Touch mode, useful to detect touch capabilities using
# "'ontouchstart' in window"
switches['--touch-events'] = ''
cmd = [self.executable]
cmd += ['%s=%s' % (k, v) if v else k for k, v in switches.items()]
url = 'about:blank'
cmd.append(url)
try:
self.chrome_pid = self._spawn_chrome(cmd)
except OSError:
raise unittest.SkipTest("%s not found" % cmd[0])
self._logger.info('Chrome pid: %s', self.chrome_pid)
def _find_websocket(self):
version = self._json_command('version')
self._logger.info('Browser version: %s', version['Browser'])
infos = self._json_command('', get_key=0) # Infos about the first tab
self.ws_url = infos['webSocketDebuggerUrl']
self.dev_tools_frontend_url = infos.get('devtoolsFrontendUrl')
self._logger.info('Chrome headless temporary user profile dir: %s', self.user_data_dir)
def _json_command(self, command, timeout=3, get_key=None):
"""Queries browser state using JSON
Available commands:
``''``
return list of tabs with their id
``list`` (or ``json/``)
list tabs
``new``
open a new tab
:samp:`activate/{id}`
activate a tab
:samp:`close/{id}`
close a tab
``version``
get chrome and dev tools version
``protocol``
get the full protocol
"""
command = '/'.join(['json', command]).strip('/')
url = werkzeug.urls.url_join('http://%s:%s/' % (HOST, self.devtools_port), command)
self._logger.info("Issuing json command %s", url)
delay = 0.1
tries = 0
failure_info = None
while timeout > 0:
try:
os.kill(self.chrome_pid, 0)
except ProcessLookupError:
message = 'Chrome crashed at startup'
break
try:
r = requests.get(url, timeout=3)
if r.ok:
res = r.json()
if get_key is None:
return res
else:
return res[get_key]
except requests.ConnectionError as e:
failure_info = str(e)
message = 'Connection Error while trying to connect to Chrome debugger'
except requests.exceptions.ReadTimeout as e:
failure_info = str(e)
message = 'Connection Timeout while trying to connect to Chrome debugger'
break
except (KeyError, IndexError):
message = 'Key "%s" not found in json result "%s" after connecting to Chrome debugger' % (get_key, res)
time.sleep(delay)
timeout -= delay
delay = delay * 1.5
tries += 1
self._logger.error("%s after %s tries" % (message, tries))
if failure_info:
self._logger.info(failure_info)
self.stop()
raise unittest.SkipTest("Error during Chrome headless connection")
def _open_websocket(self):
self.ws = websocket.create_connection(self.ws_url, enable_multithread=True, suppress_origin=True)
if self.ws.getstatus() != 101:
raise unittest.SkipTest("Cannot connect to chrome dev tools")
self.ws.settimeout(0.01)
def _receive(self, dbname):
threading.current_thread().dbname = dbname
# So CDT uses a streamed JSON-RPC structure, meaning a request is
# {id, method, params} and eventually a {id, result | error} should
# arrive the other way, however for events it uses "notifications"
# meaning request objects without an ``id``, but *coming from the server
while True: # or maybe until `self._result` is `done()`?
try:
msg = self.ws.recv()
self._logger.debug('\n<- %s', msg)
except websocket.WebSocketTimeoutException:
continue
except Exception as e:
# if the socket is still connected something bad happened,
# otherwise the client was just shut down
if self.ws.connected:
self._result.set_exception(e)
raise
self._result.cancel()
return
res = json.loads(msg)
request_id = res.get('id')
try:
if request_id is None:
handler = self._handlers.get(res['method'])
if handler:
handler(**res['params'])
else:
f = self._responses.pop(request_id, None)
if f:
if 'result' in res:
f.set_result(res['result'])
else:
f.set_exception(ChromeBrowserException(res['error']['message']))
except Exception:
_logger.exception("While processing message %s", msg)
def _websocket_request(self, method, *, params=None, timeout=10.0):
assert threading.get_ident() != self._receiver.ident,\
"_websocket_request must not be called from the consumer thread"
if self.ws is None:
return
f = self._websocket_send(method, params=params, with_future=True)
try:
return f.result(timeout=timeout)
except concurrent.futures.TimeoutError:
raise TimeoutError(f'{method}({params or ""})')
def _websocket_send(self, method, *, params=None, with_future=False):
"""send chrome devtools protocol commands through websocket
If ``with_future`` is set, returns a ``Future`` for the operation.
"""
if self.ws is None:
return
result = None
request_id = next(self._request_id)
if with_future:
result = self._responses[request_id] = Future()
payload = {'method': method, 'id': request_id}
if params:
payload['params'] = params
self._logger.debug('\n-> %s', payload)
self.ws.send(json.dumps(payload))
return result
def _handle_console(self, type, args=None, stackTrace=None, **kw): # pylint: disable=redefined-builtin
# console formatting differs somewhat from Python's, if args[0] has
# format modifiers that many of args[1:] get formatted in, missing
# args are replaced by empty strings and extra args are concatenated
# (space-separated)
#
# current version modifies the args in place which could and should
# probably be improved
if args:
arg0, args = str(self._from_remoteobject(args[0])), args[1:]
else:
arg0, args = '', []
formatted = [re.sub(r'%[%sdfoOc]', self.console_formatter(args), arg0)]
# formatter consumes args it uses, leaves unformatted args untouched
formatted.extend(str(self._from_remoteobject(arg)) for arg in args)
message = ' '.join(formatted)
stack = ''.join(self._format_stack({'type': type, 'stackTrace': stackTrace}))
if stack:
message += '\n' + stack
log_type = type
self._logger.getChild('browser').log(
self._TO_LEVEL.get(log_type, logging.INFO),
"%s", message # might still have %<x> characters
)
if log_type == 'error':
self.had_failure = True
if not self.error_checker or self.error_checker(message):
self.take_screenshot()
self._save_screencast()
try:
self._result.set_exception(ChromeBrowserException(message))
except CancelledError:
...
except InvalidStateError:
self._logger.warning(
"Trying to set result to failed (%s) but found the future settled (%s)",
message, self._result
)
elif 'test successful' in message:
if self.test_class.allow_end_on_form:
self._result.set_result(True)
return
qs = fchain(
self._websocket_send('DOM.getDocument', params={'depth': 0}, with_future=True),
lambda d: self._websocket_send("DOM.querySelector", params={
'nodeId': d['root']['nodeId'],
'selector': '.o_legacy_form_view.o_form_editable, .o_form_dirty',
}, with_future=True)
)
@qs.add_done_callback
def _qs_result(fut):
node_id = 0
with contextlib.suppress(Exception):
node_id = fut.result()['nodeId']
if node_id:
self.take_screenshot("unsaved_form_")
self._result.set_exception(ChromeBrowserException("""\
Tour finished with an open form view in edition mode.
Form views in edition mode are automatically saved when the page is closed, \
which leads to stray network requests and inconsistencies."""))
return
try:
self._result.set_result(True)
except Exception:
# if the future was already failed, we're happy,
# otherwise swap for a new failed
if self._result.exception() is None:
self._result = Future()
self._result.set_exception(ChromeBrowserException(
"Tried to make the tour successful twice."
))
def _handle_exception(self, exceptionDetails, timestamp):
message = exceptionDetails['text']
exception = exceptionDetails.get('exception')
if exception:
message += str(self._from_remoteobject(exception))
exceptionDetails['type'] = 'trace' # fake this so _format_stack works
stack = ''.join(self._format_stack(exceptionDetails))
if stack:
message += '\n' + stack
self.take_screenshot()
self._save_screencast()
try:
self._result.set_exception(ChromeBrowserException(message))
except CancelledError:
...
except InvalidStateError:
self._logger.warning(
"Trying to set result to failed (%s) but found the future settled (%s)",
message, self._result
)
def _handle_frame_stopped_loading(self, frameId):
wait = self._frames.pop(frameId, None)
if wait:
wait()
def _handle_screencast_frame(self, sessionId, data, metadata):
self._websocket_send('Page.screencastFrameAck', params={'sessionId': sessionId})
outfile = os.path.join(self.screencasts_frames_dir, 'frame_%05d.b64' % len(self.screencast_frames))
with open(outfile, 'w') as f:
f.write(data)
self.screencast_frames.append({
'file_path': outfile,
'timestamp': metadata.get('timestamp')
})
_TO_LEVEL = {
'debug': logging.DEBUG,
'log': logging.INFO,
'info': logging.INFO,
'warning': logging.WARNING,
'error': logging.ERROR,
# TODO: what do with
# dir, dirxml, table, trace, clear, startGroup, startGroupCollapsed,
# endGroup, assert, profile, profileEnd, count, timeEnd
}
def take_screenshot(self, prefix='sc_', suffix=None):
def handler(f):
base_png = f.result(timeout=0)['data']
if not base_png:
self._logger.warning("Couldn't capture screenshot: expected image data, got ?? error ??")
return
decoded = base64.b64decode(base_png, validate=True)
fname = '{}{:%Y%m%d_%H%M%S_%f}{}.png'.format(
prefix, datetime.now(),
suffix or '_%s' % self.test_class.__name__)
full_path = os.path.join(self.screenshots_dir, fname)
with open(full_path, 'wb') as f:
f.write(decoded)
self._logger.runbot('Screenshot in: %s', full_path)
self._logger.info('Asking for screenshot')
f = self._websocket_send('Page.captureScreenshot', with_future=True)
f.add_done_callback(handler)
return f
def _save_screencast(self, prefix='failed'):
# could be encododed with something like that
# ffmpeg -framerate 3 -i frame_%05d.png output.mp4
if not self.screencast_frames:
self._logger.debug('No screencast frames to encode')
return None
for f in self.screencast_frames:
with open(f['file_path'], 'rb') as b64_file:
frame = base64.decodebytes(b64_file.read())
os.unlink(f['file_path'])
f['file_path'] = f['file_path'].replace('.b64', '.png')
with open(f['file_path'], 'wb') as png_file:
png_file.write(frame)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
fname = '%s_screencast_%s.mp4' % (prefix, timestamp)
outfile = os.path.join(self.screencasts_dir, fname)
try:
ffmpeg_path = find_in_path('ffmpeg')
except IOError:
ffmpeg_path = None
if ffmpeg_path:
nb_frames = len(self.screencast_frames)
concat_script_path = os.path.join(self.screencasts_dir, fname.replace('.mp4', '.txt'))
with open(concat_script_path, 'w') as concat_file:
for i in range(nb_frames):
frame_file_path = os.path.join(self.screencasts_frames_dir, self.screencast_frames[i]['file_path'])
end_time = time.time() if i == nb_frames - 1 else self.screencast_frames[i+1]['timestamp']
duration = end_time - self.screencast_frames[i]['timestamp']
concat_file.write("file '%s'\nduration %s\n" % (frame_file_path, duration))
concat_file.write("file '%s'" % frame_file_path) # needed by the concat plugin
r = subprocess.run([ffmpeg_path, '-intra', '-f', 'concat','-safe', '0', '-i', concat_script_path, '-pix_fmt', 'yuv420p', outfile])
self._logger.log(25, 'Screencast in: %s', outfile)
else:
outfile = outfile.strip('.mp4')
shutil.move(self.screencasts_frames_dir, outfile)
self._logger.runbot('Screencast frames in: %s', outfile)
def start_screencast(self):
assert self.screencasts_dir
self._websocket_send('Page.startScreencast')
def set_cookie(self, name, value, path, domain):
params = {'name': name, 'value': value, 'path': path, 'domain': domain}
self._websocket_request('Network.setCookie', params=params)
return
def delete_cookie(self, name, **kwargs):
params = {k: v for k, v in kwargs.items() if k in ['url', 'domain', 'path']}
params['name'] = name
self._websocket_request('Network.deleteCookies', params=params)
return
def _wait_ready(self, ready_code, timeout=60):
self._logger.info('Evaluate ready code "%s"', ready_code)
start_time = time.time()
result = None
while True:
taken = time.time() - start_time
if taken > timeout:
break
result = self._websocket_request('Runtime.evaluate', params={
'expression': "try { %s } catch {}" % ready_code,
'awaitPromise': True,
}, timeout=timeout-taken)['result']
if result == {'type': 'boolean', 'value': True}:
time_to_ready = time.time() - start_time
if taken > 2:
self._logger.info('The ready code tooks too much time : %s', time_to_ready)
return True
self.take_screenshot(prefix='sc_failed_ready_')
self._logger.info('Ready code last try result: %s', result)
return False
def _wait_code_ok(self, code, timeout, error_checker=None):
self.error_checker = error_checker
self._logger.info('Evaluate test code "%s"', code)
start = time.time()
res = self._websocket_request('Runtime.evaluate', params={
'expression': code,
'awaitPromise': True,
}, timeout=timeout)['result']
if res.get('subtype') == 'error':
raise ChromeBrowserException("Running code returned an error: %s" % res)
err = ChromeBrowserException("failed")
try:
# if the runcode was a promise which took some time to execute,
# discount that from the timeout
if self._result.result(time.time() - start + timeout) and not self.had_failure:
return
except CancelledError:
# regular-ish shutdown
return
except Exception as e:
err = e
self.take_screenshot()
self._save_screencast()
if isinstance(err, ChromeBrowserException):
raise err
if isinstance(err, concurrent.futures.TimeoutError):
raise ChromeBrowserException('Script timeout exceeded') from err
raise ChromeBrowserException("Unknown error") from err
def navigate_to(self, url, wait_stop=False):
self._logger.info('Navigating to: "%s"', url)
nav_result = self._websocket_request('Page.navigate', params={'url': url}, timeout=20.0)
self._logger.info("Navigation result: %s", nav_result)
if wait_stop:
frame_id = nav_result['frameId']
e = threading.Event()
self._frames[frame_id] = e.set
self._logger.info('Waiting for frame %r to stop loading', frame_id)
e.wait(10)
def clear(self):
self._websocket_send('Page.stopScreencast')
if self.screencasts_dir and os.path.isdir(self.screencasts_frames_dir):
shutil.rmtree(self.screencasts_frames_dir)
self.screencast_frames = []
self._websocket_request('Page.stopLoading')
self._websocket_request('Runtime.evaluate', params={'expression': """
('serviceWorker' in navigator) &&
navigator.serviceWorker.getRegistrations().then(
registrations => Promise.all(registrations.map(r => r.unregister()))
)
""", 'awaitPromise': True})
# wait for the screenshot or whatever
wait(self._responses.values(), 10)
self._logger.info('Deleting cookies and clearing local storage')
self._websocket_request('Network.clearBrowserCache')
self._websocket_request('Network.clearBrowserCookies')
self._websocket_request('Runtime.evaluate', params={'expression': 'try {localStorage.clear(); sessionStorage.clear();} catch(e) {}'})
self.navigate_to('about:blank', wait_stop=True)
# hopefully after navigating to about:blank there's no event left
self._frames.clear()
# wait for the clearing requests to finish in case the browser is re-used
wait(self._responses.values(), 10)
self._responses.clear()
self._result.cancel()
self._result = Future()
self.had_failure = False
def _from_remoteobject(self, arg):
""" attempts to make a CDT RemoteObject comprehensible
"""
objtype = arg['type']
subtype = arg.get('subtype')
if objtype == 'undefined':
# the undefined remoteobject is literally just {type: undefined}...
return 'undefined'
elif objtype != 'object' or subtype not in (None, 'array'):
# value is the json representation for json object
# otherwise fallback on the description which is "a string
# representation of the object" e.g. the traceback for errors, the
# source for functions, ... finally fallback on the entire arg mess
return arg.get('value', arg.get('description', arg))
elif subtype == 'array':
# apparently value is *not* the JSON representation for arrays
# instead it's just Array(3) which is useless, however the preview
# properties are the same as object which is useful (just ignore the
# name which is the index)
return '[%s]' % ', '.join(
repr(p['value']) if p['type'] == 'string' else str(p['value'])
for p in arg.get('preview', {}).get('properties', [])
if re.match(r'\d+', p['name'])
)
# all that's left is type=object, subtype=None aka custom or
# non-standard objects, print as TypeName(param=val, ...), sadly because
# of the way Odoo widgets are created they all appear as Class(...)
# nb: preview properties are *not* recursive, the value is *all* we get
return '%s(%s)' % (
arg.get('className') or 'object',
', '.join(
'%s=%s' % (p['name'], repr(p['value']) if p['type'] == 'string' else p['value'])
for p in arg.get('preview', {}).get('properties', [])
if p.get('value') is not None
)
)
LINE_PATTERN = '\tat %(functionName)s (%(url)s:%(lineNumber)d:%(columnNumber)d)\n'
def _format_stack(self, logrecord):
if logrecord['type'] not in ['trace']:
return
trace = logrecord.get('stackTrace')
while trace:
for f in trace['callFrames']:
yield self.LINE_PATTERN % f
trace = trace.get('parent')
def console_formatter(self, args):
""" Formats similarly to the console API:
* if there are no args, don't format (return string as-is)
* %% -> %
* %c -> replace by styling directives (ignore for us)
* other known formatters -> replace by corresponding argument
* leftover known formatters (args exhausted) -> replace by empty string
* unknown formatters -> return as-is
"""
if not args:
return lambda m: m[0]
def replacer(m):
fmt = m[0][1]
if fmt == '%':
return '%'
if fmt in 'sdfoOc':
if not args:
return ''
repl = args.pop(0)
if fmt == 'c':
return ''
return str(self._from_remoteobject(repl))
return m[0]
return replacer
class Opener(requests.Session):
"""
Flushes and clears the current transaction when starting a request.
This is likely necessary when we make a request to the server, as the
request is made with a test cursor, which uses a different cache than this
transaction.
"""
def __init__(self, cr: BaseCursor):
super().__init__()
self.cr = cr
def request(self, *args, **kwargs):
self.cr.flush()
self.cr.clear()
return super().request(*args, **kwargs)
class Transport(xmlrpclib.Transport):
""" see :class:`Opener` """
def __init__(self, cr: BaseCursor):
self.cr = cr
super().__init__()
def request(self, *args, **kwargs):
self.cr.flush()
self.cr.clear()
return super().request(*args, **kwargs)
class HttpCase(TransactionCase):
""" Transactional HTTP TestCase with url_open and Chrome headless helpers. """
registry_test_mode = True
browser = None
browser_size = '1366x768'
touch_enabled = False
allow_end_on_form = False
_logger: logging.Logger = None
@classmethod
def setUpClass(cls):
super().setUpClass()
ICP = cls.env['ir.config_parameter']
ICP.set_param('web.base.url', cls.base_url())
ICP.env.flush_all()
# v8 api with correct xmlrpc exception handling.
cls.xmlrpc_url = f'http://{HOST}:{odoo.tools.config["http_port"]:d}/xmlrpc/2/'
cls._logger = logging.getLogger('%s.%s' % (cls.__module__, cls.__name__))
def setUp(self):
super().setUp()
if self.registry_test_mode:
self.registry.enter_test_mode(self.cr)
self.addCleanup(self.registry.leave_test_mode)
self.xmlrpc_common = xmlrpclib.ServerProxy(self.xmlrpc_url + 'common', transport=Transport(self.cr))
self.xmlrpc_db = xmlrpclib.ServerProxy(self.xmlrpc_url + 'db', transport=Transport(self.cr))
self.xmlrpc_object = xmlrpclib.ServerProxy(self.xmlrpc_url + 'object', transport=Transport(self.cr))
# setup an url opener helper
self.opener = Opener(self.cr)
@classmethod
def start_browser(cls):
# start browser on demand
if cls.browser is None:
cls.browser = ChromeBrowser(cls)
cls.addClassCleanup(cls.terminate_browser)
@classmethod
def terminate_browser(cls):
if cls.browser:
cls.browser.stop()
cls.browser = None
def url_open(self, url, data=None, files=None, timeout=12, headers=None, allow_redirects=True, head=False):
if url.startswith('/'):
url = self.base_url() + url
if head:
return self.opener.head(url, data=data, files=files, timeout=timeout, headers=headers, allow_redirects=False)
if data or files:
return self.opener.post(url, data=data, files=files, timeout=timeout, headers=headers, allow_redirects=allow_redirects)
return self.opener.get(url, timeout=timeout, headers=headers, allow_redirects=allow_redirects)
def _wait_remaining_requests(self, timeout=10):
def get_http_request_threads():
return [t for t in threading.enumerate() if t.name.startswith('odoo.service.http.request.')]
start_time = time.time()
request_threads = get_http_request_threads()
self._logger.info('waiting for threads: %s', request_threads)
for thread in request_threads:
thread.join(timeout - (time.time() - start_time))
request_threads = get_http_request_threads()
for thread in request_threads:
self._logger.info("Stop waiting for thread %s handling request for url %s",
thread.name, getattr(thread, 'url', '<UNKNOWN>'))
if request_threads:
self._logger.info('remaining requests')
odoo.tools.misc.dumpstacks()
def logout(self, keep_db=True):
self.session.logout(keep_db=keep_db)
odoo.http.root.session_store.save(self.session)
def authenticate(self, user, password):
if getattr(self, 'session', None):
odoo.http.root.session_store.delete(self.session)
self.session = session = odoo.http.root.session_store.new()
session.update(odoo.http.get_default_session(), db=get_db_name())
session.context['lang'] = odoo.http.DEFAULT_LANG
if user: # if authenticated
# Flush and clear the current transaction. This is useful, because
# the call below opens a test cursor, which uses a different cache
# than this transaction.
self.cr.flush()
self.cr.clear()
uid = self.registry['res.users'].authenticate(session.db, user, password, {'interactive': False})
env = api.Environment(self.cr, uid, {})
session.uid = uid
session.login = user
session.session_token = uid and security.compute_session_token(session, env)
session.context = dict(env['res.users'].context_get())
odoo.http.root.session_store.save(session)
# Reset the opener: turns out when we set cookies['foo'] we're really
# setting a cookie on domain='' path='/'.
#
# But then our friendly neighborhood server might set a cookie for
# domain='localhost' path='/' (with the same value) which is considered
# a *different* cookie following ours rather than the same.
#
# When we update our cookie, it's done in-place, so the server-set
# cookie is still present and (as it follows ours and is more precise)
# very likely to still be used, therefore our session change is ignored.
#
# An alternative would be to set the cookie to None (unsetting it
# completely) or clear-ing session.cookies.
self.opener = Opener(self.cr)
self.opener.cookies['session_id'] = session.sid
if self.browser:
self._logger.info('Setting session cookie in browser')
self.browser.set_cookie('session_id', session.sid, '/', HOST)
return session
def browser_js(self, url_path, code, ready='', login=None, timeout=60, cookies=None, error_checker=None, watch=False, **kw):
""" Test js code running in the browser
- optionnally log as 'login'
- load page given by url_path
- wait for ready object to be available
- eval(code) inside the page
- open another chrome window to watch code execution if watch is True
To signal success test do: console.log('test successful')
To signal test failure raise an exception or call console.error with a message.
Test will stop when a failure occurs if error_checker is not defined or returns True for this message
"""
if not self.env.registry.loaded:
self._logger.warning('HttpCase test should be in post_install only')
# increase timeout if coverage is running
if any(f.filename.endswith('/coverage/execfile.py') for f in inspect.stack() if f.filename):
timeout = timeout * 1.5
self.start_browser()
if watch and self.browser.dev_tools_frontend_url:
_logger.warning('watch mode is only suitable for local testing - increasing tour timeout to 3600')
timeout = max(timeout*10, 3600)
debug_front_end = f'http://127.0.0.1:{self.browser.devtools_port}{self.browser.dev_tools_frontend_url}'
self.browser._chrome_without_limit([self.browser.executable, debug_front_end])
time.sleep(3)
try:
self.authenticate(login, login)
# Flush and clear the current transaction. This is useful in case
# we make requests to the server, as these requests are made with
# test cursors, which uses different caches than this transaction.
self.cr.flush()
self.cr.clear()
url = werkzeug.urls.url_join(self.base_url(), url_path)
if watch:
parsed = werkzeug.urls.url_parse(url)
qs = parsed.decode_query()
qs['watch'] = '1'
url = parsed.replace(query=werkzeug.urls.url_encode(qs)).to_url()
self._logger.info('Open "%s" in browser', url)
if self.browser.screencasts_dir:
self._logger.info('Starting screencast')
self.browser.start_screencast()
if cookies:
for name, value in cookies.items():
self.browser.set_cookie(name, value, '/', HOST)
self.browser.navigate_to(url, wait_stop=not bool(ready))
# Needed because tests like test01.js (qunit tests) are passing a ready
# code = ""
ready = ready or "document.readyState === 'complete'"
self.assertTrue(self.browser._wait_ready(ready), 'The ready "%s" code was always falsy' % ready)
error = False
try:
self.browser._wait_code_ok(code, timeout, error_checker=error_checker)
except ChromeBrowserException as chrome_browser_exception:
error = chrome_browser_exception
if error: # dont keep initial traceback, keep that outside of except
if code:
message = 'The test code "%s" failed' % code
else:
message = "Some js test failed"
self.fail('%s\n\n%s' % (message, error))
finally:
# clear browser to make it stop sending requests, in case we call
# the method several times in a test method
self.browser.delete_cookie('session_id', domain=HOST)
self.browser.clear()
self._wait_remaining_requests()
@classmethod
def base_url(cls):
return f"http://{HOST}:{odoo.tools.config['http_port']}"
def start_tour(self, url_path, tour_name, step_delay=None, **kwargs):
"""Wrapper for `browser_js` to start the given `tour_name` with the
optional delay between steps `step_delay`. Other arguments from
`browser_js` can be passed as keyword arguments."""
step_delay = ', %s' % step_delay if step_delay else ''
code = kwargs.pop('code', "odoo.startTour('%s'%s)" % (tour_name, step_delay))
ready = kwargs.pop('ready', "odoo.__DEBUG__.services['web_tour.tour'].tours['%s'].ready" % tour_name)
return self.browser_js(url_path=url_path, code=code, ready=ready, **kwargs)
def profile(self, **kwargs):
"""
for http_case, also patch _get_profiler_context_manager in order to profile all requests
"""
sup = super()
_profiler = sup.profile(**kwargs)
def route_profiler(request):
return sup.profile(description=request.httprequest.full_path)
return profiler.Nested(_profiler, patch('odoo.http.Request._get_profiler_context_manager', route_profiler))
# kept for backward compatibility
class HttpSavepointCase(HttpCase):
@classmethod
def __init_subclass__(cls):
super().__init_subclass__()
warnings.warn(
"Deprecated class HttpSavepointCase has been merged into HttpCase",
DeprecationWarning, stacklevel=2,
)
def no_retry(arg):
"""Disable auto retry on decorated test method or test class"""
arg._retry = False
return arg
def users(*logins):
""" Decorate a method to execute it once for each given user. """
@decorator
def _users(func, *args, **kwargs):
self = args[0]
old_uid = self.uid
try:
# retrieve users
Users = self.env['res.users'].with_context(active_test=False)
user_id = {
user.login: user.id
for user in Users.search([('login', 'in', list(logins))])
}
for login in logins:
with self.subTest(login=login):
# switch user and execute func
self.uid = user_id[login]
func(*args, **kwargs)
# Invalidate the cache between subtests, in order to not reuse
# the former user's cache (`test_read_mail`, `test_write_mail`)
self.env.invalidate_all()
finally:
self.uid = old_uid
return _users
@decorator
def warmup(func, *args, **kwargs):
""" Decorate a test method to run it twice: once for a warming up phase, and
a second time for real. The test attribute ``warm`` is set to ``False``
during warm up, and ``True`` once the test is warmed up. Note that the
effects of the warmup phase are rolled back thanks to a savepoint.
"""
self = args[0]
self.env.flush_all()
self.env.invalidate_all()
# run once to warm up the caches
self.warm = False
self.cr.execute('SAVEPOINT test_warmup')
func(*args, **kwargs)
self.env.flush_all()
# run once for real
self.cr.execute('ROLLBACK TO SAVEPOINT test_warmup')
self.env.invalidate_all()
self.warm = True
func(*args, **kwargs)
def can_import(module):
""" Checks if <module> can be imported, returns ``True`` if it can be,
``False`` otherwise.
To use with ``unittest.skipUnless`` for tests conditional on *optional*
dependencies, which may or may be present but must still be tested if
possible.
"""
try:
importlib.import_module(module)
except ImportError:
return False
else:
return True
class Form(object):
""" Server-side form view implementation (partial)
Implements much of the "form view" manipulation flow, such that
server-side tests can more properly reflect the behaviour which would be
observed when manipulating the interface:
* call default_get and the relevant onchanges on "creation"
* call the relevant onchanges on setting fields
* properly handle defaults & onchanges around x2many fields
Saving the form returns the created record if in creation mode.
Regular fields can just be assigned directly to the form, for
:class:`~odoo.fields.Many2one` fields assign a singleton recordset::
# empty recordset => creation mode
f = Form(self.env['sale.order'])
f.partner_id = a_partner
so = f.save()
When editing a record, using the form as a context manager to
automatically save it at the end of the scope::
with Form(so) as f2:
f2.payment_term_id = env.ref('account.account_payment_term_15days')
# f2 is saved here
For :class:`~odoo.fields.Many2many` fields, the field itself is a
:class:`~odoo.tests.common.M2MProxy` and can be altered by adding or
removing records::
with Form(user) as u:
u.groups_id.add(env.ref('account.group_account_manager'))
u.groups_id.remove(id=env.ref('base.group_portal').id)
Finally :class:`~odoo.fields.One2many` are reified as
:class:`~odoo.tests.common.O2MProxy`.
Because the :class:`~odoo.fields.One2many` only exists through its
parent, it is manipulated more directly by creating "sub-forms"
with the :meth:`~odoo.tests.common.O2MProxy.new` and
:meth:`~odoo.tests.common.O2MProxy.edit` methods. These would
normally be used as context managers since they get saved in the
parent record::
with Form(so) as f3:
# add support
with f3.order_line.new() as line:
line.product_id = env.ref('product.product_product_2')
# add a computer
with f3.order_line.new() as line:
line.product_id = env.ref('product.product_product_3')
# we actually want 5 computers
with f3.order_line.edit(1) as line:
line.product_uom_qty = 5
# remove support
f3.order_line.remove(index=0)
# SO is saved here
:param recordp: empty or singleton recordset. An empty recordset will
put the view in "creation" mode and trigger calls to
default_get and on-load onchanges, a singleton will
put it in "edit" mode and only load the view's data.
:type recordp: odoo.models.Model
:param view: the id, xmlid or actual view object to use for
onchanges and view constraints. If none is provided,
simply loads the default view for the model.
:type view: int | str | odoo.model.Model
.. versionadded:: 12.0
"""
def __init__(self, recordp, view=None):
# necessary as we're overriding setattr
assert isinstance(recordp, BaseModel)
env = recordp.env
object.__setattr__(self, '_env', env)
# store model bit only
object.__setattr__(self, '_model', recordp.browse(()))
if isinstance(view, BaseModel):
assert view._name == 'ir.ui.view', "the view parameter must be a view id, xid or record, got %s" % view
view_id = view.id
elif isinstance(view, str):
view_id = env.ref(view).id
else:
view_id = view or False
fvg = recordp.get_view(view_id, 'form')
fvg['tree'] = etree.fromstring(fvg['arch'])
fvg['fields'] = self._get_view_fields(fvg['tree'], recordp)
object.__setattr__(self, '_view', fvg)
self._process_fvg(recordp, fvg)
# ordered?
vals = dict.fromkeys(fvg['fields'], False)
object.__setattr__(self, '_values', vals)
object.__setattr__(self, '_changed', set())
if recordp:
assert recordp['id'], "editing unstored records is not supported"
# always load the id
vals['id'] = recordp['id']
self._init_from_values(recordp)
else:
self._init_from_defaults(self._model)
def _get_view_fields(self, node, model):
level = node.xpath('count(ancestor::field)')
fnames = set(el.get('name') for el in node.xpath('.//field[count(ancestor::field) = %s]' % level))
fields = {fname: info for fname, info in model.fields_get().items() if fname in fnames}
return fields
def _o2m_set_edition_view(self, descr, node, level):
default_view = next(
(m for m in node.get('mode', 'tree').split(',') if m != 'form'),
'tree'
)
refs = self._env['ir.ui.view']._get_view_refs(node)
# always fetch for simplicity, ensure we always have a tree and
# a form view
submodel = self._env[descr['relation']]
views = {view.tag: view for view in node.xpath('./*[descendant::field]')}
for view_type in ['tree', 'form']:
# embedded views should take the priority on externals
if view_type not in views:
sub_fvg = submodel.with_context(**refs).get_view(view_type=view_type)
sub_node = etree.fromstring(sub_fvg['arch'])
views[view_type] = sub_node
node.append(sub_node)
# if the default view is a kanban or a non-editable list, the
# "edition controller" is the form view
edition_view = 'tree' if default_view == 'tree' and views['tree'].get('editable') else 'form'
edition = {
'fields': self._get_view_fields(views[edition_view], submodel),
'tree': views[edition_view],
}
# don't recursively process o2ms in o2ms
self._process_fvg(submodel, edition, level=level-1)
descr['edition_view'] = edition
def __str__(self):
return "<%s %s(%s)>" % (
type(self).__name__,
self._model._name,
self._values.get('id', False),
)
def _process_fvg(self, model, fvg, level=2):
""" Post-processes to augment the view_get with:
* an id field (may not be present if not in the view but needed)
* pre-processed modifiers (map of modifier name to json-loaded domain)
* pre-processed onchanges list
"""
inherited_modifiers = ['invisible']
fvg['fields'].setdefault('id', {'type': 'id'})
# pre-resolve modifiers & bind to arch toplevel
modifiers = fvg['modifiers'] = {'id': {'required': [FALSE_LEAF], 'readonly': [TRUE_LEAF]}}
contexts = fvg['contexts'] = {}
order = fvg['fields_ordered'] = []
field_level = fvg['tree'].xpath('count(ancestor::field)')
eval_context = {
"uid": self._env.user.id,
"tz": self._env.user.tz,
"lang": self._env.user.lang,
"datetime": datetime,
"context_today": lambda: odoo.fields.Date.context_today(self._env.user),
"relativedelta": relativedelta,
"current_date": time.strftime("%Y-%m-%d"),
"allowed_company_ids": [self._env.user.company_id.id],
"context": {},
}
for f in fvg['tree'].xpath('.//field[count(ancestor::field) = %s]' % field_level):
fname = f.get('name')
order.append(fname)
node_modifiers = {}
for modifier, domain in json.loads(f.get('modifiers', '{}')).items():
if isinstance(domain, int):
node_modifiers[modifier] = [TRUE_LEAF] if domain else [FALSE_LEAF]
elif isinstance(domain, str):
node_modifiers[modifier] = normalize_domain(safe_eval(domain, eval_context))
else:
node_modifiers[modifier] = normalize_domain(domain)
for a in f.xpath('ancestor::*[@modifiers][count(ancestor::field) = %s]' % field_level):
ancestor_modifiers = json.loads(a.get('modifiers'))
for modifier in inherited_modifiers:
if modifier in ancestor_modifiers:
domain = ancestor_modifiers[modifier]
ancestor_domain = ([TRUE_LEAF] if domain else [FALSE_LEAF]) if isinstance(domain, int) else normalize_domain(domain)
node_domain = node_modifiers.get(modifier, [])
# Combine the field modifiers with his ancestor modifiers with an OR connector
# e.g. A field is invisible if its own invisible modifier is True
# OR if one of its ancestor invisible modifier is True
node_modifiers[modifier] = expression.OR([ancestor_domain, node_domain])
if fname in modifiers:
# The field is multiple times in the view, combine the modifier domains with an AND connector
# e.g. a field is invisible if all occurences of the field are invisible in the view.
# e.g. a field is readonly if all occurences of the field are readonly in the view.
for modifier in set(node_modifiers.keys()).union(modifiers[fname].keys()):
modifiers[fname][modifier] = expression.AND([
modifiers[fname].get(modifier, [FALSE_LEAF]),
node_modifiers.get(modifier, [FALSE_LEAF]),
])
else:
modifiers[fname] = node_modifiers
ctx = f.get('context')
if ctx:
contexts[fname] = ctx
descr = fvg['fields'].get(fname) or {'type': None}
# FIXME: better widgets support
# NOTE: selection breaks because of m2o widget=selection
if f.get('widget') in ['many2many']:
descr['type'] = f.get('widget')
if level and descr['type'] == 'one2many':
self._o2m_set_edition_view(descr, f, level)
fvg['onchange'] = model._onchange_spec({'arch': etree.tostring(fvg['tree'])})
def _init_from_defaults(self, model):
vals = self._values
vals.clear()
vals['id'] = False
# call onchange with an empty list of fields; this retrieves default
# values, applies onchanges and return the result
self._perform_onchange([])
# fill in whatever fields are still missing with falsy values
vals.update(
(f, _cleanup_from_default(descr['type'], False))
for f, descr in self._view['fields'].items()
if f not in vals
)
# mark all fields as modified (though maybe this should be done on
# save when creating for better reliability?)
self._changed.update(self._view['fields'])
def _init_from_values(self, values):
self._values.update(
record_to_values(self._view['fields'], values))
def __getattr__(self, field):
descr = self._view['fields'].get(field)
assert descr is not None, "%s was not found in the view" % field
v = self._values[field]
if descr['type'] == 'many2one':
Model = self._env[descr['relation']]
if not v:
return Model
return Model.browse(v)
elif descr['type'] == 'many2many':
return M2MProxy(self, field)
elif descr['type'] == 'one2many':
return O2MProxy(self, field)
return v
def _get_modifier(self, field, modifier, *, default=False, view=None, modmap=None, vals=None):
if view is None:
view = self._view
d = (modmap or view['modifiers'])[field].get(modifier, default)
if isinstance(d, bool):
return d
if vals is None:
vals = self._values
stack = []
for it in reversed(d):
if it == '!':
stack.append(not stack.pop())
elif it == '&':
e1 = stack.pop()
e2 = stack.pop()
stack.append(e1 and e2)
elif it == '|':
e1 = stack.pop()
e2 = stack.pop()
stack.append(e1 or e2)
elif isinstance(it, tuple):
if it == TRUE_LEAF:
stack.append(True)
continue
elif it == FALSE_LEAF:
stack.append(False)
continue
f, op, val = it
# hack-ish handling of parent.<field> modifiers
f, n = re.subn(r'^parent\.', '', f, 1)
if n:
field_val = vals['•parent•'][f]
else:
field_val = vals[f]
# apparent artefact of JS data representation: m2m field
# values are assimilated to lists of ids?
# FIXME: SSF should do that internally, but the requirement
# of recursively post-processing to generate lists of
# commands on save (e.g. m2m inside an o2m) means the
# data model needs proper redesign
# we're looking up the "current view" so bits might be
# missing when processing o2ms in the parent (see
# values_to_save:1450 or so)
f_ = view['fields'].get(f, {'type': None})
if f_['type'] == 'many2many':
# field value should be [(6, _, ids)], we want just the ids
field_val = field_val[0][2] if field_val else []
stack.append(self._OPS[op](field_val, val))
else:
raise ValueError("Unknown domain element %s" % [it])
[result] = stack
return result
_OPS = {
'=': operator.eq,
'==': operator.eq,
'!=': operator.ne,
'<': operator.lt,
'<=': operator.le,
'>=': operator.ge,
'>': operator.gt,
'in': lambda a, b: (a in b) if isinstance(b, (tuple, list)) else (b in a),
'not in': lambda a, b: (a not in b) if isinstance(b, (tuple, list)) else (b not in a),
}
def _get_context(self, field):
c = self._view['contexts'].get(field)
if not c:
return {}
# see _getEvalContext
# the context for a field's evals (of domain/context) is the composition of:
# * the parent's values
# * ??? element.context ???
# * the environment's context (?)
# * a few magic values
record_id = self._values.get('id') or False
ctx = dict(self._values_to_save(all_fields=True))
ctx.update(self._env.context)
ctx.update(
id=record_id,
active_id=record_id,
active_ids=[record_id] if record_id else [],
active_model=self._model._name,
current_date=date.today().strftime("%Y-%m-%d"),
)
return safe_eval(c, ctx, {'context': ctx})
def __setattr__(self, field, value):
descr = self._view['fields'].get(field)
assert descr is not None, "%s was not found in the view" % field
assert descr['type'] not in ('many2many', 'one2many'), \
"Can't set an o2m or m2m field, manipulate the corresponding proxies"
assert not self._get_modifier(field, 'readonly'), \
"can't write on readonly field {}".format(field)
assert not self._get_modifier(field, 'invisible'), \
"can't write on invisible field {}".format(field)
if descr['type'] == 'many2one':
assert isinstance(value, BaseModel) and value._name == descr['relation']
# store just the id: that's the output of default_get & (more
# or less) onchange.
value = value.id
self._values[field] = value
self._perform_onchange([field])
# enables with Form(...) as f: f.a = 1; f.b = 2; f.c = 3
# q: how to get recordset?
def __enter__(self):
return self
def __exit__(self, etype, _evalue, _etb):
if not etype:
self.save()
def save(self):
""" Saves the form, returns the created record if applicable
* does not save ``readonly`` fields
* does not save unmodified fields (during edition) any assignment
or onchange return marks the field as modified, even if set to its
current value
:raises AssertionError: if the form has any unfilled required field
"""
id_ = self._values.get('id')
values = self._values_to_save()
if id_:
r = self._model.browse(id_)
if values:
r.write(values)
else:
r = self._model.create(values)
self._values.update(
record_to_values(self._view['fields'], r)
)
self._changed.clear()
self._model.env.flush_all()
self._model.env.clear() # discard cache and pending recomputations
return r
def _values_to_save(self, all_fields=False):
""" Validates values and returns only fields modified since
load/save
:param bool all_fields: if False (the default), checks for required
fields and only save fields which are changed
and not readonly
"""
view = self._view
fields = self._view['fields']
record_values = self._values
changed = self._changed
return self._values_to_save_(
record_values, fields, view,
changed, all_fields
)
def _values_to_save_(
self, record_values, fields, view,
changed, all_fields=False, modifiers_values=None,
parent_link=None
):
""" Validates & extracts values to save, recursively in order to handle
o2ms properly
:param dict record_values: values of the record to extract
:param dict fields: fields_get result
:param view: view tree
:param set changed: set of fields which have been modified (since last save)
:param bool all_fields:
whether to ignore normal filtering and just return everything
:param dict modifiers_values:
defaults to ``record_values``, but o2ms need some additional
massaging
"""
values = {}
for f in fields:
if f == 'id':
continue
get_modifier = functools.partial(
self._get_modifier,
f, view=view,
vals=modifiers_values or record_values
)
descr = fields[f]
v = record_values[f]
# note: maybe `invisible` should not skip `required` if model attribute
if v is False and not (all_fields or f == parent_link or descr['type'] == 'boolean' or get_modifier('invisible') or get_modifier('column_invisible')):
if get_modifier('required'):
raise AssertionError("{} is a required field ({})".format(f, view['modifiers'][f]))
# skip unmodified fields unless all_fields
if not (all_fields or f in changed):
continue
if get_modifier('readonly'):
node = _get_node(view, f)
if not (all_fields or node.get('force_save')):
continue
if descr['type'] == 'one2many':
subview = descr['edition_view']
fields_ = subview['fields']
oldvals = v
v = []
for (c, rid, vs) in oldvals:
if c == 1 and not vs:
c, vs = 4, False
elif c in (0, 1):
vs = vs or {}
missing = fields_.keys() - vs.keys()
# FIXME: maybe do this during initial loading instead?
if missing:
Model = self._env[descr['relation']]
if c == 0:
vs.update(dict.fromkeys(missing, False))
vs.update(
(k, _cleanup_from_default(fields_[k], v))
for k, v in Model.default_get(list(missing)).items()
)
else:
vs.update(record_to_values(
{k: v for k, v in fields_.items() if k not in vs},
Model.browse(rid)
))
vs = self._values_to_save_(
vs, fields_, subview,
vs._changed if isinstance(vs, UpdateDict) else vs.keys(),
all_fields,
modifiers_values={'id': False, **vs, '•parent•': record_values},
# related o2m don't have a relation_field
parent_link=descr.get('relation_field'),
)
v.append((c, rid, vs))
values[f] = v
return values
def _perform_onchange(self, fields, context=None):
assert isinstance(fields, list)
# marks any onchange source as changed
self._changed.update(fields)
# skip calling onchange() if there's no trigger on any of the changed
# fields
spec = self._view['onchange']
if fields and not any(spec[f] for f in fields):
return
record = self._model.browse(self._values.get('id'))
if context is not None:
record = record.with_context(**context)
result = record.onchange(self._onchange_values(), fields, spec)
self._model.env.flush_all()
self._model.env.clear() # discard cache and pending recomputations
if result.get('warning'):
_logger.getChild('onchange').warning("%(title)s %(message)s" % result.get('warning'))
values = result.get('value', {})
# mark onchange output as changed
self._changed.update(values.keys() & self._view['fields'].keys())
self._values.update(
(k, self._cleanup_onchange(
self._view['fields'][k],
v, self._values.get(k),
))
for k, v in values.items()
if k in self._view['fields']
)
return result
def _onchange_values(self):
return self._onchange_values_(self._view['fields'], self._values)
def _onchange_values_(self, fields, record):
""" Recursively cleanup o2m values for onchanges:
* if an o2m command is a 1 (UPDATE) and there is nothing to update, send
a 4 instead (LINK_TO) instead as that's what the webclient sends for
unmodified rows
* if an o2m command is a 1 (UPDATE) and only a subset of its fields have
been modified, only send the modified ones
This needs to be recursive as there are people who put invisible o2ms
inside their o2ms.
"""
values = {}
for k, v in record.items():
if fields[k]['type'] == 'one2many':
subfields = fields[k]['edition_view']['fields']
it = values[k] = []
for (c, rid, vs) in v:
if c == 1 and isinstance(vs, UpdateDict):
vs = dict(vs.changed_items())
if c == 1 and not vs:
it.append((4, rid, False))
elif c in (0, 1):
it.append((c, rid, self._onchange_values_(subfields, vs)))
else:
it.append((c, rid, vs))
else:
values[k] = v
return values
def _cleanup_onchange(self, descr, value, current):
if descr['type'] == 'many2one':
if not value:
return False
# out of onchange, m2o are name-gotten
return value[0]
elif descr['type'] == 'one2many':
# ignore o2ms nested in o2ms
if not descr['edition_view']:
return []
if current is None:
current = []
v = []
c = {t[1] for t in current if t[0] in (1, 2)}
current_values = {c[1]: c[2] for c in current if c[0] == 1}
# which view should this be???
subfields = descr['edition_view']['fields']
# TODO: simplistic, unlikely to work if e.g. there's a 5 inbetween other commands
for command in value:
if command[0] == 0:
v.append((0, 0, {
k: self._cleanup_onchange(subfields[k], v, None)
for k, v in command[2].items()
if k in subfields
}))
elif command[0] == 1:
record_id = command[1]
c.discard(record_id)
stored = current_values.get(record_id)
if stored is None:
record = self._env[descr['relation']].browse(record_id)
stored = UpdateDict(record_to_values(subfields, record))
updates = (
(k, self._cleanup_onchange(subfields[k], v, stored.get(k)))
for k, v in command[2].items()
if k in subfields
)
for field, value in updates:
# if there are values from the onchange which differ
# from current values, update & mark field as changed
if stored.get(field, value) != value:
stored._changed.add(field)
stored[field] = value
v.append((1, record_id, stored))
elif command[0] == 2:
c.discard(command[1])
v.append((2, command[1], False))
elif command[0] == 4:
c.discard(command[1])
v.append((1, command[1], None))
elif command[0] == 5:
v = []
# explicitly mark all non-relinked (or modified) records as deleted
for id_ in c: v.append((2, id_, False))
return v
elif descr['type'] == 'many2many':
# onchange result is a bunch of commands, normalize to single 6
if current is None:
ids = []
else:
ids = list(current[0][2])
for command in value:
if command[0] == 1:
ids.append(command[1])
elif command[0] == 3:
ids.remove(command[1])
elif command[0] == 4:
ids.append(command[1])
elif command[0] == 5:
del ids[:]
elif command[0] == 6:
ids[:] = command[2]
else:
raise ValueError(
"Unsupported M2M command %d" % command[0])
return [(6, False, ids)]
return value
class O2MForm(Form):
# noinspection PyMissingConstructor
def __init__(self, proxy, index=None):
m = proxy._model
object.__setattr__(self, '_proxy', proxy)
object.__setattr__(self, '_index', index)
object.__setattr__(self, '_env', m.env)
object.__setattr__(self, '_model', m)
# copy so we don't risk breaking it too much (?)
fvg = dict(proxy._descr['edition_view'])
object.__setattr__(self, '_view', fvg)
self._process_fvg(m, fvg)
vals = dict.fromkeys(fvg['fields'], False)
object.__setattr__(self, '_values', vals)
object.__setattr__(self, '_changed', set())
if index is None:
self._init_from_defaults(m)
else:
vals = proxy._records[index]
self._values.update(vals)
if hasattr(vals, '_changed'):
self._changed.update(vals._changed)
def _get_modifier(self, field, modifier, *, default=False, view=None, modmap=None, vals=None):
if vals is None:
vals = {**self._values, '•parent•': self._proxy._parent._values}
return super()._get_modifier(field, modifier, default=default, view=view, modmap=modmap, vals=vals)
def _onchange_values(self):
values = super(O2MForm, self)._onchange_values()
# computed o2m may not have a relation_field(?)
descr = self._proxy._descr
if 'relation_field' in descr: # note: should be fine because not recursive
values[descr['relation_field']] = self._proxy._parent._onchange_values()
return values
def save(self):
proxy = self._proxy
commands = proxy._parent._values[proxy._field]
values = self._values_to_save()
if self._index is None:
commands.append((0, 0, values))
else:
index = proxy._command_index(self._index)
(c, id_, vs) = commands[index]
if c == 0:
vs.update(values)
elif c == 1:
if vs is None:
vs = UpdateDict()
assert isinstance(vs, UpdateDict), type(vs)
vs.update(values)
commands[index] = (1, id_, vs)
else:
raise AssertionError("Expected command type 0 or 1, found %s" % c)
# FIXME: should be called when performing on change => value needs to be serialised into parent every time?
proxy._parent._perform_onchange([proxy._field], self._env.context)
def _values_to_save(self, all_fields=False):
""" Validates values and returns only fields modified since
load/save
"""
values = UpdateDict(self._values)
values._changed.update(self._changed)
if all_fields:
return values
for f in self._view['fields']:
if self._get_modifier(f, 'required') and not (self._get_modifier(f, 'column_invisible') or self._get_modifier(f, 'invisible')):
assert self._values[f] is not False, "{} is a required field".format(f)
return values
class UpdateDict(dict):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._changed = set()
if args and isinstance(args[0], UpdateDict):
self._changed.update(args[0]._changed)
def changed_items(self):
return (
(k, v) for k, v in self.items()
if k in self._changed
)
def update(self, *args, **kw):
super().update(*args, **kw)
if args and isinstance(args[0], UpdateDict):
self._changed.update(args[0]._changed)
class X2MProxy(object):
_parent = None
_field = None
def _assert_editable(self):
assert not self._parent._get_modifier(self._field, 'readonly'),\
'field %s is not editable' % self._field
assert not self._parent._get_modifier(self._field, 'invisible'),\
'field %s is not visible' % self._field
class O2MProxy(X2MProxy):
""" O2MProxy()
"""
def __init__(self, parent, field):
self._parent = parent
self._field = field
# reify records to a list so they can be manipulated easily?
self._records = []
model = self._model
fields = self._descr['edition_view']['fields']
for (command, rid, values) in self._parent._values[self._field]:
if command == 0:
self._records.append(values)
elif command == 1:
if values is None:
# read based on view info
r = model.browse(rid)
values = UpdateDict(record_to_values(fields, r))
self._records.append(values)
elif command == 2:
pass
else:
raise AssertionError("O2M proxy only supports commands 0, 1 and 2, found %s" % command)
def __len__(self):
return len(self._records)
@property
def _model(self):
model = self._parent._env[self._descr['relation']]
ctx = self._parent._get_context(self._field)
if ctx:
model = model.with_context(**ctx)
return model
@property
def _descr(self):
return self._parent._view['fields'][self._field]
def _command_index(self, for_record):
""" Takes a record index and finds the corresponding record index
(skips all 2s, basically)
:param int for_record:
"""
commands = self._parent._values[self._field]
return next(
cidx
for ridx, cidx in enumerate(
cidx for cidx, (c, _1, _2) in enumerate(commands)
if c in (0, 1)
)
if ridx == for_record
)
def new(self):
""" Returns a :class:`Form` for a new
:class:`~odoo.fields.One2many` record, properly initialised.
The form is created from the list view if editable, or the field's
form view otherwise.
:raises AssertionError: if the field is not editable
"""
self._assert_editable()
return O2MForm(self)
def edit(self, index):
""" Returns a :class:`Form` to edit the pre-existing
:class:`~odoo.fields.One2many` record.
The form is created from the list view if editable, or the field's
form view otherwise.
:raises AssertionError: if the field is not editable
"""
self._assert_editable()
return O2MForm(self, index)
def remove(self, index):
""" Removes the record at ``index`` from the parent form.
:raises AssertionError: if the field is not editable
"""
self._assert_editable()
# remove reified record from local list & either remove 0 from
# commands list or replace 1 (update) by 2 (remove)
cidx = self._command_index(index)
commands = self._parent._values[self._field]
(command, rid, _) = commands[cidx]
if command == 0:
# record not saved yet -> just remove the command
del commands[cidx]
elif command == 1:
# record already saved, replace by 2
commands[cidx] = (2, rid, 0)
else:
raise AssertionError("Expected command 0 or 1, got %s" % commands[cidx])
# remove reified record
del self._records[index]
self._parent._perform_onchange([self._field])
class M2MProxy(X2MProxy, collections.abc.Sequence):
""" M2MProxy()
Behaves as a :class:`~collection.Sequence` of recordsets, can be
indexed or sliced to get actual underlying recordsets.
"""
def __init__(self, parent, field):
self._parent = parent
self._field = field
def __getitem__(self, it):
p = self._parent
model = p._view['fields'][self._field]['relation']
return p._env[model].browse(self._get_ids()[it])
def __len__(self):
return len(self._get_ids())
def __iter__(self):
return iter(self[:])
def __contains__(self, record):
relation_ = self._parent._view['fields'][self._field]['relation']
assert isinstance(record, BaseModel)\
and record._name == relation_
return record.id in self._get_ids()
def add(self, record):
""" Adds ``record`` to the field, the record must already exist.
The addition will only be finalized when the parent record is saved.
"""
self._assert_editable()
parent = self._parent
relation_ = parent._view['fields'][self._field]['relation']
assert isinstance(record, BaseModel) and record._name == relation_,\
"trying to assign a '{}' object to a '{}' field".format(
record._name,
relation_,
)
self._get_ids().append(record.id)
parent._perform_onchange([self._field])
def _get_ids(self):
return self._parent._values[self._field][0][2]
def remove(self, id=None, index=None):
""" Removes a record at a certain index or with a provided id from
the field.
"""
self._assert_editable()
assert (id is None) ^ (index is None), \
"can remove by either id or index"
if id is None:
# remove by index
del self._get_ids()[index]
else:
self._get_ids().remove(id)
self._parent._perform_onchange([self._field])
def clear(self):
""" Removes all existing records in the m2m
"""
self._assert_editable()
self._get_ids()[:] = []
self._parent._perform_onchange([self._field])
def record_to_values(fields, record):
r = {}
# don't read the id explicitly, not sure why but if any of the "magic" hr
# field is read alongside `id` then it blows up e.g.
# james.read(['barcode']) works fine but james.read(['id', 'barcode'])
# triggers an ACL error on barcode, likewise km_home_work or
# emergency_contact or whatever. Since we always get the id anyway, just
# remove it from the fields to read
to_read = list(fields.keys() - {'id'})
if not to_read:
return r
for f, v in record.read(to_read)[0].items():
descr = fields[f]
if descr['type'] == 'many2one':
v = v and v[0]
elif descr['type'] == 'many2many':
v = [(6, 0, v or [])]
elif descr['type'] == 'one2many':
v = [(1, r, None) for r in v or []]
elif descr['type'] == 'datetime' and isinstance(v, datetime):
v = odoo.fields.Datetime.to_string(v)
elif descr['type'] == 'date' and isinstance(v, date):
v = odoo.fields.Date.to_string(v)
r[f] = v
return r
def _cleanup_from_default(type_, value):
if not value:
if type_ == 'many2many':
return [(6, False, [])]
elif type_ == 'one2many':
return []
elif type_ in ('integer', 'float'):
return 0
return value
if type_ == 'one2many':
return [c for c in value if c[0] != 6]
elif type_ == 'datetime' and isinstance(value, datetime):
return odoo.fields.Datetime.to_string(value)
elif type_ == 'date' and isinstance(value, date):
return odoo.fields.Date.to_string(value)
return value
def _get_node(view, f, *arg):
""" Find etree node for the field ``f`` in the view's arch
"""
return next((
n for n in view['tree'].iter('field')
if n.get('name') == f
), *arg)
def tagged(*tags):
"""A decorator to tag BaseCase objects.
Tags are stored in a set that can be accessed from a 'test_tags' attribute.
A tag prefixed by '-' will remove the tag e.g. to remove the 'standard' tag.
By default, all Test classes from odoo.tests.common have a test_tags
attribute that defaults to 'standard' and 'at_install'.
When using class inheritance, the tags ARE inherited.
"""
include = {t for t in tags if not t.startswith('-')}
exclude = {t[1:] for t in tags if t.startswith('-')}
def tags_decorator(obj):
obj.test_tags = (getattr(obj, 'test_tags', set()) | include) - exclude
return obj
return tags_decorator