|
von Leonardo Giordani <leo.giordani(at)libero.it> Über den Autor: Student an der Fakultät für Telecommunications Engineering am Polytechnikum in Milano, arbeitet als Netzwerk - Administrator und beschäftigt sich mit Programmieren ( meistens in Assembler und CC++). Arbeitet seit 1999 fast ausschliesslich mit Linux / Unix. Übersetzt ins Deutsche von: Jürgen Pohl <sept.sapins/at/verizon.net> |
Concurrent programming - Message Queue (1)Zusammenfassung: Diese Artikelserie möchte den Leser in das Konzept des Multitasking und dessen Implementation in Linux einführen. Beginnend mit den theoretischen Konzepten, die dem Multitasking zugrunde liegen, werden wir zum Abschluss eine vollständige Anwendung in Form eines einfachen, jedoch wirkungsvollen Protokolls schreiben, welches die Kommunikation zwischen Prozessen demonstriert. Voraussetzungen, um diesem Artikel folgen zu können:
Es ist auch hilfreich, die ersten beiden Artikel dieser Serie gelesen zu haben, sie erschienen im LinusFocus vom November 2002 (Artikel Nr. 272 ) und Januar 2003 (Artikel Nr. 281). |
Prozesse synchronisieren bedeutet, deren Ausführung zeitmässig festzulegen, d.h. nicht in einem absoluten Zeitsystem (das würde bedeuten, einen genauen Zeitpunkt für den Beginn des Prozessablaufs festzulegen), sondern in einer geplanten Reihenfolge zu bestimmen, welcher Prozess zuerst beginnt und welcher als zweiter.
Hierfür Semaphoren zu benutzen, erweist sich als zu komplex und zu begrenzt: komplex, weil jeder Prozess sein Semaphore mit denen der anderen Prozesse synchronisieren müsste. Eingeschränkt, da es uns nicht den Austausch von Parametern zwischen den Prozessen gestattet. Nehmen wir das Beispiel des Aufbaus eines neuen Prozesses: dieses Ereignis sollte jedem beteiligten Prozess mitgeteilt werden, die Semaphoren erlauben einem Prozess nicht solche Informationen auszutauschen.
Die 'concurrency' - Regelung des Zugriffs zu den gemeinsamen Ressourcen durch die Semaphoren kann zu einer andauernden Blockierung eines Prozesses führen, wenn ein weiterer beteiligter Prozess die Ressource freigibt und wieder belegt, bevor ein anderer diese benutzen kann: wie wir erfuhren, ist es in der Welt des "concurrency programming" nicht möglich, im voraus zu wissen, welcher Prozess wann ausgeführt wird.
Hiermit wird klar, dass die Semaphoren ein ungeeignetes Werkzeug zur Behandlung von komplexen Synchronisationsproblemen sind. Eine elegantere Lösung für diese Aufgabe erhalten wir durch die Anwendung von 'Message Queues': in diesem Artikel lernen wir etwas Theorie über diese Vorrichtung zur Kommunikation zwischen Prozessen, ausserdem werden wir ein kleines Programm mit SysV- Primitiven schreiben.
Der Gebrauch von Queues ist insofern die einfache Anwendung eines Mailsystems zwischen Prozessen: jeder Prozess hat eine Adresse und er kann mit anderen Prozessen korrespondieren. Ein Prozess kann also Messages, die an ihn gesandt werden, in einer bestimmten Reihenfolge lesen und entsprechend den vorgefundenen Anforderungen handeln.
Die Synchronisation zweier Prozesse kann infolgedessen einfach durch Messages zwischen ihnen erfolgen: Ressourcen besitzen ausserdem Semaphoren, welche die Prozesse über ihren Status informieren, der zeitliche Ablauf zwischen den Prozessen wird jedoch direkt durchgeführt. Hier wird sofort verständlich, dass der Gebrauch von Message Queues sehr vereinfacht, was anfangs als ein äusserst komplexes Problem erschien.
Bevor wir die Message Queues in der C-Sprache implementieren, müssen wir noch ein anderes Problem erwähnen, das sich auf die Synchronisation bezieht: die Notwendigkeit eines Kommunikationsprotokolls.
Hier ist das einfache Beispiel eines Protokolls, welches auf dem Austausch von Messages basiert: die Prozesse A und B werden parallel ausgeführt und verarbeiten verschiedene Daten - sobald sie das abgeschlossen haben, müssen sie das Ergebnis kombinieren. Ein einfaches Protokoll für diesen Austausch könnte folgendermassen aussehen:
PROZESS B:
Dieses Protokoll kann einfach auf 'n' Prozesse ausgedehnt werden: jeder Prozess, ausser A, bearbeitet seine eigenen Daten und schickt eine Message an A. A antwortet und jeder Prozess sendet seine Ergebnisse: die Struktur der individuellen Prozesse - ausser A - bleibt unverändert.
Die grundlegende Struktur des Systems, das eine Message beschreibt, ist mit msgbuf bezeichnet; ist in linux/msg.h deklariert
/* message buffer for msgsnd and msgrcv calls */ struct msgbuf { long mtype; /* type of message */ char mtext[1]; /* message text */ };
struct message { long mtype; /* message type */ long sender; /* sender id */ long receiver; /* receiver id */ struct info data; /* message content */ ... };
Um eine neue Queue zu erstellen, sollte ein Prozess einen Aufruf an die msgget() Funktion machen
int msgget(key_t key, int msgflg)welche als Argumente einen IPC-Schlüssel und einige Flags erhält, die nun auf
IPC_CREAT | 0660eingestellt werden können (erzeuge die Queue, falls sie noch nicht existiert , erlaube Zugriff für den Besitzer und die Gruppenmitglieder), was den Identifikator für die Queue zurückgibt.
Wie in den vorangegangenen Artikeln nehmen wir an, dass keine Fehler auftreten, so können wir den Code vereinfachen - in einem zukünftigen Artikel werden wir uns mit sicherem IPC-Code beschäftigen.
Um eine Message an eine Queue mit bekanntem Identifikator zu schicken, benutzen wir das msgsnd() Primitivum
int msgsnd(int msqid, struct msgbuf *msgp, int msgsz, int msgflg)
length = sizeof(struct message) - sizeof(long);
Um die Messages in der Queue zu lesen, benutzen wir den
msgrcv() -System - Aufruf
int msgrcv(int msqid, struct msgbuf *msgp, int msgsz, long mtype, int msgflg)
Eine Queue kann man entfernen mit dem Primitivum
msgctl() bei Verwendung des Flag IPC_RMID
msgctl(qid, IPC_RMID, 0)
#include <stdio.h> #include <stdlib.h> #include <linux/ipc.h> #include <linux/msg.h> /* Redefines the struct msgbuf */ typedef struct mymsgbuf { long mtype; int int_num; float float_num; char ch; } mess_t; int main() { int qid; key_t msgkey; mess_t sent; mess_t received; int length; /* Initializes the seed of the pseudo-random number generator */ srand (time (0)); /* Length of the message */ length = sizeof(mess_t) - sizeof(long); msgkey = ftok(".",'m'); /* Creates the Queue*/ qid = msgget(msgkey, IPC_CREAT | 0660); printf("QID = %d\n", qid); /* Builds a message */ sent.mtype = 1; sent.int_num = rand(); sent.float_num = (float)(rand())/3; sent.carattere = 'f'; /* Sends the message */ msgsnd(qid, &sent, length, 0); printf("MESSAGE SENT...\n"); /* Receives the message */ msgrcv(qid, &received, length, sent.mtype, 0); printf("MESSAGE RECEIVED...\n"); /* Controls that received and sent messages are equal */ printf("Interger number = %d (sent %d) -- ", received.int_num, sent.int_num); if(received.int_num == sent.int_num) printf(" OK\n"); else printf("ERROR\n"); printf("Float numero = %f (sent %f) -- ", received.float_num, sent.float_num); if(received.float_num == sent.float_num) printf(" OK\n"); else printf("ERROR\n"); printf("Char = %c (sent %c) -- ", received.ch, sent.ch); if(received.ch == sent.ch) printf(" OK\n"); else printf("ERROR\n"); /* Destroys the Queue */ msgctl(qid, IPC_RMID, 0); }
Mein Code erzeugt eine Queue, die vom Sohn-Prozess benutzt
wird, um Daten zum Vater zu schicken: der Sohn erzeugt
Zufallszahlen, überträgt diese zum Vater und beide
drucken diese als Standard-Ausgabe.
#include <stdio.h> #include <stdlib.h> #include <linux/ipc.h> #include <linux/msg.h> #include <sys/types.h> /* Redefines the message structure */ typedef struct mymsgbuf { long mtype; int num; } mess_t; int main() { int qid; key_t msgkey; pid_t pid; mess_t buf; int length; int cont; length = sizeof(mess_t) - sizeof(long); msgkey = ftok(".",'m'); qid = msgget(msgkey, IPC_CREAT | 0660); if(!(pid = fork())){ printf("FATHER - QID = %d\n", qid); srand (time (0)); for(cont = 0; cont < 10; cont++){ sleep (rand()%4); buf.mtype = 1; buf.num = rand()%100; msgsnd(qid, &buf, length, 0); printf("SON - MESSAGE NUMBER %d: %d\n", cont+1, buf.num); } return 0; } printf("FATHER - QID = %d\n", qid); for(cont = 0; cont < 10; cont++){ sleep (rand()%4); msgrcv(qid, &buf, length, 1, 0); printf("FATHER - MESSAGGE NUMBER %d: %d\n", cont+1, buf.num); } msgctl(qid, IPC_RMID, 0); return 0; }
Der LinuxFocus Redaktion schreiben
© Leonardo Giordani "some rights reserved" see linuxfocus.org/license/ http://www.LinuxFocus.org |
Autoren und Übersetzer:
|
2005-01-14, generated by lfparser_pdf version 2.51