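I want to use server-sent events (SSE) to track a batch process running on the server.
The client is an Angular 10 application that sends requests to an IIS 10 web server, which uses ARR to forward them to Tomcat 9. I use Jersey 2.32 to handle the SSE requests.
Because authentication is required, I also use sse.js to add the Authorization header.
Sending the requests works fine. Jersey receives them, and I can see in the Tomcat console that the messages are being sent correctly (see the Java code below).
Unfortunately, I never receive any message in the Angular client application. The only thing I can confirm from the browser console is that the event source has been set up.
The SSE GET request stays pending and never receives a response.
I have searched a lot to find out what is wrong in my code or what I need to configure in IIS, ARR or Tomcat, but I am stuck, so any help would be much appreciated.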
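For reference while debugging, it can help to know what has to arrive on the wire before `onmessage` fires. The following is a minimal TypeScript sketch of how SSE text frames are parsed. It is illustrative only and is not the sse.js implementation; `parseSseChunk` and `ParsedSseEvent` are hypothetical names, and the sketch assumes the chunk contains only complete frames.

```typescript
// Minimal sketch of SSE wire-format parsing (illustrative; not the sse.js code).
interface ParsedSseEvent {
  event: string; // event name; "message" when the frame has no "event:" field
  data: string;  // the frame's "data:" lines joined with "\n"
}

// Assumption: `chunk` holds only complete frames (each terminated by a blank line).
function parseSseChunk(chunk: string): ParsedSseEvent[] {
  const events: ParsedSseEvent[] = [];
  // Frames are separated by a blank line.
  for (const frame of chunk.split(/\r?\n\r?\n/)) {
    let name = 'message';
    const dataLines: string[] = [];
    for (const line of frame.split(/\r?\n/)) {
      if (line === '' || line.startsWith(':')) {
        continue; // empty line or comment (comments are often used as keep-alives)
      }
      const colon = line.indexOf(':');
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? '' : line.slice(colon + 1);
      if (value.startsWith(' ')) {
        value = value.slice(1); // a single leading space after the colon is dropped
      }
      if (field === 'event') {
        name = value;
      } else if (field === 'data') {
        dataLines.push(value);
      }
    }
    // A frame without any "data:" line does not produce an event.
    if (dataLines.length > 0) {
      events.push({ event: name, data: dataLines.join('\n') });
    }
  }
  return events;
}
```

If the browser's network tab shows no frames of this shape arriving on the pending request, then either the events are not being written to the HTTP response at all, or something between Tomcat and the browser is buffering them.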
Here is the code used in the Angular application.
Monitoring service:
    export class BatchProcessMonitoringService {

        constructor(private zone: NgZone, private sseService: ServerSideEventsService) { }

        public getMonitoringData(url: string, authorizationString: string, formData: FormData): Observable<string> {
            return this.getServerSentEvent(url, authorizationString, formData);
        }

        // get event source (SSE)
        private getServerSentEvent(url: string, authorizationString: string, data: FormData): Observable<string> {
            return new Observable((observer: Observer<any>) => {
                const eventSource = this.sseService.getEventSourceWithGet(url, authorizationString, data);
                console.log('batch process event source set');
                console.log(eventSource);
                eventSource.stream();
                eventSource.onmessage = (event: any) => {
                    this.zone.run(() => {
                        console.log('batch process event received in angular');
                        console.log(event);
                        observer.next(event);
                    });
                };
                eventSource.onerror = (error: any) => {
                    this.zone.run(() => {
                        console.log('batch process event error in angular');
                        console.log(error);
                        observer.error(error);
                    });
                };
            });
        }

        public closeConnection(): void {
            this.sseService.closeEventSource();
        }
    }
SSE service:
    export class ServerSideEventsService {

        eventSource: SSE;

        constructor(
            private sseService: ServerSideEventsService
        ) { }

        // create an event source of POST request
        public getEventSourceWithPost(url: string, authorizationString: string, formData: FormData): SSE {
            return this.buildEventSource(url, authorizationString, 'POST', formData);
        }

        // create an event source of GET request
        public getEventSourceWithGet(url: string, authorizationString: string, formData: FormData): SSE {
            return this.buildEventSource(url, authorizationString, 'GET', formData);
        }

        // build the event source
        private buildEventSource(url: string, authorizationString: string, meth: string, formData: FormData): SSE {
            const options = this.buildOptions(meth, authorizationString, formData);
            this.eventSource = new SSE(url, options);
            console.log('sse service this.eventSource');
            console.log(this.eventSource);
            // add listener
            this.eventSource.addEventListener('message', (e: any) => {
                console.log('sse service message received');
                console.log(e);
                return e.data;
            });
            return this.eventSource;
        }

        // build query options
        private buildOptions(
            meth: string,
            authorizationString: string,
            formData: FormData
        ): {
            payload: FormData;
            method: string;
            headers: { Authorization: string };
        } {
            const headerDict = {
                'Content-Type': 'application/json',
                Authorization: authorizationString
            };
            return {
                payload: formData,
                method: meth,
                headers: headerDict
            };
        }

        public closeEventSource(): void {
            if (!!this.eventSource) {
                this.eventSource.close();
            }
        }
    }
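A side note on the options built above: sse.js sends the request through `XMLHttpRequest`, which discards the request body for GET requests, so the `FormData` payload only has an effect with POST. The `'Content-Type': 'application/json'` header also does not describe a `FormData` body. The sketch below shows one way the options could be assembled with that in mind. `buildSseOptions` and `SseOptions` are hypothetical names; only the option keys (`method`, `headers`, `payload`) come from the code above.

```typescript
// Hypothetical helper; only the option keys (method, headers, payload) mirror the code above.
interface SseOptions<P> {
  method: string;
  headers: Record<string, string>;
  payload?: P;
}

function buildSseOptions<P>(
  method: 'GET' | 'POST',
  authorization: string,
  payload?: P
): SseOptions<P> {
  const options: SseOptions<P> = {
    method,
    // No Content-Type is set here: for a FormData body the browser adds the
    // multipart header (with its boundary) itself when none is given.
    headers: { Authorization: authorization },
  };
  // XMLHttpRequest ignores the body of a GET request, so attach it for POST only.
  if (method === 'POST' && payload !== undefined) {
    options.payload = payload;
  }
  return options;
}
```

With this shape, any data the GET endpoint needs would have to travel in the URL (path or query string), which is already how the `userId_timestamp` path parameter is passed.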
Here is the code of the Java GET method that gets called:
    @GET
    @Path("/{userId_timestamp}")
    @Produces(SseFeature.SERVER_SENT_EVENTS)
    public void getUserImportProcessEvent(
            @Context SseEventSink eventSink,
            @Context Sse sse,
            @PathParam("userId_timestamp") String userId_timestamp) {
        final EventOutput eventOutput = new EventOutput();
        new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    // ... code that waits 1 second
                    final OutboundEvent.Builder eventBuilder = new OutboundEvent.Builder();
                    eventBuilder.name("message");
                    eventBuilder.data(String.class, "Hello world " + i + "!");
                    final OutboundEvent event = eventBuilder.build();
                    eventOutput.write(event);
                    System.out.println("write event: " + i);
                }
            } catch (IOException e) {
                throw new RuntimeException("Error when writing the event.", e);
            } finally {
                try {
                    eventOutput.close();
                } catch (IOException ioClose) {
                    throw new RuntimeException("Error when closing the event output.", ioClose);
                }
            }
        }).start();
        return;
    }
1 Answer
Answer #1 (fdx2calv):
Fixed. My Jersey Java code was incorrect; here is the correct code.