Page List

Search on the blog

2018年4月16日月曜日

Apache Beam: 複数のパイプラインをあげる

 Apache Beamで構築するグラフはDAGじゃないといけないという縛りがありますが、DAGであれば連結なグラフじゃなくてもいいらしいです。(これに気づかなくて数ヶ月悩んでました。)
 
 実際にそのようなサンプルを書いて動作確認してみました。

package com.kenjih.multiple_pipe_line;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Main {

  private static final Logger LOG = LoggerFactory.getLogger(Main.class);

  private static class AddPrefixDoFn extends DoFn<String, String> {

    private final String prefix;

    private AddPrefixDoFn(String prefix) {
      this.prefix = prefix;
    }

    @ProcessElement
    public void processElement(ProcessContext context) {
      String element = context.element();
      context.output(prefix + element);
    }
  }

  private String createTopicPath(String projectId, String topicName) {
    return String.format("projects/%s/topics/%s",
        projectId,
        topicName);
  }

  public void run(DataflowPipelineOptions options) {
    Pipeline pipeline = Pipeline.create(options);
    String projectId = options.getProject();

    // a data pipeline
    pipeline
        .apply("read-1",
            PubsubIO.readStrings().fromTopic(createTopicPath(projectId, "kh-test-in-1")))
        .apply("transform-1",
            ParDo.of(new AddPrefixDoFn("transform-1: ")))
        .apply("write-1",
            PubsubIO.writeStrings().to(createTopicPath(projectId, "kh-test-out-1")));

    // another data pipeline
    pipeline
        .apply("read-2",
            PubsubIO.readStrings().fromTopic(createTopicPath(projectId, "kh-test-in-2")))
        .apply("transform-2",
            ParDo.of(new AddPrefixDoFn("transform-2: ")))
        .apply("write-2",
            PubsubIO.writeStrings().to(createTopicPath(projectId, "kh-test-out-2")));

    // yet another data pipeline
    pipeline
        .apply("read-3",
            PubsubIO.readStrings().fromTopic(createTopicPath(projectId, "kh-test-in-3")))
        .apply("transform-3",
            ParDo.of(new AddPrefixDoFn("transform-3: ")))
        .apply("write-3",
            PubsubIO.writeStrings().to(createTopicPath(projectId, "kh-test-out-3")));

    pipeline.run();
  }

  public static void main(String[] args) {
    LOG.info("begin the application");

    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(DataflowPipelineOptions.class);
    options.setStreaming(true);

    new Main().run(options);

    LOG.info("end the application");
  }

}

GCPのコンソール画面で見ると以下のようになっています。
BigQueryIOのDynamicDestinationsのようなものがPubSubIOにはなくて困っていましたが、上のように独立したパイプラインを作ることができるので、それを応用することでDynamicDestinations的なことができそうです(パイプライン生成時に静的に決めないといけないのでDynamicとは違いますが、複数の可変的な汎用パイプラインをうまく扱えそうという意味です)。

0 件のコメント:

コメントを投稿