
Wednesday, April 18, 2018

Apache Beam: Processing Valid and Invalid Data Separately

In Beam 1.x, you could use side outputs to route invalid records separately to a dead letter queue (DLQ). The side output API seemed to be gone in Beam 2.x, so I tried to find out how to do the same thing. As it turns out, the feature was renamed rather than removed (side outputs are now called "additional outputs"): you simply tag elements inside the DoFn and emit each one to the matching output, as shown below.

This time I used GCS as the DLQ, even though it is not actually a queue. Storing unbounded data in a bounded sink sounds like it could get messy, but with windowing it is easy to achieve: fixed windows cut the unbounded stream into finite chunks, each of which can be written out as a set of files.

Sample code

package com.kenjih.sample.side_output;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * GCS-based Dead letter queue example
 *
 * @see <a href="https://stackoverflow.com/questions/45173668/gcp-dataflow-2-0-pubsub-to-gcs/45256314#45256314">GCP Dataflow 2.0 PubSub to GCS</a>
 * @see <a href="https://cloud.google.com/blog/big-data/2016/01/handling-invalid-inputs-in-dataflow">Handling Invalid Inputs in Dataflow</a>
 */
public class Main {

  private static final Logger LOG = LoggerFactory.getLogger(Main.class);

  /**
   * Tries to parse each input element as an integer: elements that parse
   * successfully are emitted to {@code successTag}, all others to
   * {@code invalidTag}.
   */
  private static class ParseToIntDoFn extends DoFn<String, String> {

    private final TupleTag<String> successTag;
    private final TupleTag<String> invalidTag;

    public ParseToIntDoFn(TupleTag<String> successTag, TupleTag<String> invalidTag) {
      this.successTag = successTag;
      this.invalidTag = invalidTag;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      String s = c.element();
      try {
        Integer i = Integer.parseInt(s);
        c.output(successTag, i.toString());
      } catch (Exception e) {
        LOG.error("cannot convert {} to an integer", s);
        c.output(invalidTag, s);
      }
    }
  }

  private static String createTopicPath(String projectId, String topicName) {
    return String.format("projects/%s/topics/%s", projectId, topicName);
  }

  public static void main(String[] args) {
    LOG.info("begin the application");

    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(DataflowPipelineOptions.class);
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);
    String projectId = options.getProject();

    final TupleTag<String> successTag = new TupleTag<String>() {};
    final TupleTag<String> invalidTag = new TupleTag<String>() {};

    PCollection<String> input =
        pipeline.apply("read",
            PubsubIO.readStrings().fromTopic(createTopicPath(projectId, "kh-test-in-1")));

    PCollectionTuple outputTuple =
        input.apply("parse", ParDo.of(new ParseToIntDoFn(successTag, invalidTag))
            .withOutputTags(successTag, TupleTagList.of(invalidTag)));

    // write successful data to PubSub
    PCollection<String> success = outputTuple.get(successTag);
    success
        .apply("success-write", PubsubIO.writeStrings()
            .to(createTopicPath(projectId, "kh-test-out-1")));

    // write invalid data to GCS
    PCollection<String> invalid = outputTuple.get(invalidTag);
    invalid
        .apply("windowing", Window.into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply("invalid-write", TextIO.write()
            .to("gs://kh-test/dead-letter/")
            .withNumShards(3)
            .withWindowedWrites());

    pipeline.run();

    LOG.info("end the application");
  }
}
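
To sanity-check the tagging logic without deploying to Dataflow, the DoFn can be exercised locally with Beam's test utilities. The following is a minimal sketch of my own, not part of the original sample; it assumes ParseToIntDoFn is made package-visible rather than private, and that JUnit 4 and Beam's test classes are on the classpath.

package com.kenjih.sample.side_output;

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.junit.Rule;
import org.junit.Test;

public class ParseToIntDoFnTest {

  @Rule
  public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void splitsValidAndInvalidElements() {
    final TupleTag<String> successTag = new TupleTag<String>() {};
    final TupleTag<String> invalidTag = new TupleTag<String>() {};

    PCollection<String> input = pipeline.apply(Create.of("1", "42", "abc"));

    PCollectionTuple outputTuple = input.apply(
        ParDo.of(new Main.ParseToIntDoFn(successTag, invalidTag))
            .withOutputTags(successTag, TupleTagList.of(invalidTag)));

    // Parsable elements come out of the success tag...
    PAssert.that(outputTuple.get(successTag)).containsInAnyOrder("1", "42");
    // ...and everything else comes out of the invalid tag.
    PAssert.that(outputTuple.get(invalidTag)).containsInAnyOrder("abc");

    pipeline.run().waitUntilFinish();
  }
}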

Result of running on the Dataflow Runner
