実際にそのようなサンプルを書いて動作確認してみました。
package com.kenjih.multiple_pipe_line; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Main { private static final Logger LOG = LoggerFactory.getLogger(Main.class); private static class AddPrefixDoFn extends DoFn<String, String> { private final String prefix; private AddPrefixDoFn(String prefix) { this.prefix = prefix; } @ProcessElement public void processElement(ProcessContext context) { String element = context.element(); context.output(prefix + element); } } private String createTopicPath(String projectId, String topicName) { return String.format("projects/%s/topics/%s", projectId, topicName); } public void run(DataflowPipelineOptions options) { Pipeline pipeline = Pipeline.create(options); String projectId = options.getProject(); // a data pipeline pipeline .apply("read-1", PubsubIO.readStrings().fromTopic(createTopicPath(projectId, "kh-test-in-1"))) .apply("transform-1", ParDo.of(new AddPrefixDoFn("transform-1: "))) .apply("write-1", PubsubIO.writeStrings().to(createTopicPath(projectId, "kh-test-out-1"))); // another data pipeline pipeline .apply("read-2", PubsubIO.readStrings().fromTopic(createTopicPath(projectId, "kh-test-in-2"))) .apply("transform-2", ParDo.of(new AddPrefixDoFn("transform-2: "))) .apply("write-2", PubsubIO.writeStrings().to(createTopicPath(projectId, "kh-test-out-2"))); // yet another data pipeline pipeline .apply("read-3", PubsubIO.readStrings().fromTopic(createTopicPath(projectId, "kh-test-in-3"))) .apply("transform-3", ParDo.of(new AddPrefixDoFn("transform-3: "))) .apply("write-3", PubsubIO.writeStrings().to(createTopicPath(projectId, "kh-test-out-3"))); pipeline.run(); } public static void main(String[] args) { LOG.info("begin the application"); DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(DataflowPipelineOptions.class); options.setStreaming(true); new Main().run(options); LOG.info("end the application"); } }
GCPのコンソール画面で見ると以下のようになっています。
BigQueryIOのDynamicDestinationsのようなものがPubSubIOにはなくて困っていましたが、上のように独立したパイプラインを作ることができるので、それを応用することでDynamicDestinations的なことができそうです(パイプライン生成時に静的に決めないといけないのでDynamicとは違いますが、複数の可変的な汎用パイプラインをうまく扱えそうという意味です)。
0 件のコメント:
コメントを投稿