-
Notifications
You must be signed in to change notification settings - Fork 135
Produce updates to kafka #1443
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Produce updates to kafka #1443
Conversation
| @@ -1,5 +1,5 @@ | |||
| __version__ = "0.32.7" | |||
| __db_version__ = 7 | |||
| __db_version__ = 8 | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tab before operator
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_pool | ||
| FOR EACH ROW | ||
| EXECUTE PROCEDURE tf_kafka_produce_event(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
| CREATE TRIGGER trigger_kafka_ip_net_pool | ||
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_pool | ||
| FOR EACH ROW |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
| CREATE TRIGGER trigger_kafka_ip_net_pool | ||
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_pool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
| EXECUTE PROCEDURE tf_kafka_produce_event(); | ||
| CREATE TRIGGER trigger_kafka_ip_net_pool | ||
| AFTER INSERT OR UPDATE OR DELETE |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
| -- Triggers that write to kafka_produce_event | ||
| CREATE TRIGGER trigger_kafka_ip_net_plan | ||
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_plan |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
| -- Triggers that write to kafka_produce_event | ||
| CREATE TRIGGER trigger_kafka_ip_net_plan | ||
| AFTER INSERT OR UPDATE OR DELETE |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
| ELSIF OLD IS DISTINCT FROM NEW THEN | ||
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb); | ||
| END IF; | ||
| RETURN NEW; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb); | ||
| ELSIF OLD IS DISTINCT FROM NEW THEN | ||
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb); | ||
| END IF; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_vrf | ||
| FOR EACH ROW | ||
| EXECUTE PROCEDURE tf_kafka_produce_event(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
| CREATE TRIGGER trigger_kafka_ip_net_vrf | ||
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_vrf | ||
| FOR EACH ROW |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
| CREATE TRIGGER trigger_kafka_ip_net_vrf | ||
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_vrf |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
| EXECUTE PROCEDURE tf_kafka_produce_event(); | ||
| CREATE TRIGGER trigger_kafka_ip_net_vrf | ||
| AFTER INSERT OR UPDATE OR DELETE |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_plan | ||
| FOR EACH ROW | ||
| EXECUTE PROCEDURE tf_kafka_produce_event(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
| CREATE TRIGGER trigger_kafka_ip_net_plan | ||
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_plan | ||
| FOR EACH ROW |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
| -- Triggers that write to kafka_produce_event | ||
| CREATE TRIGGER trigger_kafka_ip_net_plan | ||
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_plan |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
| -- disable these triggers at startup depending on configuration. | ||
| -- | ||
| CREATE TABLE IF NOT EXISTS kafka_produce_event ( | ||
| id SERIAL PRIMARY KEY, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
| ELSIF OLD IS DISTINCT FROM NEW THEN | ||
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb); | ||
| END IF; | ||
| RETURN NEW; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb); | ||
| ELSIF OLD IS DISTINCT FROM NEW THEN | ||
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb); | ||
| END IF; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
| IF TG_OP = 'DELETE' THEN | ||
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb); | ||
| ELSIF OLD IS DISTINCT FROM NEW THEN | ||
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
line too long (123 > 79 characters)
| BEGIN | ||
| IF TG_OP = 'DELETE' THEN | ||
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb); | ||
| ELSIF OLD IS DISTINCT FROM NEW THEN |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
1b5f450 to
b2e086d
Compare
| CREATE OR REPLACE FUNCTION tf_kafka_produce_event() RETURNS trigger AS $$ | ||
| BEGIN | ||
| IF TG_OP = 'DELETE' THEN | ||
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
line too long (123 > 79 characters)
| CREATE OR REPLACE FUNCTION tf_kafka_produce_event() RETURNS trigger AS $$ | ||
| BEGIN | ||
| IF TG_OP = 'DELETE' THEN |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation contains tabs
| conn.rollback() | ||
|
|
||
| except psycopg2.InterfaceError as e: | ||
| LOG.error("Database interface error in kafka_producer loop: %s. Reconnecting to database.", e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line too long (106 > 79 characters)
|
|
||
| # mark processed for ids that were attempted | ||
| if ids: | ||
| cur.execute("UPDATE kafka_produce_event SET processed = TRUE WHERE id = ANY(%s);", (ids,)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line too long (106 > 79 characters)
| LOG.info("Attempting to recreate Kafka producer after flush failure") | ||
| producer = _ensure_producer(cfg) | ||
| if producer is None: | ||
| LOG.error("Unable to recreate Kafka producer; exiting kafka_producer process") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line too long (98 > 79 characters)
| send_failed = True | ||
| break | ||
| except Exception as e: | ||
| # if a single event fails to prepare, log and skip it for now |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line too long (81 > 79 characters)
| ids.append(event_id) | ||
| else: | ||
| # sending failed (producer might be disconnected); mark to recreate producer | ||
| LOG.error("Failed to send event id %s to topic %s; will attempt to recreate producer", event_id, topic) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line too long (127 > 79 characters)
| if sent: | ||
| ids.append(event_id) | ||
| else: | ||
| # sending failed (producer might be disconnected); mark to recreate producer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line too long (100 > 79 characters)
| topic = _table_to_topic(topic_prefix, table) | ||
| message = {'event_type': etype, 'payload': payload} | ||
| # send with retries and exponential backoff | ||
| sent = _send_with_backoff(producer, topic, message, payload.get('id')) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line too long (90 > 79 characters)
| producer = _ensure_producer(cfg) | ||
| if producer is None: | ||
| # _ensure_producer returns None if kafka-python-ng isn't installed or config invalid | ||
| LOG.error("Kafka producer not available (missing dependency or bad config), exiting kafka_producer process") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line too long (116 > 79 characters)
No description provided.