オープンソース化された並列化テンプレートクラスライブラリ「Intel Threading Building Blocks」入門 4ページ

要素数が分からないデータ群に対する反復処理の並列実装(whileループの並列化)

 先で解説したparallel_forを使用した並列処理は、あらかじめ処理するデータの個数が決まっている場合にのみ利用できる。一方、次に紹介するparallel_whileは入力ストリームに対する処理など、処理したいデータの個数が事前に分からない場合でも利用できる並列アルゴリズムだ。

 たとえば次のリスト4は、ファイルから1行ずつテキストを読み出し、正規表現で指定した特定のキーワードを含む行の行番号とその内容を出力するというプログラムだ。この場合、データの個数(ここでは行数)を調べるにはストリームを最後まで読まなければならず、データが非常に多い場合は現実的ではない。parallel_whileはこのような処理を並列化するのに有用である。

リスト4 ストリーム入力に対する処理の実装例

#include <iostream>
#include <fstream>
#include <string>
#inclue "boost/regex.hpp"

bool SearchText(ifstream& is, string keyword) {
    boost::regex rex;
    boost::match_results<const char*> results;
    string buf;
    int lineNumber = 1;

    rex = keyword;
    while( is && getline(is, buf) ) {
        if( boost::regex_search(buf.c_str(), results, rex) ) {
            cout << lineNumber << ": " << buf << endl;
        }
        lineNumber++;
    }
    cout << "read " << lineNumber - 1 << " lines." << endl;
    return true;
}

 リスト4を、parallel_whileを使用してTBBで並列実装したものが次のリスト5だ。

リスト5 リスト4の処理をTBBで並列実装した例

#include <iostream>
#include <fstream>
#include <string>

class LinedString {
public:
    string contents;
    int lineNumber;
};

class LinedStream {
    istream& is;
    int num;

public:
    bool pop_if_present( LinedString& lstr ) {
        if( getline(is, lstr.contents) ) {
            num = num++;
            lstr.lineNumber = num;
        } else {
            return false;
        }
        return true;
    }

    LinedStream(istream& inputStream) :
        is(inputStream),
        num(1)
    {}
};

class Searcher {
    string keyword;
public:
    void operator()( LinedString lstr ) const;
    Searcher(string key): keyword(key) {}
    typedef LinedString argument_type;
};

void Searcher::operator()( LinedString lstr ) const {
    boost::regex rex;
    boost::match_results<const char*> results;

    rex = keyword;
    if( boost::regex_search(lstr.contents.c_str(), results, rex) ) {
        cout << lstr.lineNumber << ": " << lstr.contents << endl;
    }
}

bool ParallelSearchText(istream& is, string keyword) {
    tbb::parallel_while<Searcher> w;
    LinedStream stream(is);
    Searcher body(keyword);
    w.run( stream, body );
    return true;
}

 parallel_forなどと同様、parallel_whileでも関数オブジェクトとして実行すべき処理を渡すが、parallel_forと異なるのはparallel_whileは関数ではなくテンプレートクラスになっている点だ。テンプレート引数としては実行すべき処理を実装したクラスを取る。このparallel_while型のオブジェクトを生成し、引数に入力ストリームと関数オブジェクトを与えて「run」メンバー関数を実行することで、並列処理が行われる。

 リスト5では、並列実行すべき処理を実装したクラスとして「Searcher」クラスを、入力ストリームとして「LinedStream」クラスを用意している。

bool ParallelSearchText(istream& is, string keyword) {
    tbb::parallel_while<Searcher> w;
    LinedStream stream(is);
    Searcher body(keyword);
    w.run( stream, body );
    return true;
}

 parallel_whileに与える入力ストリームは、ストリームからデータを取り出す「pop_if_present」というメンバー関数を備えている必要がある。pop_if_present関数は引数に「argument_type」(後述)で指定した型のオブジェクト(の参照)を取り、次に処理すべきデータをこのオブジェクトに格納してtrueを返す。また、もし次に処理すべきデータがない場合はfalseを返せば良い。

 なお、リスト5の例では「行番号」と「その行に含まれる文字列」という2つのデータを格納するクラス「LinedString」を用意し、これをargument_typeとして使用している。

class LinedString {
public:
    string contents;
    int lineNumber;
};

class LinedStream {
    istream& is;
    int num;

public:
    bool pop_if_present( LinedString& lstr ) {
        if( getline(is, lstr.contents) ) {
            num = num++;
            lstr.lineNumber = num;
        } else {
            return false;
        }
        return true;
    }

    LinedStream(istream& inputStream) :
        is(inputStream),
        num(1)
    {}
};

 並列実行すべき処理を実装するクラスには、operator()に加えてその引数の型を「argument_type」として定義しておく必要がある。

 リスト5の場合、入力ストリームからデータをLinedString型で取り出すので、argument_typeをLinedStreamとして定義している。また、operator()内では取り出したLinedStreamオブジェクトに対する処理を記述している。

class Searcher {
    string keyword;
public:
    void operator()( LinedString lstr ) const;
    Searcher(string key): keyword(key) {}
    typedef LinedString argument_type; // operator()の引数として利用する型を「argument_type」として定義しておく
};

void Searcher::operator()( LinedString lstr ) const {
    boost::regex rex;
    boost::match_results<const char*> results;

    rex = keyword;
    if( boost::regex_search(lstr.contents.c_str(), results, rex) ) {
        cout << lstr.lineNumber << ": " << lstr.contents << endl;
    }
}

 なお、parallel_whileを使用した並列化の場合、ループ内で実行する処理の内容によってはあまりパフォーマンス向上が見られない場合がある。たとえば今回の例は1行分のテキストに対する正規表現検索という比較的軽めの処理であるため、次の表2のように並列化の効果は若干にとどまっている。

表2 10万行のテキスト(Webサーバーのアクセスログファイル)に対し、「MSIE [78]\..; Windows NT 5.1;」という正規表現にマッチする行を検索するのにかかった時間
関数 処理時間
SearchText(非並列版) 1389ミリ秒
ParallelSearchText(並列版) 1294ミリ秒