以下の操作はすべてプロジェクトのrootディレクトリから実行するものとします。
まずvagrantで仮想マシンを起動する。
$ cd vagrant $ vagrant up
続いてansibleでplaybookを適用する。
$ cd ansible $ ansible-playbook site.yml
まずはrest proxyと疎通確認を行う。
$ curl 192.168.33.11:8082/topics ["_schemas"]
rest proxy経由でKafkaにデータを送ってみる。
$ curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \ --data '{"records":[{"value":{"name": "testUser"}}]}' \ "192.168.33.11:8082/topics/jsontest" {"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}
consumerを作成し、先ほど作成したjsontestトピックをサブスクライブする。
$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" -H "Accept: application/vnd.kafka.v2+json" \ --data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \ http://192.168.33.11:8082/consumers/my_json_consumer {"instance_id":"my_consumer_instance","base_uri":"http://192.168.33.11:8082/consumers/my_json_consumer/instances/my_consumer_instance"} $ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["jsontest"]}' \ http://192.168.33.11:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
もう1つメッセージを送ってみる。
$ curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \ --data '{"records":[{"value":{"name": "testUserXXX"}}]}' \ "192.168.33.11:8082/topics/jsontest" {"offsets":[{"partition":0,"offset":1,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}
メッセージをコンシュームする。
$ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \ http://192.168.33.11:8082/consumers/my_json_consumer/instances/my_consumer_instance/records [{"key":null,"value":{"name":"testUser"},"partition":0,"offset":0,"topic":"jsontest"},{"key":null,"value":{"name":"testUserXXX"},"partition":0,"offset":1,"topic":"jsontest"}]
データの送受信を確認できたので、テスト用のコンシューマーを削除する。
$ curl -X DELETE -H "Accept: application/vnd.kafka.v2+json" \ http://192.168.33.11:8082/consumers/my_json_consumer/instances/my_consumer_instance
0 件のコメント:
コメントを投稿