A - Exploring Another Space Editorial by broad

暫定30位解法

最終提出はこちらです。初めて解説を書きます。いろいろと調べながら理論的に考えて暫定30位でした。間違ってたらご指摘ください。

概要

ベイズ推定を行う。 ある入口を選んだ時、出口に繋がってる確率の表を求め、出口の系列を予測する。つまり、

\[ ll[Index_{入口}][Index_{出口}]= P_{Index_{入口}}(Index_{出口}|計測結果) \]

を求めたい。 このままでは求めることができないのでベイズの定理によって、式変形する。以下、\({Index_{入口}}\)は省略する。

\[ P(Index_{出口}|計測結果)=\frac{p(計測結果|Index_{出口})p(Index_{出口})}{ p(計測結果)}\]

ここで右辺について考える。

\[p(計測結果|Index_{出口})=\prod_ap(a_{yx}|Index_{出口})= \prod_{a} \sigma(x=a_{yx},mean=T,std=S)\]

ただし、\(TはIndex_{出口}からyxだけ移動した座標の設定温度\)\(a_{yx}\)Index_{入口} y xを実行し、得られた測定温度を表す。

\[p(Index_{出口})=\frac{1}{N}\]

入口と出口が等確率で紐づいているとする。定数(最終的な式で分母分子で割り算すると消える)なので以降無視できる。

\[p(計測結果)=\sum p(計測結果|Index_{出口})p(Index_{出口}) \]

対数尤度を取らないとアンダーフローするので、尤度を対数とって保存しておく。確率に戻す時は定数倍(同じ数を尤度に足す)してから\(p(計測結果)\)で割ることで\(p(計測結果|Index_{出口})\)が求まる。

参考 https://qiita.com/rindo/items/fe4c32eddc3ed880433b

アルゴリズム詳細

配置フェーズ

中央の大きなピラミッドと、そのピラミッドのど真ん中の高さ、上下左右の四箇所のピラミッドの高さと幅をoptunaでパラメータ調整。きつかった。いいPCほしい。 どうやら、Si=1-20、21-28、29-30の3グループで適切なパラメタの式が変わる模様。たぶんもっといい配置がある。

温度計測フェーズ

A前半(ざっくり計測)

\(Index_{入口}\)の選択

0から順次選ぶ。

②yxの選択

選んだ現在自信( \(P_{Index_{入口}}(Index_{出口}|計測結果)\))のある出口番号の設定温度が出来るだけバラけるようにする。

②ー1理想(計算量的にボツ)

確率変数XYを、

X=\(Index_{入口}\)から\(Index_{出口}\)に紐づいている(現在の自信)

Y= yxを選択して計測した時に得られる温度

とおいて相互情報量I(X;Y)(計測によって得られる情報量)を最大にするようなyxを選びたい。相互情報量は次のように言い換えることができる。

\[I(X;Y) =H(X)-H(X| Y) =H(Y)-H(Y| X)\]

2項目\(H(X)-H(X| Y)\)は、今のエントロピーから更新後のエントロピーを引いたもの(得られる情報量)とみなせるが、計算量的に計算してられない。

3項目を計算することを考える。H(Y| X) は正規分布なのでほぼ定数(0-1000への丸め誤差は除く)なので、H(Y)を最大にするようなyxを選択すればいい。

\[y,x=argmax_{yx} H(Y)\]

\[H(Y)=\sum_{t}-p(t)log_2p(t)\]

\[p(a_{yx})=\sum_{Index_{出口}}p(a_{yx}|Index_{出口})p(Index_{出口})\]

ここで\(p(Index_{出口})=p(Index_{出口}|計測結果)\)である。(あってる?)

が、O(10^4)のループ内で適切なyxを求める計算にO(L*L*1000*N)かかるので全然計算量が足りない。(泣。。一日使って考えたのに。。)

②ー2現実(最終解法)

今最も自信がある上位3つの出口(本当は全部やりたいが計算量。。)について、 KLダイバージェンス(2つの確率分布の差異を示す指標)の期待値が最大なものを選択。 (上と同じ値を計算したかったが未証明)

\[{y,x}=argmax_{yx} \sum_i\sum_j p(Index_{出口i})p(Index_{出口j})KL(\sigma(mean=T_{iyx},std=S)||\sigma(mean=T_{jyx},std=S))\]

③計測、尤度update

選択したIndex_{入口} y xをJudgeに与える。返ってきた\(a_{yx}\)をベイズ推定の更新式に入れてllの表(log likelihood)を更新

④終了判断

出口の確率の最大値が0.8を超えたら次の入口へ。越えなければもう一回同じ入口を①で選ぶ。

B後半(出力系列の考慮)

⓪出力系列生成

自信\(P_{Index_{入口}}(Index_{出口}|計測結果)\)が最大のものを出口が重複しないように貪欲に選ぶ。

①index(入口の番号)の選択

出力系列の中で自信\(P_{Index_{入口}}(Index_{出口}|計測結果)\)が最小となる入口を選ぶ。

②yxの選択

A前半と同じ

③計測

A前半と同じ

④終了判断

全ての系列内の自信が0.9以上の時終了

回答フェーズ

⓪出力系列生成

自信\(P_{Index_{入口}}(Index_{出口}|計測結果)\)が最大のものを出口が重複しないように貪欲に選ぶ。

①回答を出力して終了する。

総括

他にも、温度設定をエントロピーを基にして設定できないか?とか、他の入口で計測した結果を別の入口にも考慮できないか?など考えてました。 かなり計算量に苦労して思った通り実装ができなかったのですが、「A②理想」において計算量を落としてきちんと計算する方法を見つけられず妥協したのが悔しい点です。

(参考)optunaの使い方

最後に、スレッド上限付きのAHCで使えるoptunaのコードがなかったので作りました。なぜかtimeoutの処理でプロセスをkillできないのですが、timeoutを考慮しないのであれば動きます。(立ち上がりの時間が含まれてしまうのでそもそも考慮できないが。)

以下参考にしました。ありがとうございます。

AHCでoptunaを使う

subprocessの上限つき実行

subprocess公式ドキュメント

import optuna
import subprocess
import json
import time
CWD="./AtCoder/othercontests/AHC/AHC022/tools"
subprocess.run("g++ -Wall  -Wextra -I /home/AtCoder/ac-library/include -O2 ../AHC022_cmd.cpp",cwd=CWD,shell=True)
subprocess.run("cargo build --release --bin tester",cwd=CWD,shell=True)

def objective(trial):
    S=Si*Si
    pw1=trial.suggest_int('pw1', 0, 100)
    h1=trial.suggest_int('h1', 0, 1000)
    pw2=trial.suggest_int('pw2', 0, 100)
    h2=trial.suggest_int('h2', 0, 1000)
    h00=trial.suggest_int('h00', 0, 1000)

    proc_pool = {}
    isrunning ={}
    proc_num = 8

    # print("S",S)
    command=f"./target/release/tester ./a.out  {pw1} {pw2} {h1} {h2} {h00}"
    
    N_seed=100
    score=0
    for seed in range(N_seed):
        #waiting
        while len(isrunning) >= proc_num:
            ended=[]
            for procno in isrunning:
                if proc_pool[procno]["popen"].poll() is not None:
                    ended.append(procno)
                if time.time()-isrunning[procno]['stime']> 10:
                    #print("TIMEOUT ERROR:",proc_pool[procno]['command'])
                    proc_pool[procno]["popen"].kill()#killできてないっぽいので、そのまま実行してしまう。。
                    ended.append(procno)
            for procno in ended:
                isrunning.pop(procno)
            time.sleep(0.01)
        #ADD task
        dataset=f"ins/{S}/{seed:04}.txt"
        command2=f"{command} <  {dataset}"

        isrunning[seed]={"stime":time.time()}
        proc_pool[seed]={"popen":subprocess.Popen(command2,cwd=CWD,shell=True,stdout=subprocess.DEVNULL,stderr=subprocess.PIPE,text=True),
              "command":command2,
            }

    #結果まとめ
    count=0
    for i in proc_pool:
        proc =proc_pool[i]["popen"]
        ret=""
        try:
            ret =proc.communicate(timeout=5)#処理は終わっているのでtimeoutしないはず。
            ret= json.loads(ret[1])
            score+=ret['score']
            if(ret['score']<1000):
                print("{} {}".format(Si,i))
            count+=1
        except json.JSONDecodeError as e:
            print("JSON ERROR")
            score+=0
            print(ret)
        except Exception as e:
            print(ret)
            score+=0
            print(e)
            print("ERROR:",proc_pool[procno]['command'])
    
    return score/count

Si_list=[1,5,10,15,20,25,30]
for Si in Si_list:
    study = optuna.create_study(
        storage="sqlite:///optuna.sqlite3",
        study_name=f"S-{Si}",
        load_if_exists=True,
        direction="maximize",
    )

    study.optimize(objective, n_trials=200)

posted:
last update: