Vagrant、Ansibleで作ったKafka環境で動作確認をしてみる。
以下の操作はすべて
プロジェクトのrootディレクトリから実行するものとします。
まずvagrantで仮想マシンを起動する。
$ cd vagrant
$ vagrant up
続いてansibleでplaybookを適用する。
$ cd ansible
$ ansible-playbook site.yml
まずはrest proxyと疎通確認を行う。
$ curl 192.168.33.11:8082/topics
["_schemas"]
rest proxy経由でKafkaにデータを送ってみる。
$ curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
--data '{"records":[{"value":{"name": "testUser"}}]}' \
"192.168.33.11:8082/topics/jsontest"
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}
consumerを作成し、先ほど作成したjsontestトピックをサブスクライブする。
$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" -H "Accept: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
http://192.168.33.11:8082/consumers/my_json_consumer
{"instance_id":"my_consumer_instance","base_uri":"http://192.168.33.11:8082/consumers/my_json_consumer/instances/my_consumer_instance"}
$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["jsontest"]}' \
http://192.168.33.11:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
もう1つメッセージを送ってみる。
$ curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
--data '{"records":[{"value":{"name": "testUserXXX"}}]}' \
"192.168.33.11:8082/topics/jsontest"
{"offsets":[{"partition":0,"offset":1,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}
メッセージをコンシュームする。
$ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://192.168.33.11:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
[{"key":null,"value":{"name":"testUser"},"partition":0,"offset":0,"topic":"jsontest"},{"key":null,"value":{"name":"testUserXXX"},"partition":0,"offset":1,"topic":"jsontest"}]
データの送受信を確認できたので、テスト用のコンシューマーを削除する。
$ curl -X DELETE -H "Accept: application/vnd.kafka.v2+json" \
http://192.168.33.11:8082/consumers/my_json_consumer/instances/my_consumer_instance