java—从splunk源代码读取并写入主题—编写相同的记录没有调出最新的记录

tyu7yeag  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(268)

相同的记录正在写入主题。没有从splunk调出最新的记录。在开始方法中设置时间参数以提取最后一分钟的数据。任何输入。
目前我不设置从源偏移。每次运行poll时,是否查找源偏移量,然后进行poll?在日志中我们可以用时间作为偏移量。

@Override
public List<SourceRecord> poll() throws InterruptedException {
    List<SourceRecord> results = new ArrayList<>();
    Map<String, String> recordProperties = new HashMap<String, String>();
    while (true) {
        try {
            String line = null;                
            InputStream stream = job.getResults(previewArgs);
            String earlierKey = null;
            String value = null;                                
            ResultsReaderCsv csv = new ResultsReaderCsv(stream);
            HashMap<String, String> event;    
            while ((event = csv.getNextEvent()) != null) {
                for (String key: event.keySet())   {                
                    if(key.equals("rawlogs")){
                        recordProperties.put("rawlogs", event.get(key));                                                        results.add(extractRecord(Splunklog.SplunkLogSchema(), line, recordProperties));
                        return results;}}}
            csv.close();
            stream.close();
            Thread.sleep(500);
        } catch(Exception ex) {
            System.out.println("Exception occurred : " + ex);
        }
    }
}
private SourceRecord extractRecord(Schema schema, String line, Map<String, String> recordProperties) {
    Map<String, String> sourcePartition = Collections.singletonMap(FILENAME_FIELD, FILENAME);       
    Map<String, String> sourceOffset = Collections.singletonMap(POSITION_FIELD, recordProperties.get(OFFSET_KEY));
    return new SourceRecord(sourcePartition, sourceOffset, TOPIC_NAME, schema, recordProperties);        
}

@Override
public void start(Map<String, String> properties) {
    try {
        config = new SplunkSourceTaskConfig(properties);
    } catch (ConfigException e) {
          throw new ConnectException("Couldn't start SplunkSourceTask due to configuration error", e);
    }
    HttpService.setSslSecurityProtocol(SSLSecurityProtocol.TLSv1_2);
    Service service = new Service("splnkip", port);
    String credentials = "user:pwd";
    String basicAuthHeader = Base64.encode(credentials.getBytes());
    service.setToken("Basic " + basicAuthHeader);       
    String startOffset = readOffset();
    JobArgs jobArgs = new JobArgs();
    if (startOffset != null) {
        log.info("-------------------------------task OFFSET!NULL ");
        jobArgs.setExecutionMode(JobArgs.ExecutionMode.BLOCKING);
        jobArgs.setSearchMode(JobArgs.SearchMode.NORMAL);
        jobArgs.setEarliestTime(startOffset);
        jobArgs.setLatestTime("now");
        jobArgs.setStatusBuckets(300);
    } else {
        log.info("-------------------------------task OFFSET=NULL ");
        jobArgs.setExecutionMode(JobArgs.ExecutionMode.BLOCKING);
        jobArgs.setSearchMode(JobArgs.SearchMode.NORMAL);
        jobArgs.setEarliestTime("+419m");
        jobArgs.setLatestTime("+420m");
        jobArgs.setStatusBuckets(300);
    }

    String mySearch = "search host=search query";
    job = service.search(mySearch, jobArgs);        
    while (!job.isReady()) {
        try {
            Thread.sleep(500);
        } catch (InterruptedException ex) {
            log.error("Exception occurred while waiting for job to start: " + ex);
        }
    }        
    previewArgs = new JobResultsPreviewArgs();
    previewArgs.put("output_mode", "csv");        
    stop = new AtomicBoolean(false);
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题