NAME
    Kafka::Producer::Avro - Avro message producer for Apache Kafka.

SYNOPSIS
        use Kafka::Connection;
        use Kafka::Producer::Avro;

        my $connection = Kafka::Connection->new( host => 'localhost' );

        my $producer = Kafka::Producer::Avro->new( Connection => $connection );

        # Do some interactions with Avro & SchemaRegistry before sending messages

        # Sending a single message
        my $response = $producer->send(...);

        # Sending a series of messages
        $response = $producer->send(...);

        # Closes the producer and cleans up
        undef $producer;
        $connection->close;
        undef $connection;

DESCRIPTION
    The main feature of "Kafka::Producer::Avro" is an object-oriented API
    for producing messages that are serialized with *Avro* according to
    schemas held in a *Confluent SchemaRegistry*.

    "Kafka::Producer::Avro" inherits from and extends Kafka::Producer.

  CONSTRUCTOR
   "new"
    Creates a new producer client object.

    "new()" takes arguments as key-value pairs, as described in
    Kafka::Producer, from which it inherits. In addition, it takes the
    following argument:

    "SchemaRegistry => $schema_registry" (mandatory)
        A Confluent::SchemaRegistry instance.

  METHODS
    The following methods are defined for the "Kafka::Producer::Avro"
    class:

   "schema_registry"()
    Returns the Confluent::SchemaRegistry instance supplied to the
    constructor.

   "get_error"()
    Returns a string containing the last error message.

   "send( %params )"
    Sends messages on a Kafka::Connection object.

    Returns a non-blank value (a reference to a hash with server response
    description) if the message is successfully sent.

    Unlike the Kafka::Producer method, which expects positional arguments,
    the "Kafka::Producer::Avro->send()" method takes named parameters:

        $producer->send(
            topic             => $topic,             # scalar
            partition         => $partition,         # scalar
            messages          => $messages,          # scalar | array
            keys              => $keys,              # scalar | array
            compression_codec => $compression_codec, # scalar
            key_schema        => $key_schema,        # optional JSON-string
            value_schema      => $value_schema       # optional JSON-string
        );

    Extra arguments may be supplied:

    "key_schema => $key_schema" and "value_schema => $value_schema"
        Both $key_schema and $value_schema are optional. Each is a JSON
        string representing the Avro schema used to validate and
        serialize the key(s) or value(s).

        These schemas are validated against "schema_registry" and, if
        compliant, they are added to the registry under the
        "$topic+'key'" or "$topic+'value'" subjects.

        If an expected schema isn't provided, the latest version from the
        Schema Registry is used, according to the subject (key or value).

   "bulk_send( %params )"
    Similar to "send", but sends the messages in bulks to avoid memory
    leaks.

    Extra named parameters are expected:

    "size => $size"
        The size of the bulk.

    "on_before_send_bulk => sub {...}" (optional)
        A code block that will be executed before each bulk is sent.

        The block will receive the following positional parameters:

        $bulk_num
            the number of the bulk

        $bulk_messages
            the number of messages in the bulk

        $bulk_keys
            the number of keys in the bulk

        $index_from
            the absolute index of the first message in the bulk

        $index_to
            the absolute index of the last message in the bulk

    "on_after_send_bulk => sub {...}" (optional)
        A code block that will be executed after each bulk is sent.

        The block will receive the following positional parameters:

        $sent
            the number of sent messages in the bulk

        $total_sent
            the total number of messages sent

    "on_init => sub {...}" (optional)
        A code block that will be executed only once, at the beginning of
        the cycle.

        The block will receive the following positional parameters:

        $to_send
            the total number of messages to send

        $bulk_size
            the size of the bulk

    "on_complete => sub {...}" (optional)
        A code block that will be executed only once, after the end of
        the cycle.

        The block will receive the following positional parameters:

        $to_send
            the total number of messages to send

        $total_sent
            the total number of messages sent

        $errors
            the number of bulks sent with errors

    "on_send_error => sub {...}" (optional)
        A code block that will be executed when a bulk registers an error.

AUTHOR
    Alvaro Livraghi

CONTRIBUTE
BUGS
    Please use the GitHub project link above to report problems or to
    contact the authors.

COPYRIGHT AND LICENSE
    Copyright 2018 by Alvaro Livraghi

    This program is free software; you can redistribute it and/or modify
    it under the same terms as Perl itself.