# -*- coding: utf-8 -*- """ The module :mod:`odoo.tests.common` provides unittest test cases and a few helpers and classes to write tests. """ import base64 import collections import concurrent.futures import contextlib import difflib import functools import importlib import inspect import itertools import json import logging import operator import os import pathlib import platform import pprint import re import shutil import signal import subprocess import sys import tempfile import threading import time import unittest from . import case import warnings from collections import defaultdict from concurrent.futures import Future, CancelledError, wait try: from concurrent.futures import InvalidStateError except ImportError: InvalidStateError = NotImplementedError from contextlib import contextmanager, ExitStack from datetime import datetime, date from dateutil.relativedelta import relativedelta from itertools import zip_longest as izip_longest from unittest.mock import patch from xmlrpc import client as xmlrpclib import requests import werkzeug.urls from lxml import etree, html import odoo from odoo import api from odoo.models import BaseModel from odoo.exceptions import AccessError from odoo.modules.registry import Registry from odoo.osv import expression from odoo.osv.expression import normalize_domain, TRUE_LEAF, FALSE_LEAF from odoo.service import security from odoo.sql_db import BaseCursor, Cursor from odoo.tools import float_compare, single_email_re, profiler, lower_logging from odoo.tools.misc import find_in_path from odoo.tools.safe_eval import safe_eval try: # the behaviour of decorator changed in 5.0.5 changing the structure of the traceback when # an error is raised inside a method using a decorator. 
# this is not a hudge problem for test execution but this makes error message # more difficult to read and breaks test_with_decorators # This also changes the error format making runbot error matching fail # This also breaks the first frame meaning that the module detection will also fail on runbot # In 5.1 decoratorx was introduced and it looks like it has the same behaviour of old decorator from decorator import decoratorx as decorator except ImportError: from decorator import decorator try: import websocket except ImportError: # chrome headless tests will be skipped websocket = None _logger = logging.getLogger(__name__) # The odoo library is supposed already configured. ADDONS_PATH = odoo.tools.config['addons_path'] HOST = '127.0.0.1' # Useless constant, tests are aware of the content of demo data ADMIN_USER_ID = odoo.SUPERUSER_ID CHECK_BROWSER_SLEEP = 0.1 # seconds CHECK_BROWSER_ITERATIONS = 100 BROWSER_WAIT = CHECK_BROWSER_SLEEP * CHECK_BROWSER_ITERATIONS # seconds def get_db_name(): db = odoo.tools.config['db_name'] # If the database name is not provided on the command-line, # use the one on the thread (which means if it is provided on # the command-line, this will break when installing another # database from XML-RPC). if not db and hasattr(threading.current_thread(), 'dbname'): return threading.current_thread().dbname return db standalone_tests = defaultdict(list) def standalone(*tags): """ Decorator for standalone test functions. This is somewhat dedicated to tests that install, upgrade or uninstall some modules, which is currently forbidden in regular test cases. The function is registered under the given ``tags`` and the corresponding Odoo module name. 
""" def register(func): # register func by odoo module name if func.__module__.startswith('odoo.addons.'): module = func.__module__.split('.')[2] standalone_tests[module].append(func) # register func with aribitrary name, if any for tag in tags: standalone_tests[tag].append(func) standalone_tests['all'].append(func) return func return register # For backwards-compatibility - get_db_name() should be used instead DB = get_db_name() def new_test_user(env, login='', groups='base.group_user', context=None, **kwargs): """ Helper function to create a new test user. It allows to quickly create users given its login and groups (being a comma separated list of xml ids). Kwargs are directly propagated to the create to further customize the created user. User creation uses a potentially customized environment using the context parameter allowing to specify a custom context. It can be used to force a specific behavior and/or simplify record creation. An example is to use mail-related context keys in mail tests to speedup record creation. Some specific fields are automatically filled to avoid issues * groups_id: it is filled using groups function parameter; * name: "login (groups)" by default as it is required; * email: it is either the login (if it is a valid email) or a generated string 'x.x@example.com' (x being the first login letter). 
This is due to email being required for most odoo operations; """ if not login: raise ValueError('New users require at least a login') if not groups: raise ValueError('New users require at least user groups') if context is None: context = {} groups_id = [(6, 0, [env.ref(g.strip()).id for g in groups.split(',')])] create_values = dict(kwargs, login=login, groups_id=groups_id) # automatically generate a name as "Login (groups)" to ease user comprehension if not create_values.get('name'): create_values['name'] = '%s (%s)' % (login, groups) # automatically give a password equal to login if not create_values.get('password'): create_values['password'] = login + 'x' * (8 - len(login)) # generate email if not given as most test require an email if 'email' not in create_values: if single_email_re.match(login): create_values['email'] = login else: create_values['email'] = '%s.%s@example.com' % (login[0], login[0]) # ensure company_id + allowed company constraint works if not given at create if 'company_id' in create_values and 'company_ids' not in create_values: create_values['company_ids'] = [(4, create_values['company_id'])] return env['res.users'].with_context(**context).create(create_values) class RecordCapturer: def __init__(self, model, domain): self._model = model self._domain = domain def __enter__(self): self._before = self._model.search(self._domain) self._after = None return self def __exit__(self, exc_type, exc_value, exc_traceback): if exc_type is None: self._after = self._model.search(self._domain) - self._before @property def records(self): if self._after is None: return self._model.search(self._domain) - self._before return self._after class MetaCase(type): """ Metaclass of test case classes to assign default 'test_tags': 'standard', 'at_install' and the name of the module. 
""" def __init__(cls, name, bases, attrs): super(MetaCase, cls).__init__(name, bases, attrs) # assign default test tags if cls.__module__.startswith('odoo.addons.'): if getattr(cls, 'test_tags', None) is None: cls.test_tags = {'standard', 'at_install'} cls.test_module = cls.__module__.split('.')[2] cls.test_class = cls.__name__ cls.test_sequence = 0 def _normalize_arch_for_assert(arch_string, parser_method="xml"): """Takes some xml and normalize it to make it comparable to other xml in particular, blank text is removed, and the output is pretty-printed :param str arch_string: the string representing an XML arch :param str parser_method: an string representing which lxml.Parser class to use when normalizing both archs. Takes either "xml" or "html" :return: the normalized arch :rtype str: """ Parser = None if parser_method == 'xml': Parser = etree.XMLParser elif parser_method == 'html': Parser = etree.HTMLParser parser = Parser(remove_blank_text=True) arch_string = etree.fromstring(arch_string, parser=parser) return etree.tostring(arch_string, pretty_print=True, encoding='unicode') class BaseCase(case.TestCase, metaclass=MetaCase): """ Subclass of TestCase for Odoo-specific code. This class is abstract and expects self.registry, self.cr and self.uid to be initialized by subclasses. 
""" longMessage = True # more verbose error message by default: https://www.odoo.com/r/Vmh warm = True # False during warm-up phase (see :func:`warmup`) _python_version = sys.version_info def __init__(self, methodName='runTest'): super().__init__(methodName) self.addTypeEqualityFunc(etree._Element, self.assertTreesEqual) self.addTypeEqualityFunc(html.HtmlElement, self.assertTreesEqual) def run(self, result): testMethod = getattr(self, self._testMethodName) if getattr(testMethod, '_retry', True) and getattr(self, '_retry', True): tests_run_count = int(os.environ.get('ODOO_TEST_FAILURE_RETRIES', 0)) + 1 else: tests_run_count = 1 _logger.info('Auto retry disabled for %s', self) failure = False for retry in range(tests_run_count): if retry: _logger.runbot(f'Retrying a failed test: {self}') if retry < tests_run_count-1: with warnings.catch_warnings(), \ result.soft_fail(), \ lower_logging(25, logging.INFO) as quiet_log: super().run(result) failure = result.had_failure or quiet_log.had_error_log else: # last try super().run(result) if not failure: break def cursor(self): return self.registry.cursor() @property def uid(self): """ Get the current uid. """ return self.env.uid @uid.setter def uid(self, user): """ Set the uid by changing the test's environment. """ self.env = self.env(user=user) def ref(self, xid): """ Returns database ID for the provided :term:`external identifier`, shortcut for ``_xmlid_lookup`` :param xid: fully-qualified :term:`external identifier`, in the form :samp:`{module}.{identifier}` :raise: ValueError if not found :returns: registered id """ return self.browse_ref(xid).id def browse_ref(self, xid): """ Returns a record object for the provided :term:`external identifier` :param xid: fully-qualified :term:`external identifier`, in the form :samp:`{module}.{identifier}` :raise: ValueError if not found :returns: :class:`~odoo.models.BaseModel` """ assert "." 
in xid, "this method requires a fully qualified parameter, in the following form: 'module.identifier'" return self.env.ref(xid) def patch(self, obj, key, val): """ Do the patch ``setattr(obj, key, val)``, and prepare cleanup. """ patcher = patch.object(obj, key, val) # this is unittest.mock.patch patcher.start() self.addCleanup(patcher.stop) @classmethod def classPatch(cls, obj, key, val): """ Do the patch ``setattr(obj, key, val)``, and prepare cleanup. """ patcher = patch.object(obj, key, val) # this is unittest.mock.patch patcher.start() cls.addClassCleanup(patcher.stop) def startPatcher(self, patcher): mock = patcher.start() self.addCleanup(patcher.stop) return mock @classmethod def startClassPatcher(cls, patcher): mock = patcher.start() cls.addClassCleanup(patcher.stop) return mock @contextmanager def with_user(self, login): """ Change user for a given test, like with self.with_user() ... """ old_uid = self.uid try: user = self.env['res.users'].sudo().search([('login', '=', login)]) assert user, "Login %s not found" % login # switch user self.uid = user.id self.env = self.env(user=self.uid) yield finally: # back self.uid = old_uid self.env = self.env(user=self.uid) @contextmanager def debug_mode(self): """ Enable the effects of group 'base.group_no_one'; mainly useful with :class:`Form`. """ origin_user_has_groups = BaseModel.user_has_groups def user_has_groups(self, groups): group_set = set(groups.split(',')) if '!base.group_no_one' in group_set: return False elif 'base.group_no_one' in group_set: group_set.remove('base.group_no_one') return not group_set or origin_user_has_groups(self, ','.join(group_set)) return origin_user_has_groups(self, groups) with patch('odoo.models.BaseModel.user_has_groups', user_has_groups): yield @contextmanager def _assertRaises(self, exception, *, msg=None): """ Context manager that clears the environment upon failure. 
""" with ExitStack() as init: if hasattr(self, 'env'): init.enter_context(self.env.cr.savepoint()) if issubclass(exception, AccessError): # The savepoint() above calls flush(), which leaves the # record cache with lots of data. This can prevent # access errors to be detected. In order to avoid this # issue, we clear the cache before proceeding. self.env.cr.clear() with ExitStack() as inner: cm = inner.enter_context(super().assertRaises(exception, msg=msg)) # *moves* the cleanups from init to inner, this ensures the # savepoint gets rolled back when `yield` raises `exception`, # but still allows the initialisation to be protected *and* not # interfered with by `assertRaises`. inner.push(init.pop_all()) yield cm def assertRaises(self, exception, func=None, *args, **kwargs): if func: with self._assertRaises(exception): func(*args, **kwargs) else: return self._assertRaises(exception, **kwargs) if sys.version_info < (3, 10): # simplified backport of assertNoLogs() @contextmanager def assertNoLogs(self, logger: str, level: str): # assertLogs ensures there is at least one log record when # exiting the context manager. We insert one dummy record just # so we pass that silly test while still capturing the logs. with self.assertLogs(logger, level) as capture: logging.getLogger(logger).log(getattr(logging, level), "Dummy log record") yield if len(capture.output) > 1: raise self.failureException(f"Unexpected logs found: {capture.output[1:]}") @contextmanager def assertQueries(self, expected, flush=True): """ Check the queries made by the current cursor. ``expected`` is a list of strings representing the expected queries being made. Query strings are matched against each other, ignoring case and whitespaces. 
""" Cursor_execute = Cursor.execute actual_queries = [] def execute(self, query, params=None, log_exceptions=None): actual_queries.append(query) return Cursor_execute(self, query, params, log_exceptions) def get_unaccent_wrapper(cr): return lambda x: x if flush: self.env.flush_all() self.env.cr.flush() with patch('odoo.sql_db.Cursor.execute', execute): with patch('odoo.osv.expression.get_unaccent_wrapper', get_unaccent_wrapper): yield actual_queries if flush: self.env.flush_all() self.env.cr.flush() self.assertEqual( len(actual_queries), len(expected), "\n---- actual queries:\n%s\n---- expected queries:\n%s" % ( "\n".join(actual_queries), "\n".join(expected), ) ) for actual_query, expect_query in zip(actual_queries, expected): self.assertEqual( "".join(actual_query.lower().split()), "".join(expect_query.lower().split()), "\n---- actual query:\n%s\n---- not like:\n%s" % (actual_query, expect_query), ) @contextmanager def assertQueryCount(self, default=0, flush=True, **counters): """ Context manager that counts queries. It may be invoked either with one value, or with a set of named arguments like ``login=value``:: with self.assertQueryCount(42): ... with self.assertQueryCount(admin=3, demo=5): ... The second form is convenient when used with :func:`users`. 
""" if self.warm: # mock random in order to avoid random bus gc with patch('random.random', lambda: 1): login = self.env.user.login expected = counters.get(login, default) if flush: self.env.flush_all() self.env.cr.flush() count0 = self.cr.sql_log_count yield if flush: self.env.flush_all() self.env.cr.flush() count = self.cr.sql_log_count - count0 if count != expected: # add some info on caller to allow semi-automatic update of query count frame, filename, linenum, funcname, lines, index = inspect.stack()[2] if "/odoo/addons/" in filename: filename = filename.rsplit("/odoo/addons/", 1)[1] if count > expected: msg = "Query count more than expected for user %s: %d > %d in %s at %s:%s" # add a subtest in order to continue the test_method in case of failures with self.subTest(): self.fail(msg % (login, count, expected, funcname, filename, linenum)) else: logger = logging.getLogger(type(self).__module__) msg = "Query count less than expected for user %s: %d < %d in %s at %s:%s" logger.info(msg, login, count, expected, funcname, filename, linenum) else: # flush before and after during warmup, in order to reproduce the # same operations, otherwise the caches might not be ready! if flush: self.env.flush_all() self.env.cr.flush() yield if flush: self.env.flush_all() self.env.cr.flush() def assertRecordValues(self, records, expected_values): ''' Compare a recordset with a list of dictionaries representing the expected results. This method performs a comparison element by element based on their index. Then, the order of the expected values is extremely important. Note that: - Comparison between falsy values is supported: False match with None. - Comparison between monetary field is also treated according the currency's rounding. - Comparison between x2many field is done by ids. Then, empty expected ids must be []. - Comparison between many2one field id done by id. Empty comparison can be done using any falsy value. :param records: The records to compare. 
:param expected_values: List of dicts expected to be exactly matched in records ''' def _compare_candidate(record, candidate, field_names): ''' Compare all the values in `candidate` with a record. :param record: record being compared :param candidate: dict of values to compare :return: A dictionary will encountered difference in values. ''' diff = {} for field_name in field_names: record_value = record[field_name] field = record._fields[field_name] field_type = field.type if field_type == 'monetary': # Compare monetary field. currency_field_name = record._fields[field_name].get_currency_field(record) record_currency = record[currency_field_name] if field_name not in candidate: diff[field_name] = (record_value, None) elif record_currency: if record_currency.compare_amounts(candidate[field_name], record_value): diff[field_name] = (record_value, record_currency.round(candidate[field_name])) elif candidate[field_name] != record_value: diff[field_name] = (record_value, candidate[field_name]) elif field_type == 'float' and field.get_digits(record.env): prec = field.get_digits(record.env)[1] if float_compare(candidate[field_name], record_value, precision_digits=prec) != 0: diff[field_name] = (record_value, candidate[field_name]) elif field_type in ('one2many', 'many2many'): # Compare x2many relational fields. # Empty comparison must be an empty list to be True. if field_name not in candidate: diff[field_name] = (sorted(record_value.ids), None) elif set(record_value.ids) != set(candidate[field_name]): diff[field_name] = (sorted(record_value.ids), sorted(candidate[field_name])) elif field_type == 'many2one': # Compare many2one relational fields. # Every falsy value is allowed to compare with an empty record. 
if field_name not in candidate: diff[field_name] = (record_value.id, None) elif (record_value or candidate[field_name]) and record_value.id != candidate[field_name]: diff[field_name] = (record_value.id, candidate[field_name]) else: # Compare others fields if not both interpreted as falsy values. if field_name not in candidate: diff[field_name] = (record_value, None) elif (candidate[field_name] or record_value) and record_value != candidate[field_name]: diff[field_name] = (record_value, candidate[field_name]) return diff # Compare records with candidates. different_values = [] field_names = list(expected_values[0].keys()) for index, record in enumerate(records): is_additional_record = index >= len(expected_values) candidate = {} if is_additional_record else expected_values[index] diff = _compare_candidate(record, candidate, field_names) if diff: different_values.append((index, 'additional_record' if is_additional_record else 'regular_diff', diff)) for index in range(len(records), len(expected_values)): diff = {} for field_name in field_names: diff[field_name] = (None, expected_values[index][field_name]) different_values.append((index, 'missing_record', diff)) # Build error message. if not different_values: return errors = ['The records and expected_values do not match.'] if len(records) != len(expected_values): errors.append('Wrong number of records to compare: %d records versus %d expected values.' 
% (len(records), len(expected_values))) for index, diff_type, diff in different_values: if diff_type == 'regular_diff': errors.append('\n==== Differences at index %s ====' % index) record_diff = ['%s:%s' % (k, v[0]) for k, v in diff.items()] candidate_diff = ['%s:%s' % (k, v[1]) for k, v in diff.items()] errors.append('\n'.join(difflib.unified_diff(record_diff, candidate_diff))) elif diff_type == 'additional_record': errors += [ '\n==== Additional record ====', pprint.pformat(dict((k, v[0]) for k, v in diff.items())), ] elif diff_type == 'missing_record': errors += [ '\n==== Missing record ====', pprint.pformat(dict((k, v[1]) for k, v in diff.items())), ] self.fail('\n'.join(errors)) # turns out this thing may not be quite as useful as we thought... def assertItemsEqual(self, a, b, msg=None): self.assertCountEqual(a, b, msg=None) def assertTreesEqual(self, n1, n2, msg=None): self.assertIsNotNone(n1, msg) self.assertIsNotNone(n2, msg) self.assertEqual(n1.tag, n2.tag, msg) # Because lxml.attrib is an ordereddict for which order is important # to equality, even though *we* don't care self.assertEqual(dict(n1.attrib), dict(n2.attrib), msg) self.assertEqual((n1.text or u'').strip(), (n2.text or u'').strip(), msg) self.assertEqual((n1.tail or u'').strip(), (n2.tail or u'').strip(), msg) for c1, c2 in izip_longest(n1, n2): self.assertTreesEqual(c1, c2, msg) def _assertXMLEqual(self, original, expected, parser="xml"): """Asserts that two xmls archs are equal :param original: the xml arch to test :type original: str :param expected: the xml arch of reference :type expected: str :param parser: an string representing which lxml.Parser class to use when normalizing both archs. 
Takes either "xml" or "html" :type parser: str """ if original: original = _normalize_arch_for_assert(original, parser) if expected: expected = _normalize_arch_for_assert(expected, parser) self.assertEqual(original, expected) def assertXMLEqual(self, original, expected): return self._assertXMLEqual(original, expected) def assertHTMLEqual(self, original, expected): return self._assertXMLEqual(original, expected, 'html') def profile(self, description='', **kwargs): test_method = getattr(self, '_testMethodName', 'Unknown test method') if not hasattr(self, 'profile_session'): self.profile_session = profiler.make_session(test_method) return profiler.Profiler( description='%s uid:%s %s %s' % (test_method, self.env.user.id, 'warm' if self.warm else 'cold', description), db=self.env.cr.dbname, profile_session=self.profile_session, **kwargs) savepoint_seq = itertools.count() class TransactionCase(BaseCase): """ Test class in which all test methods are run in a single transaction, but each test method is run in a sub-transaction managed by a savepoint. The transaction's cursor is always closed without committing. The data setup common to all methods should be done in the class method `setUpClass`, so that it is done once for all test methods. This is useful for test cases containing fast tests but with significant database setup common to all cases (complex in-db test data). After being run, each test method cleans up the record cache and the registry cache. However, there is no cleanup of the registry models and fields. If a test modifies the registry (custom models and/or fields), it should prepare the necessary cleanup (`self.registry.reset_changes()`). """ registry: Registry = None env: api.Environment = None cr: Cursor = None @classmethod def _gc_filestore(cls): # attachment can be created or unlink during the tests. # they can addup during test and take some disc space. 
# since cron are not running during tests, we need to gc manually # We need to check the status of the file system outside of the test cursor with odoo.registry(get_db_name()).cursor() as cr: gc_env = api.Environment(cr, odoo.SUPERUSER_ID, {}) gc_env['ir.attachment']._gc_file_store_unsafe() @classmethod def setUpClass(cls): super().setUpClass() cls.addClassCleanup(cls._gc_filestore) cls.registry = odoo.registry(get_db_name()) cls.addClassCleanup(cls.registry.reset_changes) cls.addClassCleanup(cls.registry.clear_caches) cls.cr = cls.registry.cursor() cls.addClassCleanup(cls.cr.close) cls.env = api.Environment(cls.cr, odoo.SUPERUSER_ID, {}) def setUp(self): super().setUp() # restore environments after the test to avoid invoking flush() with an # invalid environment (inexistent user id) from another test envs = self.env.all.envs for env in list(envs): self.addCleanup(env.clear) # restore the set of known environments as it was at setUp self.addCleanup(envs.update, list(envs)) self.addCleanup(envs.clear) self.addCleanup(self.registry.clear_caches) # This prevents precommit functions and data from piling up # until cr.flush is called in 'assertRaises' clauses # (these are not cleared in self.env.clear or envs.clear) cr = self.env.cr def _reset(cb, funcs, data): cb._funcs = funcs cb.data = data for callback in [cr.precommit, cr.postcommit, cr.prerollback, cr.postrollback]: self.addCleanup(_reset, callback, collections.deque(callback._funcs), dict(callback.data)) # flush everything in setUpClass before introducing a savepoint self.env.flush_all() self._savepoint_id = next(savepoint_seq) self.cr.execute('SAVEPOINT test_%d' % self._savepoint_id) self.addCleanup(self.cr.execute, 'ROLLBACK TO SAVEPOINT test_%d' % self._savepoint_id) self.patch(self.registry['res.partner'], '_get_gravatar_image', lambda *a: False) class SavepointCase(TransactionCase): @classmethod def __init_subclass__(cls): super().__init_subclass__() warnings.warn( "Deprecated class SavepointCase has been 
merged into TransactionCase", DeprecationWarning, stacklevel=2, ) class SingleTransactionCase(BaseCase): """ TestCase in which all test methods are run in the same transaction, the transaction is started with the first test method and rolled back at the end of the last. """ @classmethod def __init_subclass__(cls): super().__init_subclass__() if issubclass(cls, TransactionCase): _logger.warning("%s inherits from both TransactionCase and SingleTransactionCase") @classmethod def setUpClass(cls): super().setUpClass() cls.registry = odoo.registry(get_db_name()) cls.addClassCleanup(cls.registry.reset_changes) cls.addClassCleanup(cls.registry.clear_caches) cls.cr = cls.registry.cursor() cls.addClassCleanup(cls.cr.close) cls.env = api.Environment(cls.cr, odoo.SUPERUSER_ID, {}) def setUp(self): super(SingleTransactionCase, self).setUp() self.env.flush_all() class ChromeBrowserException(Exception): pass def fmap(future, map_fun): """Maps a future's result through a callback. Resolves to the application of ``map_fun`` to the result of ``future``. .. warning:: this does *not* recursively resolve futures, if that's what you need see :func:`fchain` """ fmap_future = Future() @future.add_done_callback def _(f): try: fmap_future.set_result(map_fun(f.result())) except Exception as e: fmap_future.set_exception(e) return fmap_future def fchain(future, next_callback): """Chains a future's result to a new future through a callback. Corresponds to the ``bind`` monadic operation (aka flatmap aka then... kinda). """ new_future = Future() @future.add_done_callback def _(f): try: n = next_callback(f.result()) @n.add_done_callback def _(f): try: new_future.set_result(f.result()) except Exception as e: new_future.set_exception(e) except Exception as e: new_future.set_exception(e) return new_future class ChromeBrowser: """ Helper object to control a Chrome headless process. 
""" remote_debugging_port = 0 # 9222, change it in a non-git-tracked file def __init__(self, test_class): self._logger = test_class._logger self.test_class = test_class if websocket is None: self._logger.warning("websocket-client module is not installed") raise unittest.SkipTest("websocket-client module is not installed") self.devtools_port = None self.ws_url = '' # WebSocketUrl self.ws = None # websocket self.user_data_dir = tempfile.mkdtemp(suffix='_chrome_odoo') self.chrome_pid = None otc = odoo.tools.config self.screenshots_dir = os.path.join(otc['screenshots'], get_db_name(), 'screenshots') self.screencasts_dir = None self.screencasts_frames_dir = None if otc['screencasts']: self.screencasts_dir = os.path.join(otc['screencasts'], get_db_name(), 'screencasts') self.screencasts_frames_dir = os.path.join(self.screencasts_dir, 'frames') os.makedirs(self.screencasts_frames_dir, exist_ok=True) self.screencast_frames = [] os.makedirs(self.screenshots_dir, exist_ok=True) self.window_size = test_class.browser_size self.touch_enabled = test_class.touch_enabled self.sigxcpu_handler = None self._chrome_start() self._find_websocket() self._logger.info('Websocket url found: %s', self.ws_url) self._open_websocket() self._request_id = itertools.count() self._result = Future() self.error_checker = None self.had_failure = False # maps request_id to Futures self._responses = {} # maps frame ids to callbacks self._frames = {} self._handlers = { 'Runtime.consoleAPICalled': self._handle_console, 'Runtime.exceptionThrown': self._handle_exception, 'Page.frameStoppedLoading': self._handle_frame_stopped_loading, 'Page.screencastFrame': self._handle_screencast_frame, } self._receiver = threading.Thread( target=self._receive, name="WebSocket events consumer", args=(get_db_name(),) ) self._receiver.start() self._logger.info('Enable chrome headless console log notification') self._websocket_send('Runtime.enable') self._logger.info('Chrome headless enable page notifications') 
self._websocket_send('Page.enable') if os.name == 'posix': self.sigxcpu_handler = signal.getsignal(signal.SIGXCPU) signal.signal(signal.SIGXCPU, self.signal_handler) def signal_handler(self, sig, frame): if sig == signal.SIGXCPU: _logger.info('CPU time limit reached, stopping Chrome and shutting down') self.stop() os._exit(0) def stop(self): if self.chrome_pid is not None: self._logger.info("Closing chrome headless with pid %s", self.chrome_pid) self._websocket_send('Browser.close') self._logger.info("Closing websocket connection") self.ws.close() self._logger.info("Terminating chrome headless with pid %s", self.chrome_pid) os.kill(self.chrome_pid, signal.SIGTERM) if self.user_data_dir and os.path.isdir(self.user_data_dir) and self.user_data_dir != '/': self._logger.info('Removing chrome user profile "%s"', self.user_data_dir) shutil.rmtree(self.user_data_dir, ignore_errors=True) # Restore previous signal handler if self.sigxcpu_handler and os.name == 'posix': signal.signal(signal.SIGXCPU, self.sigxcpu_handler) @property def executable(self): system = platform.system() if system == 'Linux': for bin_ in ['google-chrome', 'chromium', 'chromium-browser', 'google-chrome-stable']: try: return find_in_path(bin_) except IOError: continue elif system == 'Darwin': bins = [ '/Applications/Google Chrome.app/Contents/MacOS/Google Chrome', '/Applications/Chromium.app/Contents/MacOS/Chromium', ] for bin_ in bins: if os.path.exists(bin_): return bin_ elif system == 'Windows': bins = [ '%ProgramFiles%\\Google\\Chrome\\Application\\chrome.exe', '%ProgramFiles(x86)%\\Google\\Chrome\\Application\\chrome.exe', '%LocalAppData%\\Google\\Chrome\\Application\\chrome.exe', ] for bin_ in bins: bin_ = os.path.expandvars(bin_) if os.path.exists(bin_): return bin_ raise unittest.SkipTest("Chrome executable not found") def _chrome_without_limit(self, cmd): if os.name == 'posix' and platform.system() != 'Darwin': # since the introduction of pointer compression in Chrome 80 (v8 v8.0), # the 
memory reservation algorithm requires more than 8GiB of # virtual mem for alignment this exceeds our default memory limits. def preexec(): import resource resource.setrlimit(resource.RLIMIT_AS, (resource.RLIM_INFINITY, resource.RLIM_INFINITY)) else: preexec = None # pylint: disable=subprocess-popen-preexec-fn return subprocess.Popen(cmd, stderr=subprocess.DEVNULL, preexec_fn=preexec) def _spawn_chrome(self, cmd): proc = self._chrome_without_limit(cmd) port_file = pathlib.Path(self.user_data_dir, 'DevToolsActivePort') for _ in range(CHECK_BROWSER_ITERATIONS): time.sleep(CHECK_BROWSER_SLEEP) if port_file.is_file() and port_file.stat().st_size > 5: with port_file.open('r', encoding='utf-8') as f: self.devtools_port = int(f.readline()) return proc.pid raise unittest.SkipTest(f'Failed to detect chrome devtools port after {BROWSER_WAIT :.1f}s.') def _chrome_start(self): if self.chrome_pid is not None: return switches = { '--headless': '', '--no-default-browser-check': '', '--no-first-run': '', '--disable-extensions': '', '--disable-background-networking' : '', '--disable-background-timer-throttling' : '', '--disable-backgrounding-occluded-windows': '', '--disable-renderer-backgrounding' : '', '--disable-breakpad': '', '--disable-client-side-phishing-detection': '', '--disable-crash-reporter': '', '--disable-default-apps': '', '--disable-dev-shm-usage': '', '--disable-device-discovery-notifications': '', '--disable-namespace-sandbox': '', '--user-data-dir': self.user_data_dir, '--disable-translate': '', # required for tours that use Youtube autoplay conditions (namely website_slides' "course_tour") '--autoplay-policy': 'no-user-gesture-required', '--window-size': self.window_size, '--remote-debugging-address': HOST, '--remote-debugging-port': str(self.remote_debugging_port), '--no-sandbox': '', '--disable-gpu': '', '--remote-allow-origins': '*', # '--enable-precise-memory-info': '', # uncomment to debug memory leaks in qunit suite # '--js-flags': '--expose-gc', # 
uncomment to debug memory leaks in qunit suite } if self.touch_enabled: # enable Chrome's Touch mode, useful to detect touch capabilities using # "'ontouchstart' in window" switches['--touch-events'] = '' cmd = [self.executable] cmd += ['%s=%s' % (k, v) if v else k for k, v in switches.items()] url = 'about:blank' cmd.append(url) try: self.chrome_pid = self._spawn_chrome(cmd) except OSError: raise unittest.SkipTest("%s not found" % cmd[0]) self._logger.info('Chrome pid: %s', self.chrome_pid) def _find_websocket(self): version = self._json_command('version') self._logger.info('Browser version: %s', version['Browser']) infos = self._json_command('', get_key=0) # Infos about the first tab self.ws_url = infos['webSocketDebuggerUrl'] self.dev_tools_frontend_url = infos.get('devtoolsFrontendUrl') self._logger.info('Chrome headless temporary user profile dir: %s', self.user_data_dir) def _json_command(self, command, timeout=3, get_key=None): """Queries browser state using JSON Available commands: ``''`` return list of tabs with their id ``list`` (or ``json/``) list tabs ``new`` open a new tab :samp:`activate/{id}` activate a tab :samp:`close/{id}` close a tab ``version`` get chrome and dev tools version ``protocol`` get the full protocol """ command = '/'.join(['json', command]).strip('/') url = werkzeug.urls.url_join('http://%s:%s/' % (HOST, self.devtools_port), command) self._logger.info("Issuing json command %s", url) delay = 0.1 tries = 0 failure_info = None while timeout > 0: try: os.kill(self.chrome_pid, 0) except ProcessLookupError: message = 'Chrome crashed at startup' break try: r = requests.get(url, timeout=3) if r.ok: res = r.json() if get_key is None: return res else: return res[get_key] except requests.ConnectionError as e: failure_info = str(e) message = 'Connection Error while trying to connect to Chrome debugger' except requests.exceptions.ReadTimeout as e: failure_info = str(e) message = 'Connection Timeout while trying to connect to Chrome debugger' 
break except (KeyError, IndexError): message = 'Key "%s" not found in json result "%s" after connecting to Chrome debugger' % (get_key, res) time.sleep(delay) timeout -= delay delay = delay * 1.5 tries += 1 self._logger.error("%s after %s tries" % (message, tries)) if failure_info: self._logger.info(failure_info) self.stop() raise unittest.SkipTest("Error during Chrome headless connection") def _open_websocket(self): self.ws = websocket.create_connection(self.ws_url, enable_multithread=True, suppress_origin=True) if self.ws.getstatus() != 101: raise unittest.SkipTest("Cannot connect to chrome dev tools") self.ws.settimeout(0.01) def _receive(self, dbname): threading.current_thread().dbname = dbname # So CDT uses a streamed JSON-RPC structure, meaning a request is # {id, method, params} and eventually a {id, result | error} should # arrive the other way, however for events it uses "notifications" # meaning request objects without an ``id``, but *coming from the server while True: # or maybe until `self._result` is `done()`? 
try: msg = self.ws.recv() self._logger.debug('\n<- %s', msg) except websocket.WebSocketTimeoutException: continue except Exception as e: # if the socket is still connected something bad happened, # otherwise the client was just shut down if self.ws.connected: self._result.set_exception(e) raise self._result.cancel() return res = json.loads(msg) request_id = res.get('id') try: if request_id is None: handler = self._handlers.get(res['method']) if handler: handler(**res['params']) else: f = self._responses.pop(request_id, None) if f: if 'result' in res: f.set_result(res['result']) else: f.set_exception(ChromeBrowserException(res['error']['message'])) except Exception: _logger.exception("While processing message %s", msg) def _websocket_request(self, method, *, params=None, timeout=10.0): assert threading.get_ident() != self._receiver.ident,\ "_websocket_request must not be called from the consumer thread" if self.ws is None: return f = self._websocket_send(method, params=params, with_future=True) try: return f.result(timeout=timeout) except concurrent.futures.TimeoutError: raise TimeoutError(f'{method}({params or ""})') def _websocket_send(self, method, *, params=None, with_future=False): """send chrome devtools protocol commands through websocket If ``with_future`` is set, returns a ``Future`` for the operation. 
""" if self.ws is None: return result = None request_id = next(self._request_id) if with_future: result = self._responses[request_id] = Future() payload = {'method': method, 'id': request_id} if params: payload['params'] = params self._logger.debug('\n-> %s', payload) self.ws.send(json.dumps(payload)) return result def _handle_console(self, type, args=None, stackTrace=None, **kw): # pylint: disable=redefined-builtin # console formatting differs somewhat from Python's, if args[0] has # format modifiers that many of args[1:] get formatted in, missing # args are replaced by empty strings and extra args are concatenated # (space-separated) # # current version modifies the args in place which could and should # probably be improved if args: arg0, args = str(self._from_remoteobject(args[0])), args[1:] else: arg0, args = '', [] formatted = [re.sub(r'%[%sdfoOc]', self.console_formatter(args), arg0)] # formatter consumes args it uses, leaves unformatted args untouched formatted.extend(str(self._from_remoteobject(arg)) for arg in args) message = ' '.join(formatted) stack = ''.join(self._format_stack({'type': type, 'stackTrace': stackTrace})) if stack: message += '\n' + stack log_type = type self._logger.getChild('browser').log( self._TO_LEVEL.get(log_type, logging.INFO), "%s", message # might still have % characters ) if log_type == 'error': self.had_failure = True if not self.error_checker or self.error_checker(message): self.take_screenshot() self._save_screencast() try: self._result.set_exception(ChromeBrowserException(message)) except CancelledError: ... 
except InvalidStateError: self._logger.warning( "Trying to set result to failed (%s) but found the future settled (%s)", message, self._result ) elif 'test successful' in message: if self.test_class.allow_end_on_form: self._result.set_result(True) return qs = fchain( self._websocket_send('DOM.getDocument', params={'depth': 0}, with_future=True), lambda d: self._websocket_send("DOM.querySelector", params={ 'nodeId': d['root']['nodeId'], 'selector': '.o_legacy_form_view.o_form_editable, .o_form_dirty', }, with_future=True) ) @qs.add_done_callback def _qs_result(fut): node_id = 0 with contextlib.suppress(Exception): node_id = fut.result()['nodeId'] if node_id: self.take_screenshot("unsaved_form_") self._result.set_exception(ChromeBrowserException("""\ Tour finished with an open form view in edition mode. Form views in edition mode are automatically saved when the page is closed, \ which leads to stray network requests and inconsistencies.""")) return try: self._result.set_result(True) except Exception: # if the future was already failed, we're happy, # otherwise swap for a new failed if self._result.exception() is None: self._result = Future() self._result.set_exception(ChromeBrowserException( "Tried to make the tour successful twice." )) def _handle_exception(self, exceptionDetails, timestamp): message = exceptionDetails['text'] exception = exceptionDetails.get('exception') if exception: message += str(self._from_remoteobject(exception)) exceptionDetails['type'] = 'trace' # fake this so _format_stack works stack = ''.join(self._format_stack(exceptionDetails)) if stack: message += '\n' + stack self.take_screenshot() self._save_screencast() try: self._result.set_exception(ChromeBrowserException(message)) except CancelledError: ... 
except InvalidStateError: self._logger.warning( "Trying to set result to failed (%s) but found the future settled (%s)", message, self._result ) def _handle_frame_stopped_loading(self, frameId): wait = self._frames.pop(frameId, None) if wait: wait() def _handle_screencast_frame(self, sessionId, data, metadata): self._websocket_send('Page.screencastFrameAck', params={'sessionId': sessionId}) outfile = os.path.join(self.screencasts_frames_dir, 'frame_%05d.b64' % len(self.screencast_frames)) with open(outfile, 'w') as f: f.write(data) self.screencast_frames.append({ 'file_path': outfile, 'timestamp': metadata.get('timestamp') }) _TO_LEVEL = { 'debug': logging.DEBUG, 'log': logging.INFO, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, # TODO: what do with # dir, dirxml, table, trace, clear, startGroup, startGroupCollapsed, # endGroup, assert, profile, profileEnd, count, timeEnd } def take_screenshot(self, prefix='sc_', suffix=None): def handler(f): base_png = f.result(timeout=0)['data'] if not base_png: self._logger.warning("Couldn't capture screenshot: expected image data, got ?? 
error ??") return decoded = base64.b64decode(base_png, validate=True) fname = '{}{:%Y%m%d_%H%M%S_%f}{}.png'.format( prefix, datetime.now(), suffix or '_%s' % self.test_class.__name__) full_path = os.path.join(self.screenshots_dir, fname) with open(full_path, 'wb') as f: f.write(decoded) self._logger.runbot('Screenshot in: %s', full_path) self._logger.info('Asking for screenshot') f = self._websocket_send('Page.captureScreenshot', with_future=True) f.add_done_callback(handler) return f def _save_screencast(self, prefix='failed'): # could be encododed with something like that # ffmpeg -framerate 3 -i frame_%05d.png output.mp4 if not self.screencast_frames: self._logger.debug('No screencast frames to encode') return None for f in self.screencast_frames: with open(f['file_path'], 'rb') as b64_file: frame = base64.decodebytes(b64_file.read()) os.unlink(f['file_path']) f['file_path'] = f['file_path'].replace('.b64', '.png') with open(f['file_path'], 'wb') as png_file: png_file.write(frame) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f') fname = '%s_screencast_%s.mp4' % (prefix, timestamp) outfile = os.path.join(self.screencasts_dir, fname) try: ffmpeg_path = find_in_path('ffmpeg') except IOError: ffmpeg_path = None if ffmpeg_path: nb_frames = len(self.screencast_frames) concat_script_path = os.path.join(self.screencasts_dir, fname.replace('.mp4', '.txt')) with open(concat_script_path, 'w') as concat_file: for i in range(nb_frames): frame_file_path = os.path.join(self.screencasts_frames_dir, self.screencast_frames[i]['file_path']) end_time = time.time() if i == nb_frames - 1 else self.screencast_frames[i+1]['timestamp'] duration = end_time - self.screencast_frames[i]['timestamp'] concat_file.write("file '%s'\nduration %s\n" % (frame_file_path, duration)) concat_file.write("file '%s'" % frame_file_path) # needed by the concat plugin r = subprocess.run([ffmpeg_path, '-intra', '-f', 'concat','-safe', '0', '-i', concat_script_path, '-pix_fmt', 'yuv420p', outfile]) 
self._logger.log(25, 'Screencast in: %s', outfile) else: outfile = outfile.strip('.mp4') shutil.move(self.screencasts_frames_dir, outfile) self._logger.runbot('Screencast frames in: %s', outfile) def start_screencast(self): assert self.screencasts_dir self._websocket_send('Page.startScreencast') def set_cookie(self, name, value, path, domain): params = {'name': name, 'value': value, 'path': path, 'domain': domain} self._websocket_request('Network.setCookie', params=params) return def delete_cookie(self, name, **kwargs): params = {k: v for k, v in kwargs.items() if k in ['url', 'domain', 'path']} params['name'] = name self._websocket_request('Network.deleteCookies', params=params) return def _wait_ready(self, ready_code, timeout=60): self._logger.info('Evaluate ready code "%s"', ready_code) start_time = time.time() result = None while True: taken = time.time() - start_time if taken > timeout: break result = self._websocket_request('Runtime.evaluate', params={ 'expression': "try { %s } catch {}" % ready_code, 'awaitPromise': True, }, timeout=timeout-taken)['result'] if result == {'type': 'boolean', 'value': True}: time_to_ready = time.time() - start_time if taken > 2: self._logger.info('The ready code tooks too much time : %s', time_to_ready) return True self.take_screenshot(prefix='sc_failed_ready_') self._logger.info('Ready code last try result: %s', result) return False def _wait_code_ok(self, code, timeout, error_checker=None): self.error_checker = error_checker self._logger.info('Evaluate test code "%s"', code) start = time.time() res = self._websocket_request('Runtime.evaluate', params={ 'expression': code, 'awaitPromise': True, }, timeout=timeout)['result'] if res.get('subtype') == 'error': raise ChromeBrowserException("Running code returned an error: %s" % res) err = ChromeBrowserException("failed") try: # if the runcode was a promise which took some time to execute, # discount that from the timeout if self._result.result(time.time() - start + timeout) and 
not self.had_failure: return except CancelledError: # regular-ish shutdown return except Exception as e: err = e self.take_screenshot() self._save_screencast() if isinstance(err, ChromeBrowserException): raise err if isinstance(err, concurrent.futures.TimeoutError): raise ChromeBrowserException('Script timeout exceeded') from err raise ChromeBrowserException("Unknown error") from err def navigate_to(self, url, wait_stop=False): self._logger.info('Navigating to: "%s"', url) nav_result = self._websocket_request('Page.navigate', params={'url': url}, timeout=20.0) self._logger.info("Navigation result: %s", nav_result) if wait_stop: frame_id = nav_result['frameId'] e = threading.Event() self._frames[frame_id] = e.set self._logger.info('Waiting for frame %r to stop loading', frame_id) e.wait(10) def clear(self): self._websocket_send('Page.stopScreencast') if self.screencasts_dir and os.path.isdir(self.screencasts_frames_dir): shutil.rmtree(self.screencasts_frames_dir) self.screencast_frames = [] self._websocket_request('Page.stopLoading') self._websocket_request('Runtime.evaluate', params={'expression': """ ('serviceWorker' in navigator) && navigator.serviceWorker.getRegistrations().then( registrations => Promise.all(registrations.map(r => r.unregister())) ) """, 'awaitPromise': True}) # wait for the screenshot or whatever wait(self._responses.values(), 10) self._logger.info('Deleting cookies and clearing local storage') self._websocket_request('Network.clearBrowserCache') self._websocket_request('Network.clearBrowserCookies') self._websocket_request('Runtime.evaluate', params={'expression': 'try {localStorage.clear(); sessionStorage.clear();} catch(e) {}'}) self.navigate_to('about:blank', wait_stop=True) # hopefully after navigating to about:blank there's no event left self._frames.clear() # wait for the clearing requests to finish in case the browser is re-used wait(self._responses.values(), 10) self._responses.clear() self._result.cancel() self._result = Future() 
self.had_failure = False def _from_remoteobject(self, arg): """ attempts to make a CDT RemoteObject comprehensible """ objtype = arg['type'] subtype = arg.get('subtype') if objtype == 'undefined': # the undefined remoteobject is literally just {type: undefined}... return 'undefined' elif objtype != 'object' or subtype not in (None, 'array'): # value is the json representation for json object # otherwise fallback on the description which is "a string # representation of the object" e.g. the traceback for errors, the # source for functions, ... finally fallback on the entire arg mess return arg.get('value', arg.get('description', arg)) elif subtype == 'array': # apparently value is *not* the JSON representation for arrays # instead it's just Array(3) which is useless, however the preview # properties are the same as object which is useful (just ignore the # name which is the index) return '[%s]' % ', '.join( repr(p['value']) if p['type'] == 'string' else str(p['value']) for p in arg.get('preview', {}).get('properties', []) if re.match(r'\d+', p['name']) ) # all that's left is type=object, subtype=None aka custom or # non-standard objects, print as TypeName(param=val, ...), sadly because # of the way Odoo widgets are created they all appear as Class(...) 
# nb: preview properties are *not* recursive, the value is *all* we get return '%s(%s)' % ( arg.get('className') or 'object', ', '.join( '%s=%s' % (p['name'], repr(p['value']) if p['type'] == 'string' else p['value']) for p in arg.get('preview', {}).get('properties', []) if p.get('value') is not None ) ) LINE_PATTERN = '\tat %(functionName)s (%(url)s:%(lineNumber)d:%(columnNumber)d)\n' def _format_stack(self, logrecord): if logrecord['type'] not in ['trace']: return trace = logrecord.get('stackTrace') while trace: for f in trace['callFrames']: yield self.LINE_PATTERN % f trace = trace.get('parent') def console_formatter(self, args): """ Formats similarly to the console API: * if there are no args, don't format (return string as-is) * %% -> % * %c -> replace by styling directives (ignore for us) * other known formatters -> replace by corresponding argument * leftover known formatters (args exhausted) -> replace by empty string * unknown formatters -> return as-is """ if not args: return lambda m: m[0] def replacer(m): fmt = m[0][1] if fmt == '%': return '%' if fmt in 'sdfoOc': if not args: return '' repl = args.pop(0) if fmt == 'c': return '' return str(self._from_remoteobject(repl)) return m[0] return replacer class Opener(requests.Session): """ Flushes and clears the current transaction when starting a request. This is likely necessary when we make a request to the server, as the request is made with a test cursor, which uses a different cache than this transaction. 
""" def __init__(self, cr: BaseCursor): super().__init__() self.cr = cr def request(self, *args, **kwargs): self.cr.flush() self.cr.clear() return super().request(*args, **kwargs) class Transport(xmlrpclib.Transport): """ see :class:`Opener` """ def __init__(self, cr: BaseCursor): self.cr = cr super().__init__() def request(self, *args, **kwargs): self.cr.flush() self.cr.clear() return super().request(*args, **kwargs) class HttpCase(TransactionCase): """ Transactional HTTP TestCase with url_open and Chrome headless helpers. """ registry_test_mode = True browser = None browser_size = '1366x768' touch_enabled = False allow_end_on_form = False _logger: logging.Logger = None @classmethod def setUpClass(cls): super().setUpClass() ICP = cls.env['ir.config_parameter'] ICP.set_param('web.base.url', cls.base_url()) ICP.env.flush_all() # v8 api with correct xmlrpc exception handling. cls.xmlrpc_url = f'http://{HOST}:{odoo.tools.config["http_port"]:d}/xmlrpc/2/' cls._logger = logging.getLogger('%s.%s' % (cls.__module__, cls.__name__)) def setUp(self): super().setUp() if self.registry_test_mode: self.registry.enter_test_mode(self.cr) self.addCleanup(self.registry.leave_test_mode) self.xmlrpc_common = xmlrpclib.ServerProxy(self.xmlrpc_url + 'common', transport=Transport(self.cr)) self.xmlrpc_db = xmlrpclib.ServerProxy(self.xmlrpc_url + 'db', transport=Transport(self.cr)) self.xmlrpc_object = xmlrpclib.ServerProxy(self.xmlrpc_url + 'object', transport=Transport(self.cr)) # setup an url opener helper self.opener = Opener(self.cr) @classmethod def start_browser(cls): # start browser on demand if cls.browser is None: cls.browser = ChromeBrowser(cls) cls.addClassCleanup(cls.terminate_browser) @classmethod def terminate_browser(cls): if cls.browser: cls.browser.stop() cls.browser = None def url_open(self, url, data=None, files=None, timeout=12, headers=None, allow_redirects=True, head=False): if url.startswith('/'): url = self.base_url() + url if head: return self.opener.head(url, 
data=data, files=files, timeout=timeout, headers=headers, allow_redirects=False) if data or files: return self.opener.post(url, data=data, files=files, timeout=timeout, headers=headers, allow_redirects=allow_redirects) return self.opener.get(url, timeout=timeout, headers=headers, allow_redirects=allow_redirects) def _wait_remaining_requests(self, timeout=10): def get_http_request_threads(): return [t for t in threading.enumerate() if t.name.startswith('odoo.service.http.request.')] start_time = time.time() request_threads = get_http_request_threads() self._logger.info('waiting for threads: %s', request_threads) for thread in request_threads: thread.join(timeout - (time.time() - start_time)) request_threads = get_http_request_threads() for thread in request_threads: self._logger.info("Stop waiting for thread %s handling request for url %s", thread.name, getattr(thread, 'url', '')) if request_threads: self._logger.info('remaining requests') odoo.tools.misc.dumpstacks() def logout(self, keep_db=True): self.session.logout(keep_db=keep_db) odoo.http.root.session_store.save(self.session) def authenticate(self, user, password): if getattr(self, 'session', None): odoo.http.root.session_store.delete(self.session) self.session = session = odoo.http.root.session_store.new() session.update(odoo.http.get_default_session(), db=get_db_name()) session.context['lang'] = odoo.http.DEFAULT_LANG if user: # if authenticated # Flush and clear the current transaction. This is useful, because # the call below opens a test cursor, which uses a different cache # than this transaction. 
self.cr.flush() self.cr.clear() uid = self.registry['res.users'].authenticate(session.db, user, password, {'interactive': False}) env = api.Environment(self.cr, uid, {}) session.uid = uid session.login = user session.session_token = uid and security.compute_session_token(session, env) session.context = dict(env['res.users'].context_get()) odoo.http.root.session_store.save(session) # Reset the opener: turns out when we set cookies['foo'] we're really # setting a cookie on domain='' path='/'. # # But then our friendly neighborhood server might set a cookie for # domain='localhost' path='/' (with the same value) which is considered # a *different* cookie following ours rather than the same. # # When we update our cookie, it's done in-place, so the server-set # cookie is still present and (as it follows ours and is more precise) # very likely to still be used, therefore our session change is ignored. # # An alternative would be to set the cookie to None (unsetting it # completely) or clear-ing session.cookies. self.opener = Opener(self.cr) self.opener.cookies['session_id'] = session.sid if self.browser: self._logger.info('Setting session cookie in browser') self.browser.set_cookie('session_id', session.sid, '/', HOST) return session def browser_js(self, url_path, code, ready='', login=None, timeout=60, cookies=None, error_checker=None, watch=False, **kw): """ Test js code running in the browser - optionnally log as 'login' - load page given by url_path - wait for ready object to be available - eval(code) inside the page - open another chrome window to watch code execution if watch is True To signal success test do: console.log('test successful') To signal test failure raise an exception or call console.error with a message. 
Test will stop when a failure occurs if error_checker is not defined or returns True for this message """ if not self.env.registry.loaded: self._logger.warning('HttpCase test should be in post_install only') # increase timeout if coverage is running if any(f.filename.endswith('/coverage/execfile.py') for f in inspect.stack() if f.filename): timeout = timeout * 1.5 self.start_browser() if watch and self.browser.dev_tools_frontend_url: _logger.warning('watch mode is only suitable for local testing - increasing tour timeout to 3600') timeout = max(timeout*10, 3600) debug_front_end = f'http://127.0.0.1:{self.browser.devtools_port}{self.browser.dev_tools_frontend_url}' self.browser._chrome_without_limit([self.browser.executable, debug_front_end]) time.sleep(3) try: self.authenticate(login, login) # Flush and clear the current transaction. This is useful in case # we make requests to the server, as these requests are made with # test cursors, which uses different caches than this transaction. 
self.cr.flush() self.cr.clear() url = werkzeug.urls.url_join(self.base_url(), url_path) if watch: parsed = werkzeug.urls.url_parse(url) qs = parsed.decode_query() qs['watch'] = '1' url = parsed.replace(query=werkzeug.urls.url_encode(qs)).to_url() self._logger.info('Open "%s" in browser', url) if self.browser.screencasts_dir: self._logger.info('Starting screencast') self.browser.start_screencast() if cookies: for name, value in cookies.items(): self.browser.set_cookie(name, value, '/', HOST) self.browser.navigate_to(url, wait_stop=not bool(ready)) # Needed because tests like test01.js (qunit tests) are passing a ready # code = "" ready = ready or "document.readyState === 'complete'" self.assertTrue(self.browser._wait_ready(ready), 'The ready "%s" code was always falsy' % ready) error = False try: self.browser._wait_code_ok(code, timeout, error_checker=error_checker) except ChromeBrowserException as chrome_browser_exception: error = chrome_browser_exception if error: # dont keep initial traceback, keep that outside of except if code: message = 'The test code "%s" failed' % code else: message = "Some js test failed" self.fail('%s\n\n%s' % (message, error)) finally: # clear browser to make it stop sending requests, in case we call # the method several times in a test method self.browser.delete_cookie('session_id', domain=HOST) self.browser.clear() self._wait_remaining_requests() @classmethod def base_url(cls): return f"http://{HOST}:{odoo.tools.config['http_port']}" def start_tour(self, url_path, tour_name, step_delay=None, **kwargs): """Wrapper for `browser_js` to start the given `tour_name` with the optional delay between steps `step_delay`. 
Other arguments from `browser_js` can be passed as keyword arguments.""" step_delay = ', %s' % step_delay if step_delay else '' code = kwargs.pop('code', "odoo.startTour('%s'%s)" % (tour_name, step_delay)) ready = kwargs.pop('ready', "odoo.__DEBUG__.services['web_tour.tour'].tours['%s'].ready" % tour_name) return self.browser_js(url_path=url_path, code=code, ready=ready, **kwargs) def profile(self, **kwargs): """ for http_case, also patch _get_profiler_context_manager in order to profile all requests """ sup = super() _profiler = sup.profile(**kwargs) def route_profiler(request): return sup.profile(description=request.httprequest.full_path) return profiler.Nested(_profiler, patch('odoo.http.Request._get_profiler_context_manager', route_profiler)) # kept for backward compatibility class HttpSavepointCase(HttpCase): @classmethod def __init_subclass__(cls): super().__init_subclass__() warnings.warn( "Deprecated class HttpSavepointCase has been merged into HttpCase", DeprecationWarning, stacklevel=2, ) def no_retry(arg): """Disable auto retry on decorated test method or test class""" arg._retry = False return arg def users(*logins): """ Decorate a method to execute it once for each given user. """ @decorator def _users(func, *args, **kwargs): self = args[0] old_uid = self.uid try: # retrieve users Users = self.env['res.users'].with_context(active_test=False) user_id = { user.login: user.id for user in Users.search([('login', 'in', list(logins))]) } for login in logins: with self.subTest(login=login): # switch user and execute func self.uid = user_id[login] func(*args, **kwargs) # Invalidate the cache between subtests, in order to not reuse # the former user's cache (`test_read_mail`, `test_write_mail`) self.env.invalidate_all() finally: self.uid = old_uid return _users @decorator def warmup(func, *args, **kwargs): """ Decorate a test method to run it twice: once for a warming up phase, and a second time for real. 
The test attribute ``warm`` is set to ``False`` during warm up, and ``True`` once the test is warmed up. Note that the effects of the warmup phase are rolled back thanks to a savepoint. """ self = args[0] self.env.flush_all() self.env.invalidate_all() # run once to warm up the caches self.warm = False self.cr.execute('SAVEPOINT test_warmup') func(*args, **kwargs) self.env.flush_all() # run once for real self.cr.execute('ROLLBACK TO SAVEPOINT test_warmup') self.env.invalidate_all() self.warm = True func(*args, **kwargs) def can_import(module): """ Checks if can be imported, returns ``True`` if it can be, ``False`` otherwise. To use with ``unittest.skipUnless`` for tests conditional on *optional* dependencies, which may or may be present but must still be tested if possible. """ try: importlib.import_module(module) except ImportError: return False else: return True class Form(object): """ Server-side form view implementation (partial) Implements much of the "form view" manipulation flow, such that server-side tests can more properly reflect the behaviour which would be observed when manipulating the interface: * call default_get and the relevant onchanges on "creation" * call the relevant onchanges on setting fields * properly handle defaults & onchanges around x2many fields Saving the form returns the created record if in creation mode. 

    Regular fields can just be assigned directly to the form, for
    :class:`~odoo.fields.Many2one` fields assign a singleton recordset::

        # empty recordset => creation mode
        f = Form(self.env['sale.order'])
        f.partner_id = a_partner
        so = f.save()

    When editing a record, use the form as a context manager to
    automatically save it at the end of the scope::

        with Form(so) as f2:
            f2.payment_term_id = env.ref('account.account_payment_term_15days')
            # f2 is saved here

    For :class:`~odoo.fields.Many2many` fields, the field itself is a
    :class:`~odoo.tests.common.M2MProxy` and can be altered by adding or
    removing records::

        with Form(user) as u:
            u.groups_id.add(env.ref('account.group_account_manager'))
            u.groups_id.remove(id=env.ref('base.group_portal').id)

    Finally :class:`~odoo.fields.One2many` are reified as
    :class:`~odoo.tests.common.O2MProxy`.

    Because the :class:`~odoo.fields.One2many` only exists through its
    parent, it is manipulated more directly by creating "sub-forms"
    with the :meth:`~odoo.tests.common.O2MProxy.new` and
    :meth:`~odoo.tests.common.O2MProxy.edit` methods. These would
    normally be used as context managers since they get saved in the
    parent record::

        with Form(so) as f3:
            # add support
            with f3.order_line.new() as line:
                line.product_id = env.ref('product.product_product_2')
            # add a computer
            with f3.order_line.new() as line:
                line.product_id = env.ref('product.product_product_3')
            # we actually want 5 computers
            with f3.order_line.edit(1) as line:
                line.product_uom_qty = 5
            # remove support
            f3.order_line.remove(index=0)
            # SO is saved here

    :param recordp: empty or singleton recordset. An empty recordset will
                    put the view in "creation" mode and trigger calls to
                    default_get and on-load onchanges, a singleton will
                    put it in "edit" mode and only load the view's data.
    :type recordp: odoo.models.Model
    :param view: the id, xmlid or actual view object to use for onchanges and
                 view constraints. If none is provided, simply loads the
                 default view for the model.
    :type view: int | str | odoo.models.Model

    .. versionadded:: 12.0
    """
    def __init__(self, recordp, view=None):
        """ Loads the form view of ``recordp``'s model and initialises the
        form's state, either from the record (edition) or from defaults and
        on-load onchanges (creation).

        Internal attributes are set through ``object.__setattr__`` because
        :meth:`__setattr__` is overridden to write *field* values.
        """
        # necessary as we're overriding setattr
        assert isinstance(recordp, BaseModel)
        env = recordp.env
        object.__setattr__(self, '_env', env)

        # store model bit only
        object.__setattr__(self, '_model', recordp.browse(()))
        # resolve the ``view`` parameter (record, xmlid or id) to a view id
        if isinstance(view, BaseModel):
            assert view._name == 'ir.ui.view', "the view parameter must be a view id, xid or record, got %s" % view
            view_id = view.id
        elif isinstance(view, str):
            view_id = env.ref(view).id
        else:
            view_id = view or False
        fvg = recordp.get_view(view_id, 'form')
        fvg['tree'] = etree.fromstring(fvg['arch'])
        fvg['fields'] = self._get_view_fields(fvg['tree'], recordp)

        object.__setattr__(self, '_view', fvg)

        self._process_fvg(recordp, fvg)

        # ordered?
        vals = dict.fromkeys(fvg['fields'], False)
        object.__setattr__(self, '_values', vals)
        # names of the fields modified since load / last save
        object.__setattr__(self, '_changed', set())
        if recordp:
            assert recordp['id'], "editing unstored records is not supported"
            # always load the id
            vals['id'] = recordp['id']

            self._init_from_values(recordp)
        else:
            self._init_from_defaults(self._model)

    def _get_view_fields(self, node, model):
        """ Returns the ``fields_get()`` descriptions of the fields of
        ``model`` which appear under ``node`` at the same field-nesting
        depth as ``node`` itself (fields of nested sub-views are excluded).

        :param node: etree element of the (sub)view
        :param model: model on which ``fields_get()`` is called
        :rtype: dict
        """
        level = node.xpath('count(ancestor::field)')
        fnames = set(el.get('name') for el in node.xpath('.//field[count(ancestor::field) = %s]' % level))
        fields = {fname: info for fname, info in model.fields_get().items() if fname in fnames}
        return fields

    def _o2m_set_edition_view(self, descr, node, level):
        """ Computes the view used to edit the one2many field ``node`` and
        stores it in ``descr['edition_view']``.

        Sub-views (``tree`` and ``form``) which are not embedded in ``node``
        are fetched through ``get_view`` and appended to ``node``.

        :param dict descr: description of the one2many field, updated in place
        :param node: etree ``<field>`` element of the one2many
        :param int level: remaining nesting budget, the edition view is
                          processed with ``level - 1``
        """
        # first mode listed on the field which is not ``form``, ``tree`` if none
        default_view = next(
            (m for m in node.get('mode', 'tree').split(',') if m != 'form'),
            'tree'
        )
        refs = self._env['ir.ui.view']._get_view_refs(node)
        # always fetch for simplicity, ensure we always have a tree and
        # a form view
        submodel = self._env[descr['relation']]
        views = {view.tag: view for view in node.xpath('./*[descendant::field]')}
        for view_type in ['tree', 'form']:
            # embedded views should take the priority on externals
            if view_type not in views:
                sub_fvg = submodel.with_context(**refs).get_view(view_type=view_type)
                sub_node = etree.fromstring(sub_fvg['arch'])
                views[view_type] = sub_node
                node.append(sub_node)
        # if the default view is a kanban or a non-editable list, the
        # "edition controller" is the form view
        edition_view = 'tree' if default_view == 'tree' and views['tree'].get('editable') else 'form'
        edition = {
            'fields': self._get_view_fields(views[edition_view], submodel),
            'tree': views[edition_view],
        }

        # don't recursively process o2ms in o2ms
        self._process_fvg(submodel, edition, level=level-1)
        descr['edition_view'] = edition

    def __str__(self):
        """ Returns ``<ClassName model.name(id)>``, the id being ``False``
        when it is not set in the form's values.
        """
        return "<%s %s(%s)>" % (
            type(self).__name__,
            self._model._name,
            self._values.get('id', False),
        )

    def _process_fvg(self, model, fvg, level=2):
        """ Post-processes to augment the view_get with:

        * an id field (may not be present if not in the view but needed)
        * pre-processed modifiers (map of modifier name to json-loaded domain)
        * pre-processed onchanges list

        Also fills ``fvg['contexts']`` (field name to context string) and
        ``fvg['fields_ordered']`` (field names in view order).

        :param model: model of the view, used to compute the onchange spec
        :param dict fvg: view description with at least ``fields`` and
                         ``tree`` keys, updated in place
        :param int level: one2many edition views are only set up while this
                          is truthy
        """
        # modifiers which a field inherits from its ancestor nodes
        inherited_modifiers = ['invisible']
        fvg['fields'].setdefault('id', {'type': 'id'})
        # pre-resolve modifiers & bind to arch toplevel
        modifiers = fvg['modifiers'] = {'id': {'required': [FALSE_LEAF], 'readonly': [TRUE_LEAF]}}
        contexts = fvg['contexts'] = {}
        order = fvg['fields_ordered'] = []
        field_level = fvg['tree'].xpath('count(ancestor::field)')
        # names available to modifiers which are given as strings to evaluate
        eval_context = {
            "uid": self._env.user.id,
            "tz": self._env.user.tz,
            "lang": self._env.user.lang,
            "datetime": datetime,
            "context_today": lambda: odoo.fields.Date.context_today(self._env.user),
            "relativedelta": relativedelta,
            "current_date": time.strftime("%Y-%m-%d"),
            "allowed_company_ids": [self._env.user.company_id.id],
            "context": {},
        }
        for f in fvg['tree'].xpath('.//field[count(ancestor::field) = %s]' % field_level):
            fname = f.get('name')
            order.append(fname)

            # normalise each modifier of the node to a domain (list)
            node_modifiers = {}
            for modifier, domain in json.loads(f.get('modifiers', '{}')).items():
                if isinstance(domain, int):
                    node_modifiers[modifier] = [TRUE_LEAF] if domain else [FALSE_LEAF]
                elif isinstance(domain, str):
                    node_modifiers[modifier] = normalize_domain(safe_eval(domain, eval_context))
                else:
                    node_modifiers[modifier] = normalize_domain(domain)

            for a in f.xpath('ancestor::*[@modifiers][count(ancestor::field) = %s]' % field_level):
                ancestor_modifiers = json.loads(a.get('modifiers'))
                for modifier in inherited_modifiers:
                    if modifier in ancestor_modifiers:
                        domain = ancestor_modifiers[modifier]
                        ancestor_domain = ([TRUE_LEAF] if domain else [FALSE_LEAF]) if isinstance(domain, int) else normalize_domain(domain)
                        node_domain = node_modifiers.get(modifier, [])
                        # Combine the field's modifiers with its ancestors' modifiers with an OR connector
                        # e.g. a field is invisible if its own invisible modifier is True
                        # OR if the invisible modifier of one of its ancestors is True
                        node_modifiers[modifier] = expression.OR([ancestor_domain, node_domain])

            if fname in modifiers:
                # The field is multiple times in the view, combine the modifier domains with an AND connector
                # e.g. a field is invisible if all occurrences of the field are invisible in the view.
                # e.g. a field is readonly if all occurrences of the field are readonly in the view.
                for modifier in set(node_modifiers.keys()).union(modifiers[fname].keys()):
                    modifiers[fname][modifier] = expression.AND([
                        modifiers[fname].get(modifier, [FALSE_LEAF]),
                        node_modifiers.get(modifier, [FALSE_LEAF]),
                    ])
            else:
                modifiers[fname] = node_modifiers

            ctx = f.get('context')
            if ctx:
                contexts[fname] = ctx

            descr = fvg['fields'].get(fname) or {'type': None}
            # FIXME: better widgets support
            # NOTE: selection breaks because of m2o widget=selection
            if f.get('widget') in ['many2many']:
                descr['type'] = f.get('widget')
            if level and descr['type'] == 'one2many':
                self._o2m_set_edition_view(descr, f, level)

        fvg['onchange'] = model._onchange_spec({'arch': etree.tostring(fvg['tree'])})

    def _init_from_defaults(self, model):
        """ Initialises the form's values for a new record: resets the
        values, runs the initial onchange, fills the remaining view fields
        with falsy values and marks every view field as changed.

        :param model: not used by this implementation
        """
        vals = self._values
        vals.clear()
        vals['id'] = False

        # call onchange with an empty list of fields; this retrieves default
        # values, applies onchanges and return the result
        self._perform_onchange([])
        # fill in whatever fields are still missing with falsy values
        vals.update(
            (f, _cleanup_from_default(descr['type'], False))
            for f, descr in self._view['fields'].items()
            if f not in vals
        )
        # mark all fields as modified (though maybe this should be done on
        # save when creating for better reliability?)
        self._changed.update(self._view['fields'])

    def _init_from_values(self, values):
        """ Loads the values of the view's fields from ``values``, which is
        handed to :func:`record_to_values` as the record to read.
        """
        self._values.update(
            record_to_values(self._view['fields'], values))

    def __getattr__(self, field):
        """ Returns the current value of ``field``:

        * a recordset (possibly empty) for many2one fields
        * an :class:`M2MProxy` / :class:`O2MProxy` for x2many fields
        * the raw stored value otherwise

        :raises AssertionError: if ``field`` is not in the view
        """
        descr = self._view['fields'].get(field)
        assert descr is not None, "%s was not found in the view" % field

        v = self._values[field]
        if descr['type'] == 'many2one':
            Model = self._env[descr['relation']]
            if not v:
                return Model
            return Model.browse(v)
        elif descr['type'] == 'many2many':
            return M2MProxy(self, field)
        elif descr['type'] == 'one2many':
            return O2MProxy(self, field)
        return v

    def _get_modifier(self, field, modifier, *, default=False, view=None, modmap=None, vals=None):
        """ Evaluates the ``modifier`` (e.g. ``readonly``) of ``field``
        against the form's values.

        :param str field: name of the field
        :param str modifier: name of the modifier
        :param default: value used if the field has no such modifier
        :param view: view description to use, defaults to the form's view
        :param modmap: modifiers map to use instead of ``view['modifiers']``
        :param vals: values the domain is evaluated against, defaults to the
                     form's values
        :return: the result of evaluating the modifier
        :raises ValueError: on a domain element which is neither an operator
                            nor a tuple
        """
        if view is None:
            view = self._view

        d = (modmap or view['modifiers'])[field].get(modifier, default)
        if isinstance(d, bool):
            return d

        if vals is None:
            vals = self._values
        # the domain is in prefix notation: walk it backwards so operands are
        # on the stack by the time their operator is reached
        stack = []
        for it in reversed(d):
            if it == '!':
                stack.append(not stack.pop())
            elif it == '&':
                e1 = stack.pop()
                e2 = stack.pop()
                stack.append(e1 and e2)
            elif it == '|':
                e1 = stack.pop()
                e2 = stack.pop()
                stack.append(e1 or e2)
            elif isinstance(it, tuple):
                if it == TRUE_LEAF:
                    stack.append(True)
                    continue
                elif it == FALSE_LEAF:
                    stack.append(False)
                    continue
                f, op, val = it
                # hack-ish handling of parent.<field> modifiers
                f, n = re.subn(r'^parent\.', '', f, 1)
                if n:
                    field_val = vals['•parent•'][f]
                else:
                    field_val = vals[f]
                    # apparent artefact of JS data representation: m2m field
                    # values are assimilated to lists of ids?
                    # FIXME: SSF should do that internally, but the requirement
                    #        of recursively post-processing to generate lists of
                    #        commands on save (e.g. m2m inside an o2m) means the
                    #        data model needs proper redesign
                    # we're looking up the "current view" so bits might be
                    # missing when processing o2ms in the parent (see
                    # values_to_save:1450 or so)
                    f_ = view['fields'].get(f, {'type': None})
                    if f_['type'] == 'many2many':
                        # field value should be [(6, _, ids)], we want just the ids
                        field_val = field_val[0][2] if field_val else []

                stack.append(self._OPS[op](field_val, val))
            else:
                raise ValueError("Unknown domain element %s" % [it])
        [result] = stack
        return result

    # implementation of the domain operators supported by _get_modifier;
    # ``in`` / ``not in`` test the field value against the operand when the
    # operand is a list or tuple, and the operand against the field value
    # otherwise
    _OPS = {
        '=': operator.eq,
        '==': operator.eq,
        '!=': operator.ne,
        '<': operator.lt,
        '<=': operator.le,
        '>=': operator.ge,
        '>': operator.gt,
        'in': lambda a, b: (a in b) if isinstance(b, (tuple, list)) else (b in a),
        'not in': lambda a, b: (a not in b) if isinstance(b, (tuple, list)) else (b not in a),
    }

    def _get_context(self, field):
        """ Evaluates and returns the ``context`` attribute set on ``field``
        in the view, or an empty dict if the field has none.
        """
        c = self._view['contexts'].get(field)
        if not c:
            return {}

        # see _getEvalContext
        # the context for a field's evals (of domain/context) is the composition of:
        # * the parent's values
        # * ??? element.context ???
        # * the environment's context (?)
        # * a few magic values
        record_id = self._values.get('id') or False

        ctx = dict(self._values_to_save(all_fields=True))
        ctx.update(self._env.context)
        ctx.update(
            id=record_id,
            active_id=record_id,
            active_ids=[record_id] if record_id else [],
            active_model=self._model._name,
            current_date=date.today().strftime("%Y-%m-%d"),
        )
        return safe_eval(c, ctx, {'context': ctx})

    def __setattr__(self, field, value):
        """ Sets ``field`` to ``value`` then triggers the field's onchange.

        Many2one fields take a recordset of the field's comodel, of which
        the ``id`` is stored.

        :raises AssertionError: if the field is not in the view, is an
                                x2many, is readonly or invisible, or if the
                                value of a many2one is not a recordset of
                                the right model
        """
        descr = self._view['fields'].get(field)
        assert descr is not None, "%s was not found in the view" % field
        assert descr['type'] not in ('many2many', 'one2many'), \
            "Can't set an o2m or m2m field, manipulate the corresponding proxies"

        assert not self._get_modifier(field, 'readonly'), \
            "can't write on readonly field {}".format(field)
        assert not self._get_modifier(field, 'invisible'), \
            "can't write on invisible field {}".format(field)

        if descr['type'] == 'many2one':
            assert isinstance(value, BaseModel) and value._name == descr['relation']
            # store just the id: that's the output of default_get & (more
            # or less) onchange.
            value = value.id

        self._values[field] = value
        self._perform_onchange([field])

    # enables with Form(...) as f: f.a = 1; f.b = 2; f.c = 3
    # q: how to get recordset?
def __enter__(self): return self def __exit__(self, etype, _evalue, _etb): if not etype: self.save() def save(self): """ Saves the form, returns the created record if applicable * does not save ``readonly`` fields * does not save unmodified fields (during edition) — any assignment or onchange return marks the field as modified, even if set to its current value :raises AssertionError: if the form has any unfilled required field """ id_ = self._values.get('id') values = self._values_to_save() if id_: r = self._model.browse(id_) if values: r.write(values) else: r = self._model.create(values) self._values.update( record_to_values(self._view['fields'], r) ) self._changed.clear() self._model.env.flush_all() self._model.env.clear() # discard cache and pending recomputations return r def _values_to_save(self, all_fields=False): """ Validates values and returns only fields modified since load/save :param bool all_fields: if False (the default), checks for required fields and only save fields which are changed and not readonly """ view = self._view fields = self._view['fields'] record_values = self._values changed = self._changed return self._values_to_save_( record_values, fields, view, changed, all_fields ) def _values_to_save_( self, record_values, fields, view, changed, all_fields=False, modifiers_values=None, parent_link=None ): """ Validates & extracts values to save, recursively in order to handle o2ms properly :param dict record_values: values of the record to extract :param dict fields: fields_get result :param view: view tree :param set changed: set of fields which have been modified (since last save) :param bool all_fields: whether to ignore normal filtering and just return everything :param dict modifiers_values: defaults to ``record_values``, but o2ms need some additional massaging """ values = {} for f in fields: if f == 'id': continue get_modifier = functools.partial( self._get_modifier, f, view=view, vals=modifiers_values or record_values ) descr = fields[f] 
v = record_values[f] # note: maybe `invisible` should not skip `required` if model attribute if v is False and not (all_fields or f == parent_link or descr['type'] == 'boolean' or get_modifier('invisible') or get_modifier('column_invisible')): if get_modifier('required'): raise AssertionError("{} is a required field ({})".format(f, view['modifiers'][f])) # skip unmodified fields unless all_fields if not (all_fields or f in changed): continue if get_modifier('readonly'): node = _get_node(view, f) if not (all_fields or node.get('force_save')): continue if descr['type'] == 'one2many': subview = descr['edition_view'] fields_ = subview['fields'] oldvals = v v = [] for (c, rid, vs) in oldvals: if c == 1 and not vs: c, vs = 4, False elif c in (0, 1): vs = vs or {} missing = fields_.keys() - vs.keys() # FIXME: maybe do this during initial loading instead? if missing: Model = self._env[descr['relation']] if c == 0: vs.update(dict.fromkeys(missing, False)) vs.update( (k, _cleanup_from_default(fields_[k], v)) for k, v in Model.default_get(list(missing)).items() ) else: vs.update(record_to_values( {k: v for k, v in fields_.items() if k not in vs}, Model.browse(rid) )) vs = self._values_to_save_( vs, fields_, subview, vs._changed if isinstance(vs, UpdateDict) else vs.keys(), all_fields, modifiers_values={'id': False, **vs, '•parent•': record_values}, # related o2m don't have a relation_field parent_link=descr.get('relation_field'), ) v.append((c, rid, vs)) values[f] = v return values def _perform_onchange(self, fields, context=None): assert isinstance(fields, list) # marks any onchange source as changed self._changed.update(fields) # skip calling onchange() if there's no trigger on any of the changed # fields spec = self._view['onchange'] if fields and not any(spec[f] for f in fields): return record = self._model.browse(self._values.get('id')) if context is not None: record = record.with_context(**context) result = record.onchange(self._onchange_values(), fields, spec) 
self._model.env.flush_all() self._model.env.clear() # discard cache and pending recomputations if result.get('warning'): _logger.getChild('onchange').warning("%(title)s %(message)s" % result.get('warning')) values = result.get('value', {}) # mark onchange output as changed self._changed.update(values.keys() & self._view['fields'].keys()) self._values.update( (k, self._cleanup_onchange( self._view['fields'][k], v, self._values.get(k), )) for k, v in values.items() if k in self._view['fields'] ) return result def _onchange_values(self): return self._onchange_values_(self._view['fields'], self._values) def _onchange_values_(self, fields, record): """ Recursively cleanup o2m values for onchanges: * if an o2m command is a 1 (UPDATE) and there is nothing to update, send a 4 instead (LINK_TO) instead as that's what the webclient sends for unmodified rows * if an o2m command is a 1 (UPDATE) and only a subset of its fields have been modified, only send the modified ones This needs to be recursive as there are people who put invisible o2ms inside their o2ms. """ values = {} for k, v in record.items(): if fields[k]['type'] == 'one2many': subfields = fields[k]['edition_view']['fields'] it = values[k] = [] for (c, rid, vs) in v: if c == 1 and isinstance(vs, UpdateDict): vs = dict(vs.changed_items()) if c == 1 and not vs: it.append((4, rid, False)) elif c in (0, 1): it.append((c, rid, self._onchange_values_(subfields, vs))) else: it.append((c, rid, vs)) else: values[k] = v return values def _cleanup_onchange(self, descr, value, current): if descr['type'] == 'many2one': if not value: return False # out of onchange, m2o are name-gotten return value[0] elif descr['type'] == 'one2many': # ignore o2ms nested in o2ms if not descr['edition_view']: return [] if current is None: current = [] v = [] c = {t[1] for t in current if t[0] in (1, 2)} current_values = {c[1]: c[2] for c in current if c[0] == 1} # which view should this be??? 
subfields = descr['edition_view']['fields'] # TODO: simplistic, unlikely to work if e.g. there's a 5 inbetween other commands for command in value: if command[0] == 0: v.append((0, 0, { k: self._cleanup_onchange(subfields[k], v, None) for k, v in command[2].items() if k in subfields })) elif command[0] == 1: record_id = command[1] c.discard(record_id) stored = current_values.get(record_id) if stored is None: record = self._env[descr['relation']].browse(record_id) stored = UpdateDict(record_to_values(subfields, record)) updates = ( (k, self._cleanup_onchange(subfields[k], v, stored.get(k))) for k, v in command[2].items() if k in subfields ) for field, value in updates: # if there are values from the onchange which differ # from current values, update & mark field as changed if stored.get(field, value) != value: stored._changed.add(field) stored[field] = value v.append((1, record_id, stored)) elif command[0] == 2: c.discard(command[1]) v.append((2, command[1], False)) elif command[0] == 4: c.discard(command[1]) v.append((1, command[1], None)) elif command[0] == 5: v = [] # explicitly mark all non-relinked (or modified) records as deleted for id_ in c: v.append((2, id_, False)) return v elif descr['type'] == 'many2many': # onchange result is a bunch of commands, normalize to single 6 if current is None: ids = [] else: ids = list(current[0][2]) for command in value: if command[0] == 1: ids.append(command[1]) elif command[0] == 3: ids.remove(command[1]) elif command[0] == 4: ids.append(command[1]) elif command[0] == 5: del ids[:] elif command[0] == 6: ids[:] = command[2] else: raise ValueError( "Unsupported M2M command %d" % command[0]) return [(6, False, ids)] return value class O2MForm(Form): # noinspection PyMissingConstructor def __init__(self, proxy, index=None): m = proxy._model object.__setattr__(self, '_proxy', proxy) object.__setattr__(self, '_index', index) object.__setattr__(self, '_env', m.env) object.__setattr__(self, '_model', m) # copy so we don't risk 
breaking it too much (?) fvg = dict(proxy._descr['edition_view']) object.__setattr__(self, '_view', fvg) self._process_fvg(m, fvg) vals = dict.fromkeys(fvg['fields'], False) object.__setattr__(self, '_values', vals) object.__setattr__(self, '_changed', set()) if index is None: self._init_from_defaults(m) else: vals = proxy._records[index] self._values.update(vals) if hasattr(vals, '_changed'): self._changed.update(vals._changed) def _get_modifier(self, field, modifier, *, default=False, view=None, modmap=None, vals=None): if vals is None: vals = {**self._values, '•parent•': self._proxy._parent._values} return super()._get_modifier(field, modifier, default=default, view=view, modmap=modmap, vals=vals) def _onchange_values(self): values = super(O2MForm, self)._onchange_values() # computed o2m may not have a relation_field(?) descr = self._proxy._descr if 'relation_field' in descr: # note: should be fine because not recursive values[descr['relation_field']] = self._proxy._parent._onchange_values() return values def save(self): proxy = self._proxy commands = proxy._parent._values[proxy._field] values = self._values_to_save() if self._index is None: commands.append((0, 0, values)) else: index = proxy._command_index(self._index) (c, id_, vs) = commands[index] if c == 0: vs.update(values) elif c == 1: if vs is None: vs = UpdateDict() assert isinstance(vs, UpdateDict), type(vs) vs.update(values) commands[index] = (1, id_, vs) else: raise AssertionError("Expected command type 0 or 1, found %s" % c) # FIXME: should be called when performing on change => value needs to be serialised into parent every time? 
proxy._parent._perform_onchange([proxy._field], self._env.context) def _values_to_save(self, all_fields=False): """ Validates values and returns only fields modified since load/save """ values = UpdateDict(self._values) values._changed.update(self._changed) if all_fields: return values for f in self._view['fields']: if self._get_modifier(f, 'required') and not (self._get_modifier(f, 'column_invisible') or self._get_modifier(f, 'invisible')): assert self._values[f] is not False, "{} is a required field".format(f) return values class UpdateDict(dict): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._changed = set() if args and isinstance(args[0], UpdateDict): self._changed.update(args[0]._changed) def changed_items(self): return ( (k, v) for k, v in self.items() if k in self._changed ) def update(self, *args, **kw): super().update(*args, **kw) if args and isinstance(args[0], UpdateDict): self._changed.update(args[0]._changed) class X2MProxy(object): _parent = None _field = None def _assert_editable(self): assert not self._parent._get_modifier(self._field, 'readonly'),\ 'field %s is not editable' % self._field assert not self._parent._get_modifier(self._field, 'invisible'),\ 'field %s is not visible' % self._field class O2MProxy(X2MProxy): """ O2MProxy() """ def __init__(self, parent, field): self._parent = parent self._field = field # reify records to a list so they can be manipulated easily? 
self._records = [] model = self._model fields = self._descr['edition_view']['fields'] for (command, rid, values) in self._parent._values[self._field]: if command == 0: self._records.append(values) elif command == 1: if values is None: # read based on view info r = model.browse(rid) values = UpdateDict(record_to_values(fields, r)) self._records.append(values) elif command == 2: pass else: raise AssertionError("O2M proxy only supports commands 0, 1 and 2, found %s" % command) def __len__(self): return len(self._records) @property def _model(self): model = self._parent._env[self._descr['relation']] ctx = self._parent._get_context(self._field) if ctx: model = model.with_context(**ctx) return model @property def _descr(self): return self._parent._view['fields'][self._field] def _command_index(self, for_record): """ Takes a record index and finds the corresponding record index (skips all 2s, basically) :param int for_record: """ commands = self._parent._values[self._field] return next( cidx for ridx, cidx in enumerate( cidx for cidx, (c, _1, _2) in enumerate(commands) if c in (0, 1) ) if ridx == for_record ) def new(self): """ Returns a :class:`Form` for a new :class:`~odoo.fields.One2many` record, properly initialised. The form is created from the list view if editable, or the field's form view otherwise. :raises AssertionError: if the field is not editable """ self._assert_editable() return O2MForm(self) def edit(self, index): """ Returns a :class:`Form` to edit the pre-existing :class:`~odoo.fields.One2many` record. The form is created from the list view if editable, or the field's form view otherwise. :raises AssertionError: if the field is not editable """ self._assert_editable() return O2MForm(self, index) def remove(self, index): """ Removes the record at ``index`` from the parent form. 
:raises AssertionError: if the field is not editable """ self._assert_editable() # remove reified record from local list & either remove 0 from # commands list or replace 1 (update) by 2 (remove) cidx = self._command_index(index) commands = self._parent._values[self._field] (command, rid, _) = commands[cidx] if command == 0: # record not saved yet -> just remove the command del commands[cidx] elif command == 1: # record already saved, replace by 2 commands[cidx] = (2, rid, 0) else: raise AssertionError("Expected command 0 or 1, got %s" % commands[cidx]) # remove reified record del self._records[index] self._parent._perform_onchange([self._field]) class M2MProxy(X2MProxy, collections.abc.Sequence): """ M2MProxy() Behaves as a :class:`~collection.Sequence` of recordsets, can be indexed or sliced to get actual underlying recordsets. """ def __init__(self, parent, field): self._parent = parent self._field = field def __getitem__(self, it): p = self._parent model = p._view['fields'][self._field]['relation'] return p._env[model].browse(self._get_ids()[it]) def __len__(self): return len(self._get_ids()) def __iter__(self): return iter(self[:]) def __contains__(self, record): relation_ = self._parent._view['fields'][self._field]['relation'] assert isinstance(record, BaseModel)\ and record._name == relation_ return record.id in self._get_ids() def add(self, record): """ Adds ``record`` to the field, the record must already exist. The addition will only be finalized when the parent record is saved. 
""" self._assert_editable() parent = self._parent relation_ = parent._view['fields'][self._field]['relation'] assert isinstance(record, BaseModel) and record._name == relation_,\ "trying to assign a '{}' object to a '{}' field".format( record._name, relation_, ) self._get_ids().append(record.id) parent._perform_onchange([self._field]) def _get_ids(self): return self._parent._values[self._field][0][2] def remove(self, id=None, index=None): """ Removes a record at a certain index or with a provided id from the field. """ self._assert_editable() assert (id is None) ^ (index is None), \ "can remove by either id or index" if id is None: # remove by index del self._get_ids()[index] else: self._get_ids().remove(id) self._parent._perform_onchange([self._field]) def clear(self): """ Removes all existing records in the m2m """ self._assert_editable() self._get_ids()[:] = [] self._parent._perform_onchange([self._field]) def record_to_values(fields, record): r = {} # don't read the id explicitly, not sure why but if any of the "magic" hr # field is read alongside `id` then it blows up e.g. # james.read(['barcode']) works fine but james.read(['id', 'barcode']) # triggers an ACL error on barcode, likewise km_home_work or # emergency_contact or whatever. 
Since we always get the id anyway, just # remove it from the fields to read to_read = list(fields.keys() - {'id'}) if not to_read: return r for f, v in record.read(to_read)[0].items(): descr = fields[f] if descr['type'] == 'many2one': v = v and v[0] elif descr['type'] == 'many2many': v = [(6, 0, v or [])] elif descr['type'] == 'one2many': v = [(1, r, None) for r in v or []] elif descr['type'] == 'datetime' and isinstance(v, datetime): v = odoo.fields.Datetime.to_string(v) elif descr['type'] == 'date' and isinstance(v, date): v = odoo.fields.Date.to_string(v) r[f] = v return r def _cleanup_from_default(type_, value): if not value: if type_ == 'many2many': return [(6, False, [])] elif type_ == 'one2many': return [] elif type_ in ('integer', 'float'): return 0 return value if type_ == 'one2many': return [c for c in value if c[0] != 6] elif type_ == 'datetime' and isinstance(value, datetime): return odoo.fields.Datetime.to_string(value) elif type_ == 'date' and isinstance(value, date): return odoo.fields.Date.to_string(value) return value def _get_node(view, f, *arg): """ Find etree node for the field ``f`` in the view's arch """ return next(( n for n in view['tree'].iter('field') if n.get('name') == f ), *arg) def tagged(*tags): """A decorator to tag BaseCase objects. Tags are stored in a set that can be accessed from a 'test_tags' attribute. A tag prefixed by '-' will remove the tag e.g. to remove the 'standard' tag. By default, all Test classes from odoo.tests.common have a test_tags attribute that defaults to 'standard' and 'at_install'. When using class inheritance, the tags ARE inherited. """ include = {t for t in tags if not t.startswith('-')} exclude = {t[1:] for t in tags if t.startswith('-')} def tags_decorator(obj): obj.test_tags = (getattr(obj, 'test_tags', set()) | include) - exclude return obj return tags_decorator