[![Build Status](https://travis-ci.org/inway/mojo-rabbitmq-client.svg?branch=master)](https://travis-ci.org/inway/mojo-rabbitmq-client) [![MetaCPAN Release](https://badge.fury.io/pl/Mojo-RabbitMQ-Client.svg)](https://metacpan.org/release/Mojo-RabbitMQ-Client) # NAME Mojo::RabbitMQ::Client - Mojo::IOLoop based RabbitMQ client # SYNOPSIS use Mojo::RabbitMQ::Client; # Supply URL according to (https://www.rabbitmq.com/uri-spec.html) my $client = Mojo::RabbitMQ::Client->new( url => 'amqp://guest:guest@127.0.0.1:5672/'); # Catch all client related errors $client->catch(sub { warn "Some error caught in client"; }); # When connection is in Open state, open new channel $client->on( open => sub { my ($client) = @_; # Create a new channel with auto-assigned id my $channel = Mojo::RabbitMQ::Client::Channel->new(); $channel->catch(sub { warn "Error on channel received"; }); $channel->on( open => sub { my ($channel) = @_; $channel->qos(prefetch_count => 1)->deliver; # Publish some example message to test_queue my $publish = $channel->publish( exchange => 'test', routing_key => 'test_queue', body => 'Test message', mandatory => 0, immediate => 0, header => {} ); # Deliver this message to server $publish->deliver; # Start consuming messages from test_queue my $consumer = $channel->consume(queue => 'test_queue'); $consumer->on(message => sub { say "Got a message" }); $consumer->deliver; } ); $channel->on(close => sub { $log->error('Channel closed') }); $client->open_channel($channel); } ); # Start connection $client->connect(); # Start Mojo::IOLoop if not running already Mojo::IOLoop->start unless Mojo::IOLoop->is_running; ## CONSUMER use Mojo::RabbitMQ::Client; my $consumer = Mojo::RabbitMQ::Client->consumer( url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&queue=mojo', defaults => { qos => {prefetch_count => 1}, queue => {durable => 1}, consumer => {no_ack => 0}, } ); $consumer->catch(sub { die "Some error caught in Consumer" } ); $consumer->on('success' => sub { say "Consumer ready" }); $consumer->on( 'message' => sub { my ($consumer, $message) = @_; $consumer->channel->ack($message)->deliver; } ); $consumer->start(); Mojo::IOLoop->start unless Mojo::IOLoop->is_running; ## PUBLISHER use Mojo::RabbitMQ::Client; my $publisher = Mojo::RabbitMQ::Client->publisher( url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&queue=mojo' ); $publisher->catch(sub { die "Some error caught in Publisher" } ); $publisher->on('success' => sub { say "Publisher ready" }); $publisher->publish('plain text'); $publisher->publish({encode => { to => 'json'}}); Mojo::IOLoop->start unless Mojo::IOLoop->is_running; # DESCRIPTION [Mojo::RabbitMQ::Client](https://metacpan.org/pod/Mojo::RabbitMQ::Client) is a rewrite of [AnyEvent::RabbitMQ](https://metacpan.org/pod/AnyEvent::RabbitMQ) to work on top of [Mojo::IOLoop](https://metacpan.org/pod/Mojo::IOLoop). # EVENTS [Mojo::RabbitMQ::Client](https://metacpan.org/pod/Mojo::RabbitMQ::Client) inherits all events from [Mojo::EventEmitter](https://metacpan.org/pod/Mojo::EventEmitter) and can emit the following new ones. ## connect $client->on(connect => sub { my ($client, $stream) = @_; ... }); Emitted when TCP/IP connection with RabbitMQ server is established. ## open $client->on(open => sub { my ($client) = @_; ... }); Emitted AMQP protocol Connection.Open-Ok method is received. ## close $client->on(close => sub { my ($client) = @_; ... }); Emitted on reception of Connection.Close-Ok method. ## disconnect $client->on(close => sub { my ($client) = @_; ... }); Emitted when TCP/IP connection gets disconnected. # ATTRIBUTES [Mojo::RabbitMQ::Client](https://metacpan.org/pod/Mojo::RabbitMQ::Client) has following attributes. ## tls my $tls = $client->tls; $client = $client->tls(1) Force secure connection. Default is disabled (`0`). ## user my $user = $client->user; $client = $client->user('guest') Sets username for authorization, by default it's not defined. ## pass my $pass = $client->pass; $client = $client->pass('secret') Sets user password for authorization, by default it's not defined. ## pass my $pass = $client->pass; $client = $client->pass('secret') Sets user password for authorization, by default it's not defined. ## host my $host = $client->host; $client = $client->host('localhost') Hostname or IP address of RabbitMQ server. Defaults to `localhost`. ## port my $port = $client->port; $client = $client->port(1234) Port on which RabbitMQ server listens for new connections. Defaults to `5672`, which is standard RabbitMQ server listen port. ## vhost my $vhost = $client->vhost; $client = $client->vhost('/') RabbitMQ virtual server to user. Default is `/`. ## params my $params = $client->params; $client = $client->params(Mojo::Parameters->new('verify=1')) Sets additional parameters for connection. Default is not defined. For list of supported parameters see ["SUPPORTED QUERY PARAMETERS"](#supported-query-parameters). ## url my $url = $client->url; $client = $client->url('amqp://...'); Sets all connection parameters in one string, according to specification from [https://www.rabbitmq.com/uri-spec.html](https://www.rabbitmq.com/uri-spec.html). amqp_URI = "amqp[s]://" amqp_authority [ "/" vhost ] [ "?" query ] amqp_authority = [ amqp_userinfo "@" ] host [ ":" port ] amqp_userinfo = username [ ":" password ] username = *( unreserved / pct-encoded / sub-delims ) password = *( unreserved / pct-encoded / sub-delims ) vhost = segment ## heartbeat\_timeout my $timeout = $client->heartbeat_timeout; $client = $client->heartbeat_timeout(180); Heartbeats are use to monitor peer reachability in AMQP. Default value is `60` seconds, if set to `0` no heartbeats will be sent. ## connect\_timeout my $timeout = $client->connect_timeout; $client = $client->connect_timeout(5); Connection timeout used by [Mojo::IOLoop::Client](https://metacpan.org/pod/Mojo::IOLoop::Client). Defaults to environment variable `MOJO_CONNECT_TIMEOUT` or `10` seconds if nothing else is set. ## max\_channels my $max_channels = $client->max_channels; $client = $client->max_channels(10); Maximum number of channels allowed to be active. Defaults to `0` which means no implicit limit. When you try to call `add_channel` over limit an `error` will be emitted on channel saying that: _Maximum number of channels reached_. # STATIC METHODS ## consumer my $client = Mojo::RabbitMQ::Client->consumer(...) Shortcut for creating [Mojo::RabbitMQ::Client::Consumer](https://metacpan.org/pod/Mojo::RabbitMQ::Client::Consumer). ## publisher my $client = Mojo::RabbitMQ::Client->publisher(...) Shortcut for creating [Mojo::RabbitMQ::Client::Publisher](https://metacpan.org/pod/Mojo::RabbitMQ::Client::Publisher). # METHODS [Mojo::RabbitMQ::Client](https://metacpan.org/pod/Mojo::RabbitMQ::Client) inherits all methods from [Mojo::EventEmitter](https://metacpan.org/pod/Mojo::EventEmitter) and implements the following new ones. ## connect $client->connect(); Tries to connect to RabbitMQ server and negotiate AMQP protocol. ## close $client->close(); ## param my $param = $client->param('name'); $client = $client->param(name => 'value'); ## add\_channel my $channel = Mojo::RabbitMQ::Client::Channel->new(); ... $channel = $client->add_channel($channel); $channel->open; ## open\_channel my $channel = Mojo::RabbitMQ::Client::Channel->new(); ... $client->open_channel($channel); ## delete\_channel my $removed = $client->delete_channel($channel->id); # SUPPORTED QUERY PARAMETERS There's no formal specification, nevertheless a list of common parameters recognized by officially supported RabbitMQ clients is maintained here: [https://www.rabbitmq.com/uri-query-parameters.html](https://www.rabbitmq.com/uri-query-parameters.html). Some shortcuts are also supported, you'll find them in parenthesis. Aliases are less significant, so when both are specified only primary value will be used. ## cacertfile (_ca_) Path to Certificate Authority file for TLS. ## certfile (_cert_) Path to the client certificate file for TLS. ## keyfile (_key_) Path to the client certificate private key file for TLS. ## fail\_if\_no\_peer\_cert (_verify_) TLS verification mode, defaults to 0x01 on the client-side if a certificate authority file has been provided, or 0x00 otherwise. ## auth\_mechanism Currently only AMQPLAIN is supported, **so this parameter is ignored**. ## heartbeat Sets requested heartbeat timeout, just like `heartbeat_timeout` attribute. ## connection\_timeout (_timeout_) Sets connection timeout - see [connection\_timeout](https://metacpan.org/pod/connection_timeout) attribute. ## channel\_max Sets maximum number of channels - see [max\_channels](https://metacpan.org/pod/max_channels) attribute. # SEE ALSO [Mojo::RabbitMQ::Client::Channel](https://metacpan.org/pod/Mojo::RabbitMQ::Client::Channel), [Mojo::RabbitMQ::Client::Consumer](https://metacpan.org/pod/Mojo::RabbitMQ::Client::Consumer), [Mojo::RabbitMQ::Client::Publisher](https://metacpan.org/pod/Mojo::RabbitMQ::Client::Publisher) # COPYRIGHT AND LICENSE Copyright (C) 2015-2017, Sebastian Podjasek and others Based on [AnyEvent::RabbitMQ](https://metacpan.org/pod/AnyEvent::RabbitMQ) - Copyright (C) 2010 Masahito Ikuta, maintained by `bobtfish@bobtfish.net` This program is free software, you can redistribute it and/or modify it under the terms of the Artistic License version 2.0.