Search on the blog

2012年3月3日土曜日

スライド区間に対するクエリのまとめ

問題設定

 スライド区間に対するクエリ処理の種類とそれに対するアルゴリズムをいくつか紹介します。まず、「スライド区間」の定義ですが、窓関数のようなものを想像してください。例えば、[2,5,10,-1,0,4,...]というデータに対してサイズ3の窓関数をスライドさせると、[2,5,10], [5,10,-1],[10,-1,0],.....と複数の連続する区間ができます。これらの区間に対して以下のようなクエリを考えます。
  1. 平均値/合計値
  2. 最小値/最大値
  3. 中央値/x番目に小さい値
 データサイズが小さい場合は愚直な方法で計算しても問題ありませんが、データサイズが大きい場合やオンライン処理で次々にデータが来る場合は高速な処理が求められます。天気予報システムや金融システムなどではこのような需要があると思います。例えば、ある時間幅内での最高気温・最低気温・平均気温を計算したり、リアルタイムに株価の高値・低値を計算するなどです。

 以下では、i番目のデータをdata[i]、データサイズをN、窓サイズをKと表記します。

平均値/合計値

 平均値/合計値の計算は簡単です。合計値が分かればそれをKで割ることで平均値が分かるので、以下では合計値に絞って議論します。単純な実装だとすべての窓に対して毎回合計値をフロムスクラッチから計算します。この場合、1クエリにつきO(K)、合計でO(NK)の計算量が必要になります。しかし、以下の関係を利用すると1クエリがO(1)で捌けます。



 1つ前の区間の和から一番古いデータを引いて、新しいデータを足したものが、現在の区間の和になるというわけです。文字列検索のRabin-Karp Algorithmで使用されるローリングハッシュと同様のやり方です。

最小値/最大値
 次に、最小値/最大値クエリを考えます。PKU2823 Sliding Windowがこの問題そのものなので、この問題を解いてみます。この場合も愚直な実装をすると、1クエリにつきO(K)の時間がかかってしまいます。高速に計算するためには、主に以下の方法が考えられます。
  • multisetを用いた方法
  • 両端キューを用いた方法
  • 平方分割によるバケット法
  • セグメント木
 平方分割とセグメント木はデータ値集合が有限個の場合のみ利用できます。この2つはより一般的なx番目に小さな値を取り出すクエリに利用できるので、次の節で説明します。ここではmultisetを用いた方法と両端キューを用いた方法を説明します。
 
 まず、multisetを用いた方法です。以下のように各区間内の要素をmultisetに格納しておくことで、区間の更新(古いデータの削除、新しいデータの挿入)とクエリ処理がO(lg K)で可能です。
const int SIZE = 1e6;
int data[SIZE+1];
int maximum[SIZE+1];
int minimum[SIZE+1];

int main() {
    int N, K;

    while (~scanf("%d %d", &N, &K)) {
        for (int i = 0; i < N; i++)
            scanf("%d", data+i);

        multiset<int> nums;
        int p = 0;
        multiset<int>::iterator itr;
        for (int i = 0; i < N; i++) {
            nums.insert(data[i]);

            if (i >= K)
                nums.erase(nums.find(data[i-K]));
            if (i >= K-1) {
                minimum[p] = *nums.begin();
                itr = nums.end();
                maximum[p] = *(--itr);
                ++p;
            }
        }

        for (int i = 0; i < p; i++)
            printf("%d ", minimum[i]);
        puts("");
        for (int i = 0; i < p; i++)
            printf("%d ", maximum[i]);
        puts("");
    }

    return 0;
}


この考え方は後で説明するバケット法に似ていると思います。予め決められた数のバケツを線形に並べて用意しておくのではなくて、木構造でバケツに値を入れていくようなイメージです。

 残念ながら、上記のO(lg K)の処理ではTime Limit Exceededになってしまいましたので、より高速なアルゴリズムが必要となります。両端キュー(deque)を利用すると、平均O(1)で1つのクエリを捌くことができます。両端キューとは、スタックとキューの両方の性質を持ったデータ構造で以下の処理がO(1)で可能です。

push_back()キューの最後尾にデータを挿入する
push_front()キューの先頭にデータを挿入する
pop_back()キューの最後尾のデータをポップする
pop_front()キューの先頭のデータをポップする

 それでは両端キューを利用したクエリ処理の方法について説明します。以下では最小値クエリについて説明します。データは、キューの最後尾に足していきます。ただし、現在追加したデータより大きな値を持つ値は無駄なのでポップしていきます。そして、キューの先頭に最小値があるような状態を保ちます。分かりにくいので例を使って説明します。K=4として、[1,3,5,2,4,6]に対する処理を考えます。キューにデータを詰めていって[1,3,5]の状態まで来たとします。次に2を最後尾にプッシュします。[1,3,5,2]となりますが、2は3,5より時系列で見て新しいデータなので3,5より窓区間内に残る期間が長いです。つまり、3,5の立場からすると、自分たちがポップされるまではずっと区間内に2がいることになります。よってこの時点で、3,5をキュー内に保持する意味はありません。ということで、3,5はポップして2をプッシュします。このような処理を繰り返していきます。以下のコードで無事ACです。(※ちなみに、クエリ処理とは関係ないですが、出力処理が重かったのでstringstreamに一旦バッファリングしてから一気に吐き出すという処理を行っています。この方法は時間制限が厳しく、出力が多いオンラインジャッジの問題には有効な技だと思います。)
const int SIZE = 1e6;
int data[SIZE+1];

int main() {
    int N, K;

    while (~scanf("%d %d", &N, &K)) {
        int *p = data;
        for (int i = 0; i < N; i++)
            scanf("%d", p++);

        // minimum values
        deque<int> nums;
        ostringstream ss;
        for (int i = 0; i < N; i++) {
            while (!nums.empty() && data[i] < nums.back())
                nums.pop_back();
            nums.push_back(data[i]);
            if (i >= K && nums.front() == data[i-K])
                nums.pop_front();
            if (i >= K-1)
                ss << nums.front() << " ";
        }
        cout << ss.str() << endl;

        // maximum values
        ss.str("");
        while (!nums.empty())
            nums.pop_back();
        for (int i = 0; i < N; i++) {
            while (!nums.empty() && data[i] > nums.back())
                nums.pop_back();
            nums.push_back(data[i]);
            if (i >= K && nums.front() == data[i-K])
                nums.pop_front();
            if (i >= K-1)
                ss << nums.front() << " ";
        }
        cout << ss.str() << endl;
    }

    return 0;
}

中央値/x番目に小さい値
 最後に中央値、より一般的にx番目に小さい値を取り出すクエリについて考えます。

 まず、平方分割を用いた方法です。平方分割の前に単純なバケット法を利用した場合のクエリ処理を説明します。例えば、データの取り得る値が1,2,3,...,8だった場合は、1,2,3,...8の8個のバケツを用意します。そして、それぞれのバケツにその値が窓内にいくつ表れるかを保持します。

 上図のように、窓が一つずれると古いデータの値のバケツの値を1減らし、新しく来たデータのバケツの値を1増やします。バケツを走査することで、x番目に小さい値が分かります。上図ではK=4としていますが、K=100でも、K=100000でも8つのバケツを走査するだけでクエリが捌けます。一般にデータの値域のサイズをrとすると、この方法では一つのクエリにつきO(r)の計算が必要になります。

 rが大きくなると単純なバケット法では十分な速度が期待できません。よって何らかの工夫が必要となります。データの値域が1,2,3,....1000000のときを考えます。このとき1000000個のバケツを1000個ずつのグループに分け、各グループにおいて自分のグループ内のバケツにデータがいくつ入っているかを記憶させておけばO(sqrt(r))で効率的なバケツの走査ができます。一般にr個のバケツを分ける分け方は、半分にするとか、10個ずつにするとか、いろいろ考えられますが、sqrt(r)個ずつに分けるのが最適で、そのような分け方を平方分割と言います。

 バケツの走査の仕方には、平方分割よりも効率的なものがあります。二分探索のようなやり方で区間を半分にして、
  • 前半の区間にx個以上要素があれば、前半の区間を探す
  • そうでなければ、後半の区間から(x-前半の区間のバケツ内に含まれるデータの数)番目に小さいデータを探す
というような方法が可能です。これはセグメント木で実装することができ、各クエリがO(ln r)で捌けます。

 SRM 310 FloatingMedianがスライド区間の中央値を求める問題ですので、それを平方分割で解いたプログラムと、セグメント木で解いたプログラムを載せておきます。


const int MOD = 65536;
const int SIZE = 256;

int bucket[SIZE][SIZE];
int nums[SIZE];

class FloatingMedian {
    void inc(int x) {
        ++bucket[x/SIZE][x%SIZE];
        ++nums[x/SIZE];
    }

    void dec(int x) {
        --bucket[x/SIZE][x%SIZE];
        --nums[x/SIZE];
    }

    int get(int x) {
        int p = 0;
        for (; p < SIZE; p++) {
            if (nums[p] < x)
                x -= nums[p];
            else
                break;
        }

        REP(i, SIZE) {
            x -= bucket[p][i];
            if (x <= 0)
                return p*SIZE+i;
        }
        return -1;
    }

public:
    long long sumOfMedians(int seed, int mul, int add, int N, int K) {
        memset(bucket, 0, sizeof(bucket));
        memset(nums, 0, sizeof(nums));

        vector<long long>t;
        t.push_back(seed);
        FOR (i, 1, N)
            t.push_back((t[i-1]*mul+add)%MOD);

        long long ret = 0;
        REP(i, N) {
            inc(t[i]);
            if (i >= K)
                dec(t[i-K]);
            if (i >= K-1)
                ret += get((K+1)/2);
        }

        return ret;
    }
};

const int MOD = 65536;
int seg[1<<18];

class FloatingMedian {
    void inc(int k, int left, int right, int x) {
        if (left == right)
            seg[k]++;
        else {
            int mid = (left+right)/2;
            if (x <= mid)
                inc(2*k+1, left, mid, x);
            else
                inc(2*k+2, mid+1, right, x);
            seg[k] = seg[2*k+1] + seg[2*k+2];
        }
    }

    void dec(int k, int left, int right, int x) {
        if (left == right)
            seg[k]--;
        else {
            int mid = (left+right)/2;
            if (x <= mid)
                dec(2*k+1, left, mid, x);
            else
                dec(2*k+2, mid+1, right, x);
            seg[k] = seg[2*k+1] + seg[2*k+2];
        }
    }

    int get(int k, int left, int right, int x) {
        if (left == right)
            return left;
        else {
            int mid = (left+right)/2;
            if (seg[2*k+1] >= x)
                return get(2*k+1, left, mid, x);
            else
                return get(2*k+2, mid+1, right, x-seg[2*k+1]);
        }
    }


public:
    long long sumOfMedians(int seed, int mul, int add, int N, int K) {
        memset(seg, 0, sizeof(seg));

        vector<long long>t;
        t.push_back(seed);
        FOR (i, 1, N)
            t.push_back((t[i-1]*mul+add)%MOD);

        long long ret = 0;
        REP(i, N) {
            inc(0, 0, 65535, t[i]);
            if (i >= K)
                dec(0, 0, 65535, t[i-K]);

            if (i >= K-1)
                ret += get(0, 0, 65535, (K+1)/2);
        }
        return ret;
    }
};


0 件のコメント:

コメントを投稿