11.07.2017 - 10:00

Ekspanzija distribuiranog računarstva doživljava svoj boom i sa sobom nosi nove izazove. Stari dobri HTTP protokol, jedan od najpoznatijih i najkorišćenijih, više nije dovoljan i ne može pokriti potrebe aplikacija, koje izvršavaju sve složenije akcije. Ono što nam je sad potrebno jesu rešenja koja su brža, pouzdanija, lakša za održavanje i skaliranje-a jedno od njih je upravo RabbitMQ, baziran na AMQ protokolu.

AMQ PROTOKOL

AMQP (Advanced Message Queuing Protocol) definiše semantiku za razmenu poruka, koju primenjuju brokeri za komunikaciju sa klijentskim aplikacijama. AMQ model jeste skup komponenti bitnih za komunikaciju u cilju razmene poruka. I te osnovne komponete su:

  • Queue – služi za čuvanje poruka dok ih klijentska aplikacija sigurno ne preuzme.
  • Exchange – primaju poruku i prosleđuju do željenog queue-a po definisanom kriterijumu.
  • Binding – predstavlja relaciju između queue-a i exchange-a, koja informiše exchange kako da rutira poruku.

Dva bitna pojma, koja ćemo dalje koristiti, a koja su od velike važnosti za protokol i samo rutiranje poruka su routing ključ i binding ključ. Prvi ključ se šalje u poruci i predstavlja parametar na osnovu kog se rutira poruka, dok je drugi sastavni deo binding komponente i služi za poređenje sa prosleđenim routing ključem pri donošenju odluke gde će poruka biti prosleđena.

Za opis rada AMQP servera se često koristi princip slanja email poruka. Pošiljalac šalje poruku do transfer agenta koji potom prosleđuje poruku do odgovarajuceg sandučeta, prema definisanom pravilu. Klijent preuzima poruku iz poštanskog sandučeta. U praksi se često mogu naći primeri gde pošiljalac poruku šalje direktno u queue. Ovo nije preporučljiv način korišćenja jer je cilj da tok poruke bude kontrolisan od same arhiktekture a ne od dela koda koji implementira pravilo slanja.

Rutiranje poruka

RabbitMQ predstavlja jedan od primera brokera za razmenu poruka koji podržava AMQP. Pored AMQ protokola  RabbitMQ podržava jos i STOPMP, MQTT kao i HTTP. Iako podržava više protokola RabbitMQ je izvorno razvijen za podršku AMQP. U prethodnom poglavlju smo naveli osnovne komponente koje čine AMQ model. Postoji nekoliko vrsta exchange-ova koji su podržani od strane protokola:

  • Direct exchange – rutira poruku na osnovu ključa (routing ključa) koji se prosleđuje u header-u poruke. Odluka kom queue-u će exchange proslediti poruku je bazirana na tome koji binding ima ključ, koji je jednak prosleđenom ključu u poruci. Ovo je primer korišćenja unicast rutiranja na osnovu prosleđenog ključa.
  • Topic exchange – Prosleđuje poruke queue-ovima, kod kojih routing kluč odgovara biniding ključu. Routing ključ mora biti sastavljen od reči razdvojenim tačkama. Forme binding ključa su:
    • *.orange.* ->  (* je zamena za tacno jednu reč)
    • Lazy.# ->  (# je zamena za 0 ili vise reči)
  • Fanout exchange – Prosleđuje poruke svim queue-ovima koji su pridruženi datom exchange-u, ignorišući routing ključ. Ovaj exchange predstavlja primer slanja broadcast poruka.

Na sledećem primeru ilustrujemo rutiranje poruka kroz jednu konfiguraciju RabbitMQ servera koja se sastoji od četiri queue komponente (Q1..4), tri exchange komponente (X1..2 i T1) i pet binding komponenti (B1..4).

Ovde vidimo nekoliko slučajeva rutiranja poruka kroz sistem:

  • P1 (routing_key=apple) -> X1: poruka P1 će biti prosleđena u queue Q1 pošto njen routing ključ odgovara binding ključu queue-a. Svaka poruka koja se pošalje na X1 gde je routing ključ različit od apple biće odbačena. Sve poruke koje završe u Q1 ostaće tu trajno dok ih neko ne pokupi. Ovo je ponašanje koje se dobija ukoliko se poruci ne definise ttl (time to live).
  • P2 (routing_key=fruit.orange) -> X2: poruka P2 će završiti u Q1 pošto njen routing ključ odgovara binding ključu koji prolseđuje poruke u Q1. Ovo je jedan od primera koji demonstrira kako topic exchange razvrstava poruke po routing ključu.
  • P3 (routing_key=car.bmw.black) -> X2: poslata poruka će biti prosleđena u Q2 na osnovu routing ključa. Ukoliko korisnik ne pročita poruku u roku od 10 sekundi ona će biti prosleđena na exchange T1 iz razloga što je x_dead_letter_exchange postavljen na T1. Pored ovog argumenta postoji još jedan koji se zove x_dead_letter_routing_key. Ukoliko vrednost ovog argumenta nije postavljena, na exchange će biti prosleđena poruka sa vrednošću routing ključa sa kojom je ušla u sam queue (u konkretnom slučaju car.bmw.black).  Pošto pomenuti routing ključ spada u domen vrednosti  binding ključa ova poruka će biti prosleđena u queue Q3.
  • P4 (routing_key=car.bmw.white) ->X4: sve isto kao za primer prethodne poslate poruke samo što će u slučaju timeout-a biti prosleđena u Q4.

Jednostavan primer producer/consumer komunikacije korišćenjem RabbitMQ

U narednom primeru ćemo prikazati, na principu jednostavne komunikacije, kako izgleda razmena poruka između producer-a i consumer-a. Za primer je izabran python programski jezik i biblioteka pika koja nudi dosta različitih funkcija za rad sa RabbitMQ-om. Pika je dobar izbor za implementaciju, jer dosta dobro reprezentuje AMQP.

Python skripta za slanje poruka:

import json
from pika import BlockingConnection, ConnectionParameters, PlainCredentials

conn = ConnectionParameters(host='localhost', virtual_host='test', credentials=PlainCredentials('guest', 'guest'))

connection = BlockingConnection(parameters=conn)

car_description = {'km': 10000, 'year': 2014}

channel = connection.channel()

channel.exchange_declare(exchange='ex_topic_probe', type='topic', durable=True)

channel.basic_publish(exchange='ex_topic_probe',
                    routing_key='car.bmw.black',
                    body=json.dumps(car_description)
                    )

connection.close()

Python skripta za konzumiranje poruka:

import json
from pika import BlockingConnection, ConnectionParameters, PlainCredentials

def callback(ch, method, properties, body):
  print('routing key: {}, body: {}'.format(method.routing_key, json.loads(body)))

conn = ConnectionParameters(host='localhost', virtual_host='test', credentials=PlainCredentials('admin', 'admin'))

connection = BlockingConnection(parameters=conn)

channel = connection.channel()

channel.queue_declare(queue='cars_queue', durable=True, arguments={'x-message-ttl': 10000, 'x-dead-letter_exchange':
                    'timeout_exchange'})
channel.queue_bind(queue='cars_queue', routing_key='car.*.*', exchange='ex_topic_probe')

channel.basic_consume(queue='cars_queue', consumer_callback=callback, no_ack=True)

channel.start_consuming()

Osnovna i zajednička stvar obe skripte je kreiranje konekcije i channel objekata koji predstavljaju početnu tačku komunikacije između procesa kroz RabbitMQ. Pre slanja pošiljalac mora, nad kreiranim channel objektom, da deklariše exchange (channel.exchange_declare metoda). Exchange će biti kreiran isključivio ako već ne postoji. Slanje poruka se sprovodi pozivom metode publish.

Prilikom preuzimanja poruka neophodno je deklarisati komponente koje su bitne za samu operaciju, a to je deklaracija queue i binding komponente. Pozivom basic_consume metode objekta channel označava se queue iz koga će se uzeti poruka kao i callback koji će biti izvršen po obavljanju operacije. Sa pozivom start_consuming metode proces počinje sa blokirajućom operacijom uzimanja poruka. Pozivom opracije stop_consuming se zaustavlja proces koji uzima poruke, ali je to potrebno uraditi iz drugog thread-a. Izlaz pokretanja datog programa biće “routing key: car.bmw.black, body: {u'km': 100000, u'year': 2014}”.

Na datom jednostavnom primeru slanja i primanja poruka, možemo uočiti prednost korišćenja ovog pristupa u pogledu skaliranja. Na jedan RabbitMQ server možemo povezati proizvoljan broj producer i consumer procesa (sa iste ili različitih mašina) na vrlo jednostavan i brz način, što dovodi do velikog poboljšanja u pogledu performansi i održavanja.

Kreiranje osnovne RabbitMQ konfiguracije

Za kreiranje komponenti (queue, channel, exchange) može se iskoristiti HTTP API RabbitMQ servera. Drugi pristup je preko konzolne aplikacije, kojoj se može pristupiti kada se pokrene servis (IP podrazumevana adresa: localhost:15672).

Iz terminala izvršavanjem sledećih naredbi možemo kreirati komponente iz prikaznog primera (pretpostavka je da je RabbitMQ već instaliran i da sluša na portu 15672):

Na isti način se kreira timeout exchange kao i potrebni queue-ovi.

Dodaj novi komentar

Plain text

  • HTML tagovi nisu dozvoljeni
  • Web i email adrese automatski postaju linkovi
  • Redovi i paragrafi se prelamaju automatski.
NAPOMENA:
IT-KONEKT zadržava pravo izbora i skraćivanja komentara koji će biti objavljeni na veb sajtu.
Neće biti objavljivani komentari koji sadrže govor mržnje, pretnje, uvrede i psovke.
Očekujemo da tekstovi budu pravopisno i gramatički ispravni.
Komentari objavljeni na ovom veb sajtu predstavljaju privatno mišljenje njihovih autora a ne zvaničan stav IT-KONEKT tima.
CAPTCHA
This question is for testing whether or not you are a human visitor and to prevent automated spam submissions.
Image CAPTCHA
Enter the characters shown in the image.