Streamdatenübertragung vom REST-Service zur MQ-Warteschlange

Hi, Habr!

In diesem Artikel werde ich eine Methode zum Entwickeln eines REST-Diensts beschreiben, mit der Sie Dateien empfangen und im Streaming-Modus im Messagingsystem speichern können, ohne die gesamte Datei auf der Serviceseite speichern zu müssen. Es wird auch ein umgekehrtes Szenario beschrieben, bei dem der Client eine im Messagingsystem gehostete Datei als Antwort erhält.

Der Klarheit halber werde ich Codebeispiele für den auf JEE7 entwickelten Service für den IBM WebSphere Liberty Server-Anwendungsserver bereitstellen. IBM MQ fungiert als Messagingsystem.
Trotzdem ist das beschriebene Verfahren auch für andere ähnliche Plattformen geeignet, d. Jeder JMS-API-Anbieter kann als Messagingsystem fungieren, und jeder JEE-Server (beispielsweise Apache Tomcat) kann als Anwendungsserver fungieren.

Problemstellung


Es musste eine Lösung implementiert werden, mit der sowohl große Dateien vom Client empfangen werden können (> 100 MB) als auch auf ein anderes geographisch entferntes System übertragen werden können, und in umgekehrter Richtung - Dateien von diesem System als Antwort auf den Client übertragen. Im Hinblick auf den unzuverlässigen Netzwerkkanal zwischen dem Clientnetzwerk und dem Anwendungsnetzwerk wird ein Nachrichtensystem verwendet, um eine garantierte Zustellung zwischen ihnen sicherzustellen.

Die High-Level-Lösung umfasst drei Komponenten:

  1. REST-Dienst - dessen Aufgabe besteht darin, dem Client die Möglichkeit zu geben, eine Datei (oder Anforderung) zu übertragen.
  2. MQ - ist für das Senden von Nachrichten zwischen verschiedenen Netzwerken verantwortlich.
  3. Anwendung - Die Anwendung, die für das Speichern von Dateien und die Ausgabe auf Anfrage verantwortlich ist.

Bild

In diesem Artikel beschreibe ich, wie Sie einen REST-Service implementieren. Dazu gehören folgende Aufgaben:

  • Empfangen einer Datei von einem Client.
  • Übertragen Sie die resultierende Datei an MQ.
  • Dateiübertragung von MQ an den Client als Antwort.

Lösungsmethode


In Anbetracht der großen Größe der zu übertragenden Datei besteht keine Möglichkeit, sie vollständig im RAM zu platzieren. Außerdem gibt es eine Einschränkung durch MQ - die maximale Größe einer einzelnen Nachricht in MQ darf 100 MB nicht überschreiten. Daher basiert meine Entscheidung auf den folgenden Grundsätzen:

  • Das Empfangen und Speichern einer Datei in der MQ-Warteschlange sollte im Streaming-Modus erfolgen, ohne dass die gesamte Datei im Speicher abgelegt wird.
  • In der MQ-Warteschlange wird die Datei als Satz kleiner Nachrichten platziert.

Der Speicherort der Datei auf dem Client, dem REST-Service und dem MQ wird nachstehend grafisch dargestellt:

Bild

Auf der Clientseite befindet sich die Datei vollständig im Dateisystem, im REST-Dienst wird nur ein Teil der Datei im RAM gespeichert, und auf der MQ-Seite wird jeder Teil der Datei als separate Nachricht abgelegt.

REST Service-Entwicklung


Zur Verdeutlichung wird die vorgeschlagene Lösungsmethode als Demo-REST-Dienst entwickelt, der zwei Methoden enthält:

  • upload - empfängt eine Datei vom Client und schreibt sie in die MQ-Warteschlange. Gibt die Nachrichtengruppen-ID (im Base64-Format) als Antwort zurück.
  • download - empfängt die Nachrichtengruppen-ID vom Client (im Base64-Format) und gibt die in der MQ-Warteschlange gespeicherte Datei zurück.

Methode zum Empfang einer Datei vom Client (Upload)


Die Aufgabe der Methode besteht darin, den Stream der eingehenden Datei zu empfangen und dann in die MQ-Warteschlange zu schreiben.

Erhalten Sie die eingehende Datei des Streams


Um eine eingehende Datei vom Client zu erhalten, erwartet die Methode als Eingabeparameter ein Objekt mit der Schnittstelle com.ibm.websphere.jaxrs20.multipart.IMultipartBody, das die Möglichkeit bietet, eine Verknüpfung zum Eingabedatenstrom herzustellen

@PUT@Path("upload")
public Response upload(IMultipartBody body){
	...
	IAttachment attachment = body.getAttachment("file");
	InputStream inputStream = attachment.getDataHandler().getInputStream();
	...
}

Diese Schnittstelle (IMultipartBody) befindet sich im JAR-Archiv com.ibm.websphere.appserver.api.jaxrs20_1.0.21.jar und wird mit dem IBM Liberty Server ausgeliefert. Sie befindet sich im Ordner: < WLP_INSTALLATION_PATH > / dev / api / ibm.

Hinweis:

  • WLP_INSTALLATION_PATH ist der Pfad zum Verzeichnis des WebSphere Liberty-Profils.
  • Es wird erwartet, dass der Client die Datei in einem Parameter namens "file" überträgt.
  • Wenn Sie einen anderen Anwendungsserver verwenden, können Sie die alternative Bibliothek von Apache CXF verwenden.

Speichern Sie das Speichern von Dateien in MQ


Die Methode empfängt als Eingabe einen Stream der eingehenden Datei, den Namen der MQ-Warteschlange, in die die Datei geschrieben werden soll, und die Kennung der Nachrichtengruppe, die zum Verknüpfen der Nachrichten verwendet wird. Die Gruppen-ID wird auf der Serviceseite beispielsweise vom Dienstprogramm org.apache.commons.lang3.RandomStringUtils generiert:

String groupId = RandomStringUtils.randomAscii(24);

Der Algorithmus zum Speichern einer eingehenden Datei in MQ besteht aus den folgenden Schritten:

  1. MQ-Verbindungsobjekte initialisieren.
  2. Zyklisches Lesen eines Teils der eingehenden Datei, bis die Datei vollständig gelesen ist:
    1. Ein Teil der Dateidaten wird als separate Nachricht im MQ aufgezeichnet.
    2. Jede Nachrichtendatei hat eine eigene Sequenznummer (Eigenschaft "JMSXGroupSeq").
    3. Alle Dateimeldungen haben denselben Gruppenwert (die Eigenschaft "JMSXGroupID").
    4. Die letzte Nachricht hat ein Zeichen, dass diese Nachricht endgültig ist (Eigenschaft "JMS_IBM_Last_Msg_In_Group").
    5. Die Konstante SEGMENT_SIZE enthält die Portionsgröße. Zum Beispiel 1 MB.

publicvoidwrite(InputStream inputStream, String queueName, String groupId)throws IOException, JMSException {
	try (
		Connection connection = connectionFactory.createConnection();
		Session session = connection.createSession();
		MessageProducer producer = session.createProducer(session.createQueue(queueName));
	) {
		byte[] buffer = newbyte[SEGMENT_SIZE];
		BytesMessage message = null;
		for(int readBytesSize = 1, sequenceNumber = 1; readBytesSize > 0; sequenceNumber++) {
			readBytesSize = inputStream.read(buffer);
			if (message != null) {
				if (readBytesSize < 1) {
					message.setBooleanProperty("JMS_IBM_Last_Msg_In_Group", true);
				}	producer.send(message);
			}
			if (readBytesSize > 0) {
				message = session.createBytesMessage();
				message.setStringProperty("JMSXGroupID", groupId);
				message.setIntProperty("JMSXGroupSeq", sequenceNumber);
				if (readBytesSize == SEGMENT_SIZE) {
					message.writeBytes(buffer);
				} else {
					message.writeBytes(Arrays.copyOf(buffer, readBytesSize));
				}
			}
		}
	}
}

Die Methode zum Senden der Datei an den Client (Download)


Die Methode empfängt die Gruppenkennung von Nachrichten im Base64-Format, mit der sie Nachrichten aus der MQ-Warteschlange liest und als Streaming-Antwort sendet.

Nachrichtengruppen-ID abrufen


Als Eingabeparameter erhält die Methode die ID der Meldungsgruppe.

@PUT@Path("download")
public Response download(@QueryParam("groupId") String groupId) {
	...
}

Stream-Client-Antwort


Erstellen Sie eine Klasse mit der Schnittstelle javax.ws.rs.core.StreamingOutput, um eine Datei zu übertragen, die als Satz einzelner Nachrichten im Streaming-Modus an MQ gespeichert ist:

publicclassMQStreamingOutputimplementsStreamingOutput{
	private String groupId;
	private String queueName;
	publicMQStreamingOutput(String groupId, String queueName){
		super();
		this.groupId = groupId;
		this.queueName = queueName;
	}
	@Overridepublicvoidwrite(OutputStream outputStream)throws IOException, WebApplicationException {
		try {
			new MQWorker().read(outputStream, queueName, groupId);
		} catch(NamingException | JMSException e) {
			e.printStackTrace();
			new IOException(e);
		} finally {
			outputStream.flush();
			outputStream.close();
		}
	}
}

In der Klasse implementieren wir die write-Methode, die als Eingabe einen Link zum ausgehenden Stream erhält, in den Nachrichten von MQ geschrieben werden. Ich fügte der Klasse einen anderen Warteschlangennamen und eine Gruppen-ID hinzu, deren Nachrichten gelesen werden.

Ein Objekt dieser Klasse wird als Parameter übergeben, um eine Antwort an den Client zu erstellen:

@GET@Path("download")
public Response download(@QueryParam("groupId") String groupId) {
	ResponseBuilder responseBuilder = null;
	try {
		MQStreamingOutput streamingOutput = new MQStreamingOutput(new String(Utils.decodeBase64(groupId)), Utils.QUEUE_NAME);
		responseBuilder = Response.ok(streamingOutput);	
	} catch(Exception e) {
		e.printStackTrace();
	responseBuilder.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage());
	}
	return responseBuilder.build();
}

Stream-Datei lesen aus MQ


Der Algorithmus zum Lesen von Nachrichten von MQ an den ausgehenden Strom besteht aus den folgenden Schritten:

  1. MQ-Verbindungsobjekte initialisieren.
  2. Zyklisches Lesen von Nachrichten von MQ bis die Nachricht mit dem Terminator in der Gruppe gelesen wird (Eigenschaft „JMS_IBM_Last_Msg_In_Group“):
    1. Vor jedem Lesen einer Nachricht aus der Warteschlange wird ein Filter (messageSelector) gesetzt, in dem die Meldungsgruppenkennung und die Meldungssequenznummer in der Gruppe angegeben werden.
    2. Der Inhalt der gelesenen Nachricht wird in den ausgehenden Datenstrom geschrieben.


publicvoidread(OutputStream outputStream, String queueName, String groupId)throws IOException, JMSException {
	try(
		Connection connection = connectionFactory.createConnection();
		Session session = connection.createSession();
	) {
		connection.start();
		Queue queue = session.createQueue(queueName);
		int sequenceNumber = 1;
		for(boolean isMessageExist = true; isMessageExist == true; ) {
			String messageSelector = "JMSXGroupID='" + groupId.replaceAll("'", "''") + "' AND JMSXGroupSeq=" + sequenceNumber++;
			try(
				MessageConsumer consumer = session.createConsumer(queue, messageSelector);
					) {
				BytesMessage message = (BytesMessage) consumer.receiveNoWait();
				if (message == null) {
					isMessageExist = false;
				} else {
					byte[] buffer = newbyte[(int) message.getBodyLength()];
					message.readBytes(buffer);
					outputStream.write(buffer);
					if (message.getBooleanProperty("JMS_IBM_Last_Msg_In_Group")) {
						isMessageExist = false;
					}
				}
			}
		}
	}
}

REST-Service anrufen


Um den Dienst zu testen, verwende ich das Curl-Tool.

Datei senden


curl -X PUT -F file=@<путь_к_файлу> http://localhost:9080/Demo/rest/service/upload

Als Antwort wird eine base64-Zeichenfolge mit der Nachrichtengruppen-ID empfangen, die wir in der folgenden Methode angeben, um die Datei abzurufen.

Eine Datei erhalten


curl -X GET http://localhost:9080/Demo/rest/service/download?groupId=<base64_строка_идентификатор_группы_сообщений> -o <путь_к_файлу_куда_запишется_ответ>

Fazit


In diesem Artikel wurde der Ansatz zum Entwickeln eines REST-Diensts untersucht, der das Streaming sowohl zum Empfangen und Speichern großer Daten in der Messagingsystemwarteschlange als auch zum Lesen dieser Daten aus der Warteschlange als Antwort ermöglicht. Mit dieser Methode kann der Ressourcenverbrauch reduziert und der Durchsatz der Lösung erhöht werden.

Zusätzliche Materialien


Weitere Informationen zur IMultipartBody-Schnittstelle, die zum Empfangen des eingehenden Dateistreams verwendet wird, ist ein Link .

Eine alternative Bibliothek für das Streaming von Dateien in REST-Diensten ist Apache CXF .

Die Schnittstelle StreamingOutput für Streaming gibt eine REST-Antwort an den Client- Link zurück .

Jetzt auch beliebt: