Solving Conformant Planning Problems with the \(K_0\) Compilation

For the purpose of this tutorial we will look at the seminal paper by Palacios and Geffner “Compiling Uncertainty Away in Conformant Planning Problems with Bounded Width”, and see how we can implement the \(K_0\) compilation of conformant into classical problems.

Definition (Translation \(K_0\)). For a conformant planning problem \(P=\langle F,I,O,G\rangle\), the translation \(K_0(P) = \langle F', I', O', G'\rangle\) is classical planning problem with - \(F' = \{ KL, K\neg L\, \mid\, L \in F \}\) - \(I' = \{ KL \, \mid \, L \text{ is a unit clause in } I\}\) - \(G' = \{ KL \, \mid \, L \in G\}\) - \(O' = O\) but with each precondition \(L\) for \(a \in O\) replaced by \(KL\), and each conditional effect \(a: C \rightarrow L\) replaced by \(a: KC \rightarrow KL\) and \(a: \neg K \neg C \rightarrow \neg K \neg L\).

Loading a blocks instance

[1]:
import tarski.evaluators
from tarski.grounding.lp_grounding import LPGroundingStrategy
from tarski.theories import Theory
from tarski.syntax import *
from tarski.io import PDDLReader

reader = PDDLReader(raise_on_error=True)
reader.parse_domain('./benchmarks/blocksworld.pddl')
problem = reader.parse_instance('./benchmarks/probBLOCKS-4-2.pddl')
lang = problem.language

We will need to ground actions and state variables:

[2]:
grounder = LPGroundingStrategy(problem)
actions =  grounder.ground_actions()
lpvariables = grounder.ground_state_variables()
---------------------------------------------------------------------------
CommandNotFoundError                      Traceback (most recent call last)
<ipython-input-2-a10384ae9b78> in <module>
      1 grounder = LPGroundingStrategy(problem)
----> 2 actions =  grounder.ground_actions()
      3 lpvariables = grounder.ground_state_variables()

~/checkouts/readthedocs.org/user_builds/tarski/checkouts/devel/src/tarski/grounding/lp_grounding.py in ground_actions(self)
     50             raise RuntimeError('Cannot retrieve set of ground actions from LPGroundingStrategy '
     51                                'configured with ground_actions=False')
---> 52         model = self._solve_lp()
     53         # This will take care of the case where there is not ground action from some schema
     54         groundings = dict()

~/checkouts/readthedocs.org/user_builds/tarski/checkouts/devel/src/tarski/grounding/lp_grounding.py in _solve_lp(self)
     66         if self.model is None:
     67             lp, tr = create_reachability_lp(self.problem, self.do_ground_actions, self.include_variable_inequalities)
---> 68             model_filename, theory_filename = run_clingo(lp)
     69             self.model = parse_model(filename=model_filename, symbol_mapping=tr)
     70

~/checkouts/readthedocs.org/user_builds/tarski/checkouts/devel/src/tarski/reachability/clingo_wrapper.py in run_clingo(lp)
     32     gringo_command = get_gringo_command()
     33     if gringo_command is None:
---> 34         raise CommandNotFoundError("gringo")
     35
     36     with tempfile.NamedTemporaryFile(mode='w+t', delete=False) as f:

CommandNotFoundError: Necessary command "gringo" could not be found

Constructing the fluent set \(F'\)

To construct the set of fluents \(F'\) we need to create a fresh first-order language to accomodate the new symbols

[3]:
kp_lang = tarski.language("K0(P)", theories=[Theory.EQUALITY])

In this tutorial we will explore a compact encoding enabled by the modeling capabilities of Tarski. We start creating a sort (type) for the literals of each of the atoms in the original conformant problem \(P\). We call this type P-literals

[4]:
p_lits = kp_lang.sort("P-literals", kp_lang.Object)

Now we enumerate the atoms in \(F\) and we keep matching lists P_lits and reified_P_lits that will facilitate later the compilation process

[5]:
P_lits = []
reified_P_lits = []
for atom_index, atoms in lpvariables.enumerate():
    f_atom = Atom(atoms.symbol, atoms.binding)
    fp_lit = f_atom
    fn_lit = neg(f_atom)
    P_lits += [fp_lit, fn_lit]
    reified_P_lits += [kp_lang.constant(str(fp_lit), p_lits), kp_lang.constant(str(fn_lit), p_lits)]
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-5-5843cd5eb48a> in <module>
      1 P_lits = []
      2 reified_P_lits = []
----> 3 for atom_index, atoms in lpvariables.enumerate():
      4     f_atom = Atom(atoms.symbol, atoms.binding)
      5     fp_lit = f_atom

NameError: name 'lpvariables' is not defined

so we end up, for every pair of literals \(L\), \(\neg L\), with objects\(L\)” and “\(\neg l\)”. To obtain \(F'\) we need to add a new predicate

[6]:
K = kp_lang.predicate("K", p_lits)

from which we can easily define the \(KL\), \(K \neg L\), \(\neg K L\) and \(\neg K \neg L\) literals

[7]:
K(reified_P_lits[0])
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<ipython-input-7-2ed7a4fc5dcc> in <module>
----> 1 K(reified_P_lits[0])

IndexError: list index out of range
[8]:
neg(K(reified_P_lits[0]))
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<ipython-input-8-8ef69c244eba> in <module>
----> 1 neg(K(reified_P_lits[0]))

IndexError: list index out of range
[9]:
K(reified_P_lits[1])
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<ipython-input-9-434ec89c63c4> in <module>
----> 1 K(reified_P_lits[1])

IndexError: list index out of range
[10]:
neg(K(reified_P_lits[1]))
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<ipython-input-10-e1ac5ca3c3b4> in <module>
----> 1 neg(K(reified_P_lits[1]))

IndexError: list index out of range

Excursion: Inspecting grounded initial states

We go on a little excursion to show how we can enumerate the literals that are true in the initial state of a grounded STRIPS problem. First, we need to select what algorithm we want to use to evaluate expressions

[11]:
reader.problem.init.evaluator = tarski.evaluators.simple.evaluate

The simple evaluator is a straightforward depth-first traversal that processes each of the nodes of the tree formed by the syntactic elements of the expression to be evaluated, returning either a expression or a value.

Once the evaluator algorithm is selected, we can use the random access iterator to evaluate expressions as we do below

[12]:
I = []
for atom_index, atoms in lpvariables.enumerate():
    atom = Atom(atoms.symbol, atoms.binding)
    if reader.problem.init[atom]:
        I += [atom]
    else:
        I += [neg(atom)]
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-12-576f2fda3369> in <module>
      1 I = []
----> 2 for atom_index, atoms in lpvariables.enumerate():
      3     atom = Atom(atoms.symbol, atoms.binding)
      4     if reader.problem.init[atom]:
      5         I += [atom]

NameError: name 'lpvariables' is not defined

to obtain the list of literals true in the initial state.

[13]:
for p in I:
    print(p)

Constructing the initial state \(I'\)

For the purpose of this tutorial, we will consider a quite difficult conformant problem, where the only information we have initially is that the robot hand is empty.

In order to construct that initial state, we need to access the relevant predicate and object symbols

[14]:
handempty = reader.problem.language.get('handempty')
holding = reader.problem.language.get('holding')
a, b, c, d = reader.problem.language.get('a', 'b', 'c', 'd')

so we can write directly the literals corresponding to the specification above.

[15]:
I = [handempty(), neg(holding(a)), neg(holding(b)), neg(holding(c)), neg(holding(d))]

We obtain \(I'\) by first computing the set of literals of the original conformant problem \(P\) that are unit clauses

[16]:
unit_clauses = set()
K_I = []
for l0 in I:
    p_l0 = kp_lang.get(str(l0))
    K_I += [K(p_l0)]
    unit_clauses.add(symref(l0))
---------------------------------------------------------------------------
UndefinedElement                          Traceback (most recent call last)
<ipython-input-16-e9f8259d81e7> in <module>
      2 K_I = []
      3 for l0 in I:
----> 4     p_l0 = kp_lang.get(str(l0))
      5     K_I += [K(p_l0)]
      6     unit_clauses.add(symref(l0))

~/checkouts/readthedocs.org/user_builds/tarski/checkouts/devel/src/tarski/fol.py in get(self, first, *args)
    425
    426         if not args:  # The user asked for one single element, return it directly
--> 427             return access_next(first)
    428
    429         # Otherwise, the user asked for multiple elements, return them as a tuple for easier unpacking

~/checkouts/readthedocs.org/user_builds/tarski/checkouts/devel/src/tarski/fol.py in access_next(elem)
    422                 return self._global_index[elem]
    423             except KeyError:
--> 424                 raise err.UndefinedElement(elem) from None
    425
    426         if not args:  # The user asked for one single element, return it directly

UndefinedElement: Undefined element "handempty()" (error msg: None)

We use the set unit_clauses to then determine which \(\neg K L\) and \(\neg K \neg L\) fluents we need to have in our initial state as well

[17]:
for atom_index, atoms in lpvariables.enumerate():
    lp = Atom(atoms.symbol, atoms.binding)
    p_lp = kp_lang.get(str(lp))
    ln = neg(lp)
    p_ln = kp_lang.get(str(ln))
    if lp not in unit_clauses:
        K_I += [neg(K(p_lp))]
    if ln not in unit_clauses:
        K_I += [neg(K(p_ln))]
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-17-db82797714fa> in <module>
----> 1 for atom_index, atoms in lpvariables.enumerate():
      2     lp = Atom(atoms.symbol, atoms.binding)
      3     p_lp = kp_lang.get(str(lp))
      4     ln = neg(lp)
      5     p_ln = kp_lang.get(str(ln))

NameError: name 'lpvariables' is not defined
[18]:
for k_p in K_I:
    print(k_p)

Constructing the goal state \(G'\)

Constructing the goal state proceeds very much as for initial states, but way simpler, as we do not need to complete with the logical implications of literals as we do for initial states

[19]:
on, ontable, clear = reader.problem.language.get('on', 'ontable', 'clear')
G = [clear(a), on(a, b), on(b, c), on(c, d), ontable(d)]
[20]:
K_G = []
for l_G in G:
    p_l_G = kp_lang.get(str(l_G))
    K_G += [K(p_l_G)]
---------------------------------------------------------------------------
UndefinedElement                          Traceback (most recent call last)
<ipython-input-20-95c965795dcb> in <module>
      1 K_G = []
      2 for l_G in G:
----> 3     p_l_G = kp_lang.get(str(l_G))
      4     K_G += [K(p_l_G)]

~/checkouts/readthedocs.org/user_builds/tarski/checkouts/devel/src/tarski/fol.py in get(self, first, *args)
    425
    426         if not args:  # The user asked for one single element, return it directly
--> 427             return access_next(first)
    428
    429         # Otherwise, the user asked for multiple elements, return them as a tuple for easier unpacking

~/checkouts/readthedocs.org/user_builds/tarski/checkouts/devel/src/tarski/fol.py in access_next(elem)
    422                 return self._global_index[elem]
    423             except KeyError:
--> 424                 raise err.UndefinedElement(elem) from None
    425
    426         if not args:  # The user asked for one single element, return it directly

UndefinedElement: Undefined element "clear(a)" (error msg: None)

Constructing the action set \(O'\)

Constructing the set of operators is a bit more involved. We start importing a helper function, ground_schema, that instantiates action schemas as per the given variable binding.

[21]:
from tarski.syntax.transform.action_grounding import ground_schema

We use the same interface discussed above, to get access to the set of bindings identified by the grounding procedure, and just call the helper function on the schemata and the bindings.

[22]:
O = []
for name, ops in actions.items():
    print('Action schema', name, 'got', len(ops), 'ground actions')
    schema = reader.problem.get_action(name)
    print(list(schema.parameters.vars()))
    for op in ops:
        ground_action = ground_schema(schema, op)
        O += [ground_action]
        print(ground_action)
        print(ground_action.precondition, ground_action.effects)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-22-9f39f8128f09> in <module>
      1 O = []
----> 2 for name, ops in actions.items():
      3     print('Action schema', name, 'got', len(ops), 'ground actions')
      4     schema = reader.problem.get_action(name)
      5     print(list(schema.parameters.vars()))

NameError: name 'actions' is not defined

Creating the precondition and effect formulas of the operators in \(O'\) requires 1) creating copies of a ground operator, 2) substitute formulas (i.e. wherever it says \(L\) it needs to say \(KL\)) and 3) create new add and del effects for the new operators.

[23]:
import copy
from tarski.syntax.transform.substitutions import substitute_expression
from tarski.fstrips import Action, AddEffect, DelEffect

We start creating a dictionary where we map literals \(L\) to their corresponding \(K\)-literal

[24]:
subst = {}
for p, Kp in zip(P_lits, [K(p) for p in reified_P_lits]):
    subst[symref(p)] = Kp

note the role played by the two lists P_lits and reified_P_lits we created above.

We note that any effect, the construction of the \(KC\) and \(KL\) formulas is always the same, so we introduce a function that does the translation

[25]:
def make_K_condition_and_effect(subst, eff, K_prec):
    KC = [K_prec]
    if not isinstance(eff.condition, Tautology):
        KC += [substitute_expression(eff.condition, subst)]
    KC = land(*KC)
    KL = subst[symref(eff.atom)]
    KnL = subst[symref(neg(eff.atom))]

    return KC, KL, KnL

We finally put together the substitution rule for the \(P\)-literals, and construct the new set of operators as follows

[26]:
K_O = []
for op in O:
    K_prec = substitute_expression(op.precondition, subst)
    K_effs = []
    for eff in op.effects:
        KC, KL, KnL = make_K_condition_and_effect(subst, eff, K_prec)
        if isinstance(eff, AddEffect):
            K_effs += [AddEffect(KC, KL)]
            K_effs += [DelEffect(KC, KnL)]
        elif isinstance(eff, DelEffect):
            K_effs += [DelEffect(KC, KL)]
            K_effs += [AddEffect(KC, KnL)]
        else:
            raise RuntimeError("Effect type not supported by compilation!")
    K_O += [Action(kp_lang, op.name, VariableBinding(), K_prec, K_effs)]

[ ]: