I am trying to run STA/LTA code for event detection on Spark Streaming, using Kafka to produce the streaming data, but I cannot get any results. I have two .py files: one contains the STA/LTA code and the other contains the Spark Streaming code. My problem is that when I run sparkkafka.py, it does not show any of the event plots that should be produced by the plot_trigger function. In fact, my Spark code does not seem to do anything at all. What am I doing wrong?
# STALTA.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# -------------------------------------------------------------------
# Filename: trigger.py
# Purpose: Python trigger/picker routines for seismology.
# Author: Moritz Beyreuther, Tobias Megies
# Email: moritz.beyreuther@geophysik.uni-muenchen.de
#
# Copyright (C) 2008-2012 Moritz Beyreuther, Tobias Megies
# -------------------------------------------------------------------
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
from future.builtins import * # NOQA
from collections import deque
import ctypes as C
import warnings
import numpy as np
from obspy import UTCDateTime
from obspy.signal.cross_correlation import templates_max_similarity
from obspy.signal.headers import clibsignal, head_stalta_t
from numpy import genfromtxt
from obspy import read
def classic_sta_lta(a, nsta, nlta):
"""
Computes the standard STA/LTA from a given input array a. The length of
the STA is given by nsta in samples, respectively is the length of the
LTA given by nlta in samples.
Fast version written in C.
:type a: NumPy :class:`~numpy.ndarray`
:param a: Seismic Trace
:type nsta: int
:param nsta: Length of short time average window in samples
:type nlta: int
:param nlta: Length of long time average window in samples
:rtype: NumPy :class:`~numpy.ndarray`
:return: Characteristic function of classic STA/LTA
"""
data = a
# initialize C struct / NumPy structured array
head = np.empty(1, dtype=head_stalta_t)
head[:] = (len(data), nsta, nlta)
# ensure correct type and contiguous of data
data = np.ascontiguousarray(data, dtype=np.float64)
# all memory should be allocated by python
charfct = np.empty(len(data), dtype=np.float64)
# run and check the error-code
errcode = clibsignal.stalta(head, data, charfct)
if errcode != 0:
raise Exception('ERROR %d stalta: len(data) < nlta' % errcode)
return charfct
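
# Illustrative usage (comment only, not part of the original obspy file; the
# window lengths in seconds are an assumption, not values from this post):
#     df = trace.stats.sampling_rate
#     cft = classic_sta_lta(trace.data, int(5 * df), int(10 * df))
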
def trigger_onset(charfct, thres1, thres2, max_len=9e99, max_len_delete=False):
"""
Calculate trigger on and off times.
Given thres1 and thres2 calculate trigger on and off times from
characteristic function.
This method is written in pure Python and gets slow as soon as there
are more then 1e6 triggerings ("on" AND "off") in charfct --- normally
this does not happen.
:type charfct: NumPy :class:`~numpy.ndarray`
:param charfct: Characteristic function of e.g. STA/LTA trigger
:type thres1: float
:param thres1: Value above which trigger (of characteristic function)
is activated (higher threshold)
:type thres2: float
:param thres2: Value below which trigger (of characteristic function)
is deactivated (lower threshold)
:type max_len: int
:param max_len: Maximum length of triggered event in samples. A new
event will be triggered as soon as the signal reaches
again above thres1.
:type max_len_delete: bool
:param max_len_delete: Do not write events longer than max_len into
report file.
:rtype: List
:return: Nested List of trigger on and of times in samples
"""
# 1) find indices of samples greater than threshold
# 2) calculate trigger "of" times by the gap in trigger indices
# above the threshold i.e. the difference of two following indices
# in ind is greater than 1
# 3) in principle the same as for "of" just add one to the index to get
# start times, this operation is not supported on the compact
# syntax
# 4) as long as there is a on time greater than the actual of time find
# trigger on states which are greater than last of state an the
# corresponding of state which is greater than current on state
# 5) if the signal stays above thres2 longer than max_len an event
# is triggered and following a new event can be triggered as soon as
# the signal is above thres1
ind1 = np.where(charfct > thres1)[0]
if len(ind1) == 0:
return []
ind2 = np.where(charfct > thres2)[0]
#
on = deque([ind1[0]])
of = deque([-1])
# determine the indices where charfct falls below off-threshold
ind2_ = np.empty_like(ind2, dtype=bool)
ind2_[:-1] = np.diff(ind2) > 1
# last occurence is missed by the diff, add it manually
ind2_[-1] = True
of.extend(ind2[ind2_].tolist())
on.extend(ind1[np.where(np.diff(ind1) > 1)[0] + 1].tolist())
# include last pick if trigger is on or drop it
if max_len_delete:
# drop it
of.extend([1e99])
on.extend([on[-1]])
else:
# include it
of.extend([ind2[-1]])
#
pick = []
while on[-1] > of[0]:
while on[0] <= of[0]:
on.popleft()
while of[0] < on[0]:
of.popleft()
if of[0] - on[0] > max_len:
if max_len_delete:
on.popleft()
continue
of.appendleft(on[0] + max_len)
pick.append([on[0], of[0]])
return np.array(pick, dtype=np.int64)
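
# A minimal illustration of the return value (comment only; the synthetic
# characteristic function below is an assumption, not data from this post).
# The function rises above thr_on=4 at sample 100 and falls below thr_off=2
# at sample 150, so one on/off pair is returned:
#     >>> cft = np.concatenate([np.ones(100), 5 * np.ones(50), np.ones(100)])
#     >>> trigger_onset(cft, 4, 2)
#     array([[100, 149]])
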
def plot_trigger(trace, cft, thr_on, thr_off, show=True):
"""
Plot characteristic function of trigger along with waveform data and
trigger On/Off from given thresholds.
:type trace: :class:`~obspy.core.trace.Trace`
:param trace: waveform data
:type cft: :class:`numpy.ndarray`
:param cft: characteristic function as returned by a trigger in
:mod:`obspy.signal.trigger`
:type thr_on: float
:param thr_on: threshold for switching trigger on
:type thr_off: float
:param thr_off: threshold for switching trigger off
:type show: bool
:param show: Do not call `plt.show()` at end of routine. That way,
further modifications can be done to the figure before showing it.
"""
import matplotlib.pyplot as plt
df = trace.stats.sampling_rate
npts = trace.stats.npts
t = np.arange(npts, dtype=np.float32) / df
fig = plt.figure()
ax1 = fig.add_subplot(211)
ax1.plot(t, trace.data, 'k')
ax2 = fig.add_subplot(212, sharex=ax1)
ax2.plot(t, cft, 'k')
on_off = np.array(trigger_onset(cft, thr_on, thr_off))
i, j = ax1.get_ylim()
try:
ax1.vlines(on_off[:, 0] / df, i, j, color='r', lw=2,
label="Trigger On")
ax1.vlines(on_off[:, 1] / df, i, j, color='b', lw=2,
label="Trigger Off")
ax1.legend()
except IndexError:
pass
ax2.axhline(thr_on, color='red', lw=1, ls='--')
ax2.axhline(thr_off, color='blue', lw=1, ls='--')
ax2.set_xlabel("Time after %s [s]" % trace.stats.starttime.isoformat())
fig.suptitle(trace.id)
fig.canvas.draw()
if show:
plt.show()
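
# Illustrative (comment only, not part of the original file): with show=False
# the figure can be saved instead of displayed, which is the only option on a
# headless machine:
#     plot_trigger(trace, cft, 4, 2, show=False)
#     import matplotlib.pyplot as plt
#     plt.savefig('trigger.png')
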
def main():
    # a = genfromtxt('syntheticData.csv', delimiter=',')
    st = read('http://examples.obspy.org/RJOB_061005_072159.ehz.new')
    trace = st[0]
    # length = len(trace.data)
    # trace.data = a[:length]
    charfct = classic_sta_lta(trace.data, 2, 30)
    trigger_onset(charfct, 4, 2, max_len=9e99, max_len_delete=False)
    plot_trigger(trace, charfct, 4, 2, show=True)

if __name__ == '__main__':
    main()
    import doctest
    doctest.testmod(exclude_empty=True)
# sparkkafka.py
import sys
# Spark
from pyspark import SparkContext
# Spark Streaming
from pyspark.streaming import StreamingContext
# Kafka
from pyspark.streaming.kafka import KafkaUtils
from STALTA import classic_sta_lta, trigger_onset, plot_trigger
# Create a local StreamingContext with two working threads and a batch interval of 1 second
sc = SparkContext("local[2]", "STALTA")
ssc = StreamingContext(sc, 1)
# Connect to Kafka
kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming-consumer', {'consumer6':1})
kafkaStream.pprint()
# Process the stream
sta_lta_ratio = kafkaStream.map(lambda i: classic_sta_lta(i, 2, 30))
triggers = sta_lta_ratio.flatMap(lambda j: trigger_onset(j, 4, 2, max_len=9e99, max_len_delete=False))
# output stream
kafkaStream.map(lambda k: plot_trigger(k, sta_lta_ratio, 4, 2, show=True))
ssc.start()
ssc.awaitTermination()
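For what it's worth, my understanding from the spark-streaming-kafka-0-8 docs is that KafkaUtils.createStream yields one (key, value) pair of strings per Kafka message, so each record in the stream is a single line of inputfile.txt rather than a whole array. Below is a sketch of how I imagine one micro-batch would have to be gathered before STA/LTA could run on it; process_batch is a hypothetical helper I have not actually run, and it assumes each line of the file is one numeric sample:

import numpy as np

# In the script above this would have to be registered before ssc.start().
def process_batch(time, rdd):
    # collect() pulls one micro-batch to the driver; only viable for a small demo
    samples = np.array([float(v) for _, v in rdd.collect()])
    if len(samples) > 30:  # classic_sta_lta needs at least nlta samples
        cft = classic_sta_lta(samples, 2, 30)
        print(trigger_onset(cft, 4, 2, max_len=9e99, max_len_delete=False))

kafkaStream.foreachRDD(process_batch)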
To run sparkkafka.py, I put it in the following directory and then run one of the two commands below.
Directory: spark-2.3.1-bin-hadoop2.7/examples/src/main/python/streaming
Command: bin/spark-submit --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar examples/src/main/python/streaming/sparkkafka.py localhost:2181 consumer6
or this command: bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.1 examples/src/main/python/streaming/sparkkafka.py localhost:2181 consumer6
In my example, the Kafka topic name is consumer6.
The input file that should be loaded into Kafka is available at the following link: https://www.dropbox.com/s/wf2cpdlrbwnip14/inputfile.txt?dl=0
After downloading it, I put it in the following Kafka directory:
/opt/kafka/bin/inputfile.txt
Useful commands for running Kafka:
sudo /opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
sudo /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic consumer6
/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic consumer6 < /opt/kafka/bin/inputfile.txt
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic consumer6 --from-beginning
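One more command that might help double-check the setup by listing the topics that actually exist (assuming the same Kafka layout as above):
/opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181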
Thank you, Zeinab