Skip to content

Conversation

@erikoqvist
Copy link
Contributor

No description provided.

@@ -1,5 +1,5 @@
__version__ = "0.32.7"
__db_version__ = 7
__db_version__ = 8

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tab before operator

AFTER INSERT OR UPDATE OR DELETE
ON ip_net_pool
FOR EACH ROW
EXECUTE PROCEDURE tf_kafka_produce_event();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs

CREATE TRIGGER trigger_kafka_ip_net_pool
AFTER INSERT OR UPDATE OR DELETE
ON ip_net_pool
FOR EACH ROW

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs

CREATE TRIGGER trigger_kafka_ip_net_pool
AFTER INSERT OR UPDATE OR DELETE
ON ip_net_pool

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs

EXECUTE PROCEDURE tf_kafka_produce_event();
CREATE TRIGGER trigger_kafka_ip_net_pool
AFTER INSERT OR UPDATE OR DELETE

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs

-- Triggers that write to kafka_produce_event
CREATE TRIGGER trigger_kafka_ip_net_plan
AFTER INSERT OR UPDATE OR DELETE
ON ip_net_plan

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs

-- Triggers that write to kafka_produce_event
CREATE TRIGGER trigger_kafka_ip_net_plan
AFTER INSERT OR UPDATE OR DELETE

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs

ELSIF OLD IS DISTINCT FROM NEW THEN
INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb);
END IF;
RETURN NEW;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs

INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb);
ELSIF OLD IS DISTINCT FROM NEW THEN
INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb);
END IF;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs

AFTER INSERT OR UPDATE OR DELETE
ON ip_net_vrf
FOR EACH ROW
EXECUTE PROCEDURE tf_kafka_produce_event();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs

CREATE TRIGGER trigger_kafka_ip_net_vrf
AFTER INSERT OR UPDATE OR DELETE
ON ip_net_vrf
FOR EACH ROW

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs

CREATE TRIGGER trigger_kafka_ip_net_vrf
AFTER INSERT OR UPDATE OR DELETE
ON ip_net_vrf

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs

EXECUTE PROCEDURE tf_kafka_produce_event();
CREATE TRIGGER trigger_kafka_ip_net_vrf
AFTER INSERT OR UPDATE OR DELETE

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs

AFTER INSERT OR UPDATE OR DELETE
ON ip_net_plan
FOR EACH ROW
EXECUTE PROCEDURE tf_kafka_produce_event();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs

CREATE TRIGGER trigger_kafka_ip_net_plan
AFTER INSERT OR UPDATE OR DELETE
ON ip_net_plan
FOR EACH ROW

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs

-- Triggers that write to kafka_produce_event
CREATE TRIGGER trigger_kafka_ip_net_plan
AFTER INSERT OR UPDATE OR DELETE
ON ip_net_plan

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs

-- disable these triggers at startup depending on configuration.
--
CREATE TABLE IF NOT EXISTS kafka_produce_event (
id SERIAL PRIMARY KEY,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs

ELSIF OLD IS DISTINCT FROM NEW THEN
INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb);
END IF;
RETURN NEW;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs

INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb);
ELSIF OLD IS DISTINCT FROM NEW THEN
INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb);
END IF;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs

IF TG_OP = 'DELETE' THEN
INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb);
ELSIF OLD IS DISTINCT FROM NEW THEN
INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs
line too long (123 > 79 characters)

BEGIN
IF TG_OP = 'DELETE' THEN
INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb);
ELSIF OLD IS DISTINCT FROM NEW THEN

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs

@erikoqvist erikoqvist force-pushed the produce_updates_to_kafka branch from 1b5f450 to b2e086d Compare January 14, 2026 16:24
CREATE OR REPLACE FUNCTION tf_kafka_produce_event() RETURNS trigger AS $$
BEGIN
IF TG_OP = 'DELETE' THEN
INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs
line too long (123 > 79 characters)

CREATE OR REPLACE FUNCTION tf_kafka_produce_event() RETURNS trigger AS $$
BEGIN
IF TG_OP = 'DELETE' THEN

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation contains tabs

conn.rollback()

except psycopg2.InterfaceError as e:
LOG.error("Database interface error in kafka_producer loop: %s. Reconnecting to database.", e)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (106 > 79 characters)


# mark processed for ids that were attempted
if ids:
cur.execute("UPDATE kafka_produce_event SET processed = TRUE WHERE id = ANY(%s);", (ids,))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (106 > 79 characters)

LOG.info("Attempting to recreate Kafka producer after flush failure")
producer = _ensure_producer(cfg)
if producer is None:
LOG.error("Unable to recreate Kafka producer; exiting kafka_producer process")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (98 > 79 characters)

send_failed = True
break
except Exception as e:
# if a single event fails to prepare, log and skip it for now

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (81 > 79 characters)

ids.append(event_id)
else:
# sending failed (producer might be disconnected); mark to recreate producer
LOG.error("Failed to send event id %s to topic %s; will attempt to recreate producer", event_id, topic)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (127 > 79 characters)

if sent:
ids.append(event_id)
else:
# sending failed (producer might be disconnected); mark to recreate producer

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (100 > 79 characters)

topic = _table_to_topic(topic_prefix, table)
message = {'event_type': etype, 'payload': payload}
# send with retries and exponential backoff
sent = _send_with_backoff(producer, topic, message, payload.get('id'))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (90 > 79 characters)

producer = _ensure_producer(cfg)
if producer is None:
# _ensure_producer returns None if kafka-python-ng isn't installed or config invalid
LOG.error("Kafka producer not available (missing dependency or bad config), exiting kafka_producer process")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (116 > 79 characters)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants