我对大气有这种依赖
<dependency>
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-runtime</artifactId>
<version>2.5.4</version>
</dependency>
这是给Kafka的
<dependency>
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-kafka</artifactId>
<version>2.5.2</version>
</dependency>
问题如下。在defaultbroadcaster.class的658行有atmosphere(没有kafka lib)
if (resources.isEmpty()) {
logger.trace("No resource available for {} and message {}", getID(), finalMsg);
entryDone(deliver.future);
if (cacheForSet != null) {
cacheForSet.clear();
}
return;
}
在此模式下到达对象“资源”已满
AtmosphereResource{
uuid=5348f619-df31-4bf6-a4eb-654ba64ef886,
transport=WEBSOCKET,
isInScope=true,
isResumed=false,
isCancelled=false,
isSuspended=true,
broadcasters=/clinicalevent/manager,
isClosedByClient=false,
isClosedByApplication=false,
action=Action{timeout=-1, type=SUSPEND}}
在kafka库中,“resource”对象是空的。到达空不广播到浏览器
这是我的大气资源课
package eu.dedalus.sop.o4c.servlets.atmosphere.clinicalevent;
import java.io.IOException;
import org.atmosphere.config.service.Disconnect;
import org.atmosphere.config.service.ManagedService;
import org.atmosphere.config.service.Message;
import org.atmosphere.config.service.Ready;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResourceEvent;
import com.owlike.genson.Genson;
import eu.dedalus.sop.log4j.LogManagerSOP;
import eu.dedalus.sop.log4j.LoggerSOP;
import eu.dedalus.sop.o4c.models.common.atmosphere.DataEntryOperation;
@ManagedService(path = "/clinicalevent/manager")
public final class ClinicalEventManager {
private static final LoggerSOP logger = LogManagerSOP.getLogger(ClinicalEventManager.class);
private static final Genson genson = new Genson();
@Ready
public final void onReady(final AtmosphereResource r) {
logger.info("Browser " + r.uuid() + " connected.");
}
@Disconnect
public final void onDisconnect(final AtmosphereResourceEvent event) {
if (event.isCancelled())
logger.info("Browser " + event.getResource().uuid() + " unexpectedly disconnected");
else if (event.isClosedByClient())
logger.info("Browser " + event.getResource().uuid() + " closed the connection");
}
@Message(encoders = {
ClinicalEventManagerEncoderDecoder.class
}, decoders = {
ClinicalEventManagerEncoderDecoder.class
})
public final DataEntryOperation onMessage(final DataEntryOperation message) throws IOException {
logger.info(message.getUser() + " just sent " + genson.serialize(message));
return message;
}
}
我是Kafka
bootstrap.servers=localhost:9092
group.id=clinicaleventmanager
zk.connect=localhost:2181
zookeeper.connect=localhost:2181
这是web.xml中的atmosphereservlet
<servlet>
<servlet-name>AtmosphereServlet</servlet-name>
<servlet-class>org.atmosphere.cpr.AtmosphereServlet</servlet-class>
<async-supported>true</async-supported>
<init-param>
<param-name>org.atmosphere.websocket.maxTextMessageSize</param-name>
<param-value>1048576</param-value>
</init-param>
<init-param>
<param-name>org.atmosphere.kafka.propertiesFile</param-name>
<param-value>WEB-INF/classes/kafka.properties</param-value>
</init-param>
<load-on-startup>2</load-on-startup>
</servlet>
这是编码器/解码器类公共最终类ClinicalEventManagerCoderCoder实现编码器、解码器{
private static final Genson genson = new Genson();
@Override
public DataEntryOperation decode(final String message) {
try {
DataEntryOperation dataEntryOperation = genson.deserialize(message, DataEntryOperation.class);
dataEntryOperation.setDateTime(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
return dataEntryOperation;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public String encode(final DataEntryOperation message) {
try {
return genson.serialize(message);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
有什么问题吗?
暂无答案!
目前还没有任何答案,快来回答吧!