aboutsummaryrefslogtreecommitdiff
path: root/imago/engine/monteCarlo.py
blob: 0587cfc8f718984fe7a2469db571a3a2533c9c10 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""Monte Carlo Tree Search module."""

import sys
import random

from imago.data.enums import Player
from imago.engine.decisionAlgorithm import DecisionAlgorithm

class MCTS(DecisionAlgorithm):
    """Monte Carlo tree."""

    def __init__(self, move):
        self.root = MCTSNode(move, None)

    def forceNextMove(self, coords):
        """Selects given move as next move."""
        if coords == "pass":
            raise NotImplementedError("Pass not implemented for MCTS algorithm.")
        for node in self.root.children:
            if (node.move.getRow() == coords[0]
            and node.move.getCol() == coords[1]):
                self.root = node
                return
        self.root = self.root.expansionForCoords(coords)

    def pickMove(self):
        """
        Performs an exploratory cycle, updates the root to the best node and returns its
        corresponding move."""
        #NOTE: with only one selection-expansion the match is
        #      completely random
        for _ in range(5):
            self.root.selection().expansion().simulation(10, 20)
        selectedNode = self._selectBestNextNode()
        return selectedNode.move.coords

    def _selectBestNextNode(self):
        """Returns best ucb node available for the current player."""

        # Assumes at least one expansion has occured
        bestUCB = -sys.maxsize - 1
        bestNode = None
        for node in self.root.children:
            ucb = node.ucbForPlayer()
            if ucb > bestUCB:
                bestUCB = ucb
                bestNode = node

        return bestNode

    def clearBoard(self):
        """Empties move history."""
        while self.root.parent is not None:
            self.root = self.root.parent

class MCTSNode:
    """Monte Carlo tree node."""

    def __init__(self, move, parent):
        self.visits = 0
        self.score = 0
        self.move = move
        self.parent = parent
        self.children = set()
        self.unexploredVertices = move.getPlayableVertices()

    def ucbForPlayer(self):
        """
        Returns Upper Confidence Bound of node changing the symbol if the move is for the
        wite player."""
        return self._ucbForSpecificPlayer(self.move.getPlayer())

    def _ucbForSpecificPlayer(self, player):
        """
        Returns Upper Confidence Bound of node from the perspective of the given
        player."""
        if player == Player.WHITE:
            return self._ucb() * -1
        return self._ucb()

    def _ucb(self):
        """Returns Upper Confidence Bound of node"""
        # UCB = meanVictories + 1/visits
        if self.visits == 0:
            return 0
        mean = self.score / self.visits
        adjust = 1/self.visits
        return mean + adjust

    def selection(self):
        """Select the most promising node with unexplored children."""
        bestNode = self._selectionRec(self)
        return bestNode

    def _selectionRec(self, bestNode):
        """Searches this node and its children for the node with the best UCB value for
        the current player."""

        #TODO: En cada nivel debe escoger como mejor el que sea mejor para quien le toque
        #      jugar, ya que esa es la partida esperada. Esto es lo que hacía con
        #      ucbForPlayer()

        #      ¿Cada nuevo nodo expandido tiene una puntuación inicial? ¿Se le hacen
        #      exploraciones?

        #      Backpropagation: ¿Es correcto acumular la puntuación de cada nodo hacia
        #      arriba? ¿Sería mejor acumular hacia arriba el valor del mejor nodo? ¿O del
        #      juego perfecto? Estos dos últimos valores no son el mismo.

        player = self.move.getPlayer()

        # Check if node has unexplored children and better UCB than previously explored
        if len(self.unexploredVertices) > 0:
            if self._ucbForSpecificPlayer(
                    player) > bestNode._ucbForSpecificPlayer(player):
                bestNode = self

        # Recursively search children for better UCB
        for child in self.children:
            bestChildNode = child._selectionRec(bestNode)
            if bestChildNode._ucbForSpecificPlayer(
                    player) > bestNode._ucbForSpecificPlayer(player):
                bestNode = bestChildNode

        return bestNode

    def expansion(self):
        """Pick an unexplored vertex from this node and add it as a new MCTSNode."""
        newVertex = random.choice(list(self.unexploredVertices))
        return self.expansionForCoords(newVertex)

    def expansionForCoords(self, coords):
        """
        Adds a move for the given coordinates as a new node to the children of this
        node."""
        newMove = self.move.addMove(coords[0], coords[1])
        newNode = MCTSNode(newMove, self)
        self.children.add(newNode)
        self.unexploredVertices.remove((coords[0], coords[1]))
        return newNode

    def simulation(self, nMatches, scoreDiffHeur):
        """Play random matches to accumulate reward information on the node."""
        scoreAcc = 0
        for _ in range(nMatches):
            result = self._randomMatch(scoreDiffHeur)
            self.visits += 1
            scoreDiff = result[0]-result[1]
            if scoreDiff != 0:
                scoreAcc += scoreDiff / abs(scoreDiff)
        # Backup
        node = self
        while node is not None:
            node.score += scoreAcc
            node.visits += nMatches
            node = node.parent

    def _randomMatch(self, scoreDiffHeur):
        """Play a random match and return the resulting score."""
        #IMPORTANT: the score heuristic doesn't work for the first move of the game, since
        #the black player holds all except for one vertex!
        currentMove = self.move
        score = currentMove.board.score()
        while currentMove.getGameLength() < 5 or abs(score[0] - score[1]) < scoreDiffHeur:
            if currentMove.isPass and currentMove.previousMove.isPass:
                return score
            sensibleMoves = currentMove.getSensibleVertices()
            if len(sensibleMoves) == 0:
                currentMove = currentMove.addPass()
            else:
                selectedMove = random.choice(list(sensibleMoves))
                currentMove = currentMove.addMoveByCoords(selectedMove)
            score = currentMove.board.score()
            print("Current move: %s" % (str(currentMove)))
            print("Current move game length: ", currentMove.getGameLength())
            print("Score of the board: %d, %d (%d)"
                    % (score[0],
                        score[1],
                        score[0]-score[1])
                )
            currentMove.printBoard()
        return score

    def _printBoardInfo(self):
        """Prints the visits and score for debugging purposes."""
        print("Visits: %d" % self.visits)
        print("Score: %d" % self.score)