我们在2节点集群中使用Cassandra 3.5和Spark 1.6.1(每个节点8个内核和16 G内存)。
下面是 cassandra 表
CREATE TABLE schema.trade (
symbol text,
date int,
trade_time timestamp,
reporting_venue text,
trade_id bigint,
ref_trade_id bigint,
action_type text,
price double,
quantity int,
condition_code text,
PRIMARY KEY ((symbol, date), trade_time, trade_id)
) WITH compaction = {'class' : 'DateTieredCompactionStrategy', 'base_time_seconds' : '3600', 'max_sstable_age_days' : '93'};
我想计算体积百分比:在按交易所和时间条(1或5分钟)分组的时间段内,相关证券的所有交易量之和。我创建了一个示例:
void getPercentageOfVolume(String symbol, Integer date, Timestamp timeFrom, Timestamp timeTill, Integer barWidth) {
char MILLISECOND_TO_MINUTE_MULTIPLIER = 60_000;
LOG.info("start");
JavaPairRDD<Tuple2, Integer> counts = javaFunctions(sparkContext).cassandraTable("schema", "trade")
.filter(row ->
row.getString("symbol").equals(symbol) && row.getInt("date").equals(date) &&
row.getDateTime("trade_time").getMillis() >= timeFrom.getTime() &&
row.getDateTime("trade_time").getMillis() < timeTill.getTime())
.mapToPair(row ->
new Tuple2<>(
new Tuple2(
new Timestamp(
(row.getDateTime("trade_time").getMillis() / (barWidth * MILLISECOND_TO_MINUTE_MULTIPLIER)) * barWidth * MILLISECOND_TO_MINUTE_MULTIPLIER
),
row.getString("reporting_venue")),
row.getInt("quantity")
)
).reduceByKey((a, b) -> a + b);
LOG.info(counts.collect().toString());
LOG.info("finish");
}
[2016-06-15 09:25:27.014] [详细信息] [主要] [股票TCA分析]开始
[2016-06-15 09:25:28.000] [详细信息] [main] [NettyUtil]在类路径中找到Netty的本机epoll传输,正在使用它
[2016-06-15 09:25:28.518] [详细信息] [主目录] [集群]新增Cassandra主机/节点1:9042
[2016-06-15 09:25:28.519] [主要] [本地节点第一负载平衡策略]已添加主机节点1(数据中心1)
[2016-06-15 09:25:28.519] [主要] [集群]新的Cassandra主机/节点2:9042已添加
[2016-06-15 09:25:28.520] [详细信息] [主要信息] [CassandraConnector]已连接到Cassandra集群: cassandra
[2016-06-15 09:25:29.115] [主要] [SparkContext]正在启动作业:在www.example.com上收集EquityTCAAnalytics.java:88
[2016-06-15 09:25:29.385] [信息] [数据库调度器事件循环] [数据库调度器]注册RDD 2(在EquityTCAAnalytics.java:78上Map到对)
[2016-06-15 09:25:29.388] [信息] [dag-scheduler-event-loop] [DAGScheduler]已获取作业0(在EquityTCAAnalytics.java:88处收集),具有5个输出分区
[2016-06-15 09:25:29.388] [详细信息] [DAGScheduler]最后阶段:结果阶段1(收集于EquityTCAAnalytics.java:88)
[2016-06-15 09:25:29.389] [详细信息] [DAGScheduler]最后阶段的父级:列表(随机Map阶段0)
[2016-06-15 09:25:29.391] [详细信息] [DAGScheduler事件循环] [DAGScheduler]缺少父项:列表(随机Map阶段0)
[2016-06-15 09:25:29.400] [信息] [dag-scheduler-event-loop] [DAGScheduler]提交洗牌Map阶段0(Map分区RDD [2],位于EquityTCAAnalytics.java:78的mapToPair),该阶段没有缺失的父级
[2016-06-15 09:25:29.594] [信息] [dag-scheduler-event-loop] [MemoryStore]块广播_0作为值存储在内存中(估计大小为10.8 KB,可用大小为10.8 KB)
[2016-06-15 09:25:29.642] [信息] [dag-scheduler-event-loop] [MemoryStore]块广播_0_piece0以字节形式存储在内存中(估计大小为5.4 KB,可用大小为16.3 KB)
[2016-06-15 09:25:29.647] [信息] [调度程序-事件-循环-7] [块管理器信息]在节点2的内存中添加了broadcast_0_piece0:44871(大小:5.4 KB,可用空间:2.4 GB)
[2016-06-15 09:25:29.650] [信息] [dag-scheduler-event-loop] [SparkContext]从DAGScheduler的广播中创建广播0。标量:1006
[2016-06-15 09:25:29.658] [详细信息] [DAGScheduler]正在提交来自ShuffleMapStage 0的5个缺失任务(MapPartitionsRDD[2] at mapToPair at EquityTCAAnalytics.java:78)
[2016-06-15 09:25:29.661] [信息] [dag-scheduler-event-loop] [TaskSchedulerImpl]正在添加包含5个任务的任务集0.0
[2016-06-15 09:25:30.006] [信息] [调度程序-事件-循环-7] [SparkDeploySchedulerBackend]已注册的执行程序NettyRpcEndpointRef(空)(节点1:41122),ID为0
[2016-06-15 09:25:30.040] [信息] [调度程序-事件-循环-7] [任务集管理器]正在启动阶段0.0中的任务0.0(TID 0,节点1,分区0,节点本地,11725字节)
[2016-06-15 09:25:30.051] [信息] [调度程序-事件-循环-7] [任务集管理器]在阶段0.0中启动任务1.0(TID 1,节点1,分区1,节点本地,11317字节)
[2016-06-15 09:25:30.054] [信息] [调度程序-事件-循环-7] [任务集管理器]在阶段0.0中启动任务2.0(TID 2,节点1,分区2,节点本地,11929字节)
[2016-06-15 09:25:30.057] [信息] [调度程序-事件-循环-7] [任务集管理器]在阶段0.0中启动任务3.0(TID 3,节点1,分区3,节点本地,11249字节)
[2016-06-15 09:25:30.059] [信息] [调度程序-事件-循环-7] [任务集管理器]在阶段0.0中启动任务4.0(TID 4,节点1,分区4,NODE_LOCAL,11560字节)
[2016-06-15 09:25:30.077] [信息] [调度程序事件循环-7] [SparkDeploySchedulerBackend]已注册的执行程序NettyRpcEndpointRef(空)(CassandraCH4.ehubprod.local:33668),ID为1
[2016-06-15 09:25:30.111] [信息] [调度程序-事件-循环-4] [块管理器主端点]注册块管理器节点1:36512,内存为511.1 MB,块管理器Id(0,节点1,36512)
[2016-06-15 09:25:30.168] [信息] [调度程序-事件-循环-3] [块管理器主端点]注册块管理器CassandraCH 4. ehubprod. local:33610,内存为511. 1 MB,块管理器ID(1,CassandraCH 4. ehubprod. local,33610)
[2016-06-15 09:25:30.818] [信息] [调度程序-事件-循环-2] [块管理器信息]在节点1的内存中添加了broadcast_0_piece0:36512(大小:5.4 KB,可用空间:511.1兆字节)
[2016-06-15 09:25:36.764] [信息] [池-21-线程-1] [CassandraConnector]已从Cassandra集群断开连接: cassandra
[2016-06- 1/5 09:25:48.914] [信息] [任务结果获取器-0] [任务集管理器]在节点1上的阶段0.0(TID 4)中的任务4.0已在18854毫秒内完成(1/5)
[2016-06-15 09:2/5:55.541] [信息] [任务结果获取器-1] [任务集管理器]在节点1上于25489毫秒内完成了阶段0.0(TID 2)中的任务2.0(2/5)
[2016-06-15 09:25:57.837] [信息] [任务结果获取器-2] [任务集管理器]在27795毫秒内在节点1上完成了阶段0.0(TID 1)中的任务1.0(3/5)
[2016-06-15 09:25:57.931] [信息] [任务结果获取器-3] [任务集管理器]在27919毫秒内在节点1上完成了阶段0.0(TID 0)中的任务0.0(4/5)
[2016-06-15 09:26:01.357] [信息] [任务结果获取器-0] [任务集管理器]在31302毫秒内在节点1上完成了阶段0.0(TID 3)中的任务3.0(5/5)
[2016-06-15 09:26:01.358] [数据] [数据调度器事件循环] [数据调度器] ShuffleMapStage 0(在EquityTCAAnalytics.java:78上的Map到对)在31.602秒内完成
[2016-06-15 09:26:01.360] [信息] [dag-scheduler-event-loop] [DAGScheduler]正在查找新的可运行阶段
[2016-06-15 09:26:01.360] [信息] [数据库调度程序事件循环] [数据库调度程序]正在运行:设定()
[2016-06-15 09:26:01.360] [信息] [任务结果获取器-0] [TaskSchedulerImpl]已从池中删除任务集0.0,其任务已全部完成
[2016-06-15 09:26:01.362] [信息] [数据库调度程序事件循环] [数据库调度程序]等待:设置(结果阶段1)
[2016-06-15 09:26:01.362] [信息] [数据库调度程序事件循环] [数据库调度程序]失败:设定()
[2016-06-15 09:26:01.365] [信息] [dag-scheduler-event-loop] [DAGScheduler]正在提交结果阶段1(在EquityTCAAnalytics.java:87的reduceByKey处的ShuffledRDD[3]),该阶段没有缺失的父级
[2016-06-15 09:26:01.373] [信息] [dag-scheduler-event-loop] [MemoryStore]块广播_1作为值存储在内存中(估计大小为3.6 KB,可用空间为19.9 KB)
[2016-06-15 09:26:01.382] [信息] [dag-scheduler-event-loop] [MemoryStore]块broadcast_1_piece0以字节形式存储在内存中(估计大小为2.1 KB,可用大小为21.9 KB)
[2016-06-15 09:26:01.383] [信息] [调度程序-事件-循环-1] [块管理器信息]在节点2的内存中添加了broadcast_1_piece0:44871(大小:2.1 KB,可用空间:2.4 GB)
[2016-06-15 09:26:01.384] [信息] [dag-scheduler-event-loop] [SparkContext]从DAGScheduler的广播中创建广播1。标量:1006
[2016-06-15 09:26:01.385] [信息] [dag-scheduler-event-loop] [DAGScheduler]从结果阶段1提交5个缺失的任务(在EquityTCAAnalytics.java:87的reduceByKey上的ShuffledRDD[3])
[2016-06-15 09:26:01.386] [信息] [dag-scheduler-event-loop] [TaskSchedulerImpl]正在添加包含5个任务的任务集1.0
[2016-06-15 09:26:01.390] [信息] [调度程序-事件-循环-4] [任务集管理器]在阶段1.0中启动任务0.0(TID 5,节点1,分区0,NODE_LOCAL,2786字节)
[2016-06-15 09:26:01.390] [信息] [调度程序-事件-循环-4] [任务集管理器]在阶段1.0中启动任务1.0(TID 6,节点1,分区1,节点本地,2786字节)
[2016-06-15 09:26:01.397] [信息] [调度程序-事件-循环-4] [任务集管理器]在阶段1.0中启动任务2.0(TID 7,节点1,分区2,节点本地,2786字节)
[2016-06-15 09:26:01.398] [信息] [调度程序-事件-循环-4] [任务集管理器]在阶段1.0中启动任务3.0(TID 8,节点1,分区3,节点本地,2786字节)
[2016-06-15 09:26:01.406] [信息] [调度程序-事件-循环-4] [任务集管理器]在阶段1.0中启动任务4.0(TID 9,节点1,分区4,节点本地,2786字节)
[2016-06-15 09:26:01.429] [信息] [调度程序-事件-循环-4] [块管理器信息]在节点1的内存中添加了broadcast_1_piece0:36512(大小:2.1 KB,可用空间:511.1兆字节)
[2016-06-15 09:26:01.452] [信息] [dispatcher-event-loop-6] [MapOutputTrackerMasterEndpoint]已请求将用于shuffle 0的Map输出位置发送到节点1:41122
[2016-06-15 09:26:01.456] [信息] [dispatcher-event-loop-6] [MapOutputTrackerMaster]混洗0的输出状态大小为161字节
[2016-06- 1/5 09:26:01.526] [信息] [任务结果获取器-1] [任务集管理器]在节点1上的阶段1.0(TID 9)中的任务4.0在128毫秒内完成(1/5)
[2016-06-15 09:26:01.575] [信息] [任务结果获取器-3] [任务集管理器]在节点1上的阶段1.0(TID 7)中的任务2.0在184毫秒内完成(2/5)
[2016-06-15 09:26:01.580] [信息] [任务结果获取器-2] [任务集管理器]在193毫秒内在节点1上完成了阶段1.0(TID 5)中的任务0.0(3/5)
[2016-06-15 09:26:01.589] [信息] [任务结果获取器-3] [任务集管理器]在节点1上的阶段1.0(TID 6)中的任务1.0已在199毫秒内完成(4/5)
[2016-06-15 09:26:01.599] [信息] [任务结果获取器-2] [任务集管理器]在200毫秒内在节点1上完成了阶段1.0(TID 8)中的任务3.0(5/5)
[2016-06-15 09:26:01.599] [INFO ] [任务结果获取器-2] [TaskSchedulerImpl]已从池中删除任务集1.0,其任务已全部完成
[2016-06-15 09:26:01.599] [详细信息] [DAGScheduler]结果阶段1(在EquityTCAAnalytics.java:88收集)在0.202秒内完成
[2016-06-15 09:26:01.612] [详细信息] [主要] [DAGScheduler]作业0已完成:在EquityTCAAnalytics.java:88收集,花费了32.496470秒
[2016-06-10 13:45:00.0,DA),6944),((2016-06-10 14:25:00.0,B),5241),((2016-06-10 10:55:00.0,QD),109080),((2016 - 06 - 10 14:55:00.0,A),1300)]
[2016-06-15 09:26:01.641] [详细信息] [主要] [股权TCA分析]完成
32.5秒正常吗?
1条答案
按热度按时间2ic8powd1#
我认为,CPU和/或内存使用率的%%将是一个起点。如果您的内核未得到充分利用,则可能意味着您的进程不够并行。内存大小取决于您的数据,但通常是使用更多的RAM,而不是返回IO。