-
Notifications
You must be signed in to change notification settings - Fork 927
http over amqp_0_9 proxy #2567
Description
I've been playing around with Benthos a couple of days now and I must say that I love the product. I'm now trying to create a configuration which I can use to sort of proxy HTTP over amqp_0_9. But this seems a bit too challenging given the lack of expierence I have with the product.
Topology:
Client-> Benthos -> RabbitMQ -> Benthos -> Webserver
The idea is that the client is performing a reguest at an http input. That http_input should store the request in an amqp_0_9 with an unique id attached to it. That is then picked up by another Benthos config which will send the request with an http_client output to the webserver. This config is storing the response in another amqp_0_9 queue along with the id. This should then somehow be picked up by the Benthos config which is listening as a webserver and should sync_response the webserver's response from the amqp_0_9 queue which contains the response matching the id added at the input. The config should obviously wait for the response to arrive for some time.
These are my simplified configs, Please ignore the OPTIONS verb, I sort of use that in some decision making.
# inner config
input:
http_server:
allowed_verbs: [ GET, POST, OPTIONS ]
path: /xxx
sync_response:
metadata_headers:
include_patterns:
- .*
output:
broker:
pattern: fan_out
outputs:
- stdout:
codec: "delim:\r\n\r\n"
- switch:
cases:
- check: meta("http_server_verb") == "OPTIONS"
output:
resource: options
- output:
stdout: {}
output_resources:
- label: options
amqp_0_9:
urls:
- amqp://user:pass@server.local:5672/
exchange: "" # No default (required)
exchange_declare:
enabled: false
type: direct
durable: true
key: test
# edge config
input:
amqp_0_9:
urls:
- amqp://user:pass@server.local:5672/
queue: "test" # No default (required)
queue_declare:
enabled: false
durable: true
auto_delete: false
pipeline:
processors:
- http:
url: "https://webserver:8080/some_service"
tls:
enabled: true
skip_cert_verify: true
verb: OPTIONS
extract_headers:
include_patterns:
- .*
successful_on:
- 200
output:
amqp_0_9:
urls:
- amqp://user:pass@server.local:5672/
exchange: "" # No default (required)
exchange_declare:
enabled: false
type: direct
durable: true
key: test_response
The current config is adding the request to the test queue and will add the response to the test_response queue. It does not check if the response matches the request. This means that requests and responses could get messed up if multiple requests are made to the inner input. Also there is no sync response with the response from the webserver.
Could someone help me to implement the logic for injecting and matching the id and syncing the response from the test_response queue in the inner config? I would already be happy with some pointers in the right direction.