今回はDLQとしてキューじゃないけどGCSを使ってみた。UnboundedなデータをBoundedな場所に格納することになるのでややこしそうだけど、windowを使えば簡単に実現できる。
サンプルコード
package com.kenjih.sample.side_output; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * GCS-based Dead letter queue example * * @see <a href="https://stackoverflow.com/questions/45173668/gcp-dataflow-2-0-pubsub-to-gcs/45256314#45256314">GCP Dataflow 2.0 PubSub to GCS</a> * @see <a href="https://cloud.google.com/blog/big-data/2016/01/handling-invalid-inputs-in-dataflow">Handling Invalid Inputs in Dataflow</a> */ public class Main { private static final Logger LOG = LoggerFactory.getLogger(Main.class); private static class ParseToIntDoFn extends DoFn<String, String> { private final TupleTag<String> successTag; private final TupleTag<String> invalidTag; public ParseToIntDoFn(TupleTag<String> successTag, TupleTag<String> invalidTag) { this.successTag = successTag; this.invalidTag = invalidTag; } @ProcessElement public void processElement(ProcessContext c) { String s = c.element(); try { Integer i = Integer.parseInt(s); c.output(successTag, i.toString()); } catch (Exception e) { LOG.error("cannot convert {} to an integer", s); c.output(invalidTag, s); } } } private static String createTopicPath(String projectId, String topicName) { return String.format("projects/%s/topics/%s", projectId, topicName); } public static void main(String[] args) { LOG.info("begin the application"); 
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(DataflowPipelineOptions.class); options.setStreaming(true); Pipeline pipeline = Pipeline.create(options); String projectId = options.getProject(); final TupleTag<String> successTag = new TupleTag<String>() {}; final TupleTag<String> invalidTag = new TupleTag<String>() {}; PCollection<String> input = pipeline.apply("read", PubsubIO.readStrings().fromTopic(createTopicPath(projectId, "kh-test-in-1"))); PCollectionTuple outputTuple = input.apply("parse", ParDo.of(new ParseToIntDoFn(successTag, invalidTag)) .withOutputTags(successTag, TupleTagList.of(invalidTag))); // write successful data to PubSub PCollection<String> success = outputTuple.get(successTag); success .apply("success-write", PubsubIO.writeStrings() .to(createTopicPath(projectId, "kh-test-out-1"))); // write invalid data to GCS PCollection<String> invalid = outputTuple.get(invalidTag); invalid .apply("windowing", Window.into(FixedWindows.of(Duration.standardMinutes(1)))) .apply("invalid-write", TextIO.write() .to("gs://kh-test/dead-letter/") .withNumShards(3) .withWindowedWrites()); pipeline.run(); LOG.info("end the application"); } }
Dataflow Runnerで実行した結果
0 件のコメント:
コメントを投稿