我将spark与cassandra一起使用,我想将数据写入我的cassandra表:
CREATE TABLE IF NOT EXISTS MyTable(
user TEXT,
date TIMESTAMP,
event TEXT,
PRIMARY KEY((user ),date , event)
);
但我有一个错误:
java.io.IOException: Failed to write statements to KeySpace.MyTable.
at co
每当我尝试像这样查询我的cassandra db时:
sc.parallelize(keys).repartitionByCassandraReplica("keyspace","mytable")
.joinWithCassandraTable("keyspace", "mytable")
.select("whatever")
我的spark作业将无限期地挂在日志中的这一行上:
INFO CassandraConnector: Disconnected from C
我用的是amplab extras/SparkR软件包。测试并通过本地机器的试运行。我将在纱线簇(cdh5.4)上运行它。我是否需要在每个数据节点中安装R 是的。每个工人都必须能够访问本地R口译员
就我个人而言,我建议不要使用旧的SparkR。忽略本文中描述的问题,低级别的RDDAPI不太可能回到SparkR中来。每个工人都必须能够访问本地R口译员
就我个人而言,我建议不要使用旧的SparkR。忽略中描述的问题,低级别RDDAPI不太可能回到SparkR中我觉得使用低级别RDD的旧SparkR非
关于spark streaming,我有两个问题:
我有一个spark streaming应用程序在20秒内运行并收集数据,在4000个批次中,有18个批次因异常而失败:
无法计算拆分,未找到块输入-0-146477410087
我假设此时的数据大小大于可用内存,并且应用程序StorageLevel仅为memory\u
请建议如何修复此问题
同样在我下面使用的命令中,我使用executor memory 20G(数据节点上的总RAM为140G),这是否意味着所有内存都已为该应用程序完全保留,如果
我想在SparkSQL中生成一个已排序、已收集的集合,如下所示:
spark.sql("SELECT id, col_2, sort_array(collect_set(value)) AS collected
FROM my_table GROUP BY id, col_2").show()
其中,值是一个整数
但它无法按正确的数字顺序对数组进行排序——并且做了一些特别的事情(改为在值中第一个数字的开头进行排序?排序数组是否在字符串上运行?)
因此,不是:
+----+
Pyspark中有没有好看的代码来执行不区分大小写的join?
比如:
df3=df1.join(df2,
[“col1”、“col2”、“col3”],
“左外”,
“不区分大小写”)
或者您的工作解决方案是什么?我认为实现这一点的最佳方法是将每个键列转换为大写或小写(可能创建新列或仅对其应用该转换),然后应用联接。这并不十分优雅,但是,您可以创建这些列的新小写版本,只用于连接
import pyspark.sql.functions as F
df1_l = df1 \
.with
我在Apache Spark 1.6.0中有一个隐式MatrixFactorizationModel,超过300万用户和3万项。现在,我想用如下代码计算所有用户的前10个推荐项目:
val model = MatrixFactorizationModel.load(sc, "/hdfs/path/to/model")
model.userFeatures.cache
model.productFeatures.cache
val recommendations: RDD[(Int, Array[
我已经在SO中阅读了spark doc和其他相关Q&A,但我仍然不清楚有关spark广播变量的一些细节,特别是粗体的声明:
Spark操作通过一组阶段执行,由分布式“洗牌”操作分隔Spark自动广播每个阶段中任务所需的公共数据。以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前进行反序列化。这意味着只有当跨多个阶段的任务需要相同的数据时,或者当以反序列化形式缓存数据很重要时,显式创建广播变量才有用
什么是“公共数据”
如果变量仅在1个阶段中使用,是否意味着无论其内存占用情况如何,它都没有
尝试使用HORTONWORKS 2.5中的Pi执行spark程序
我关注了以下链接:
我面临的问题是:
在第5步中:从纱线请求应用程序ID
curl -ikvu "knoxuser:knoxpwd" -X POST "https://$KNOX_SERVER:8443/gateway/default/resourcemanager/v1/cluster/apps/new-application"
错误:
即将连接()到$KNOX_服务器端口8443(#0)*正在尝试53.244.194.23
给定一个数据帧
+---+---+----+
| id| v|date|
+---+---+----+
| 1| a| 1|
| 2| a| 2|
| 3| b| 3|
| 4| b| 4|
+---+---+----+
我们想添加一列,平均值为date,乘以v
+---+---+----+---------+
| v| id|date|avg(date)|
+---+---+----+---------+
| a| 1| 1| 1.5|
我有一个使用的流媒体应用程序。我从一个分区、一个消费者实例和三个代理开始。
我想按顺序做以下两件事:
1) 在运行时向主题添加新分区,我使用:
bin/kafka-topics.sh --zookeeper 192.168.101.164:2181 --alter --topic topic10 --partitions 2
2) 将新的使用者实例添加到特定的使用者组中,我通过将相同的使用者代码作为单独的进程来执行此操作
但是,在执行1时:我没有看到(唯一)消费者实例使用新添加的分区。它只使用
我有一个shell脚本,它调用pyspark作业并在配置单元中创建表。剧本写得很好。现在,我们希望减少脚本在调用ascron作业时运行所需的时间,以获得100多个表
下面是import.sh脚本
#!/bin/bash
source /home/$USER/source.sh
[ $# -ne 1 ] && { echo "Usage : $0 table "; exit 1; }
table=$1
TIMESTAMP=`date "+%Y-%m-%d"`
touch /
标签: Apache Spark
spark-streamingkafka-consumer-apikafka-directconsumer
根据Kafka Direct API,输入记录的数量计算如下
maxInputSize = maxRatePerPartition * #numOfPartitions# * #BATCH_DURATION_SECONDS#
我真的不明白为什么输入大小是这样确定的。假设我的作业在5分钟内处理100个文件
如果我设置maxRatePerPartition=1,主题中的numOfPartitions为6,那么批处理持续时间应该是多少,因为如果我将批处理持续时间秒设置为300,我将获取1800个文件
我的要求是为年龄创建类别。我试图在UDF中编写多个if条件,但它采用else条件。我的代码如下
我的数据
1,Ashok,23,asd
2,Joi,27,dfs
3,Sam,30,dft
4,Bob,37,dat
我的代码
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
impo
我们将所有spark作业作为k8es集群内的kubernetics(k8es)容器发布。我们还为每个作业创建一个服务,并为spark UI进行端口转发(容器的4040映射到SvcPort,比如31123)。
同一组节点也承载一个纱线簇。
容器的entry命令是在客户端模式下调用spark submit to Thread
现在,spark容器可以在任何名称节点(主节点或备用节点)上启动。存在分配给活动名称节点的VIP
当spark容器在活动名称节点上启动时,可以使用VIP:SvcPort从任何位
我对使用v0.11版本孵化(spark-2.6.1,Hbase-1.2.6,ElasticSearch-5.2.1)的PredictionIO还不熟悉。使用/pio start all启动预测服务器,并选中pio status这些都工作正常。然后我创建了一个应用程序“testApp”,并将一些事件导入到该PredictionIO应用程序中。现在,为了验证导入事件的计数,我运行了以下命令:
pio外壳——带火花
import org.apache.predictionio.data.store.P
当spark无法从\u spark\u元数据文件夹中找到文件时,我在生产中遇到了检查点问题
18/05/04 16:59:55 INFO FileStreamSinkLog: Set the compact interval to 10 [defaultCompactInterval: 10]
18/05/04 16:59:55 INFO DelegatingS3FileSystem: Getting file status for 's3u://data-bucket-prod/data/in
这里是Scala 2.11。我有以下inputDB表:
[input]
===
id BIGINT UNSIGNED NOT NULL,
name VARCHAR(50) NOT NULL,
rank INT NOT NULL
我把一些input记录读到SparkDataFrame中,如下所示:
val inputDf = sqlContext().read
.format("blah whatever")
.option("url", "jdbc://blah://whate
我正在尝试使用ElasticSearch hadoop连接器在ElasticSearch中为下面模式的数据帧编制索引
|-- ROW_ID: long (nullable = false)
|-- SUBJECT_ID: long (nullable = false)
|-- HADM_ID: long (nullable = true)
|-- CHARTDATE: date (nullable = false)
|-- CATEGORY: string (nullable = fa
我有一个巨大的数据文件(200Gb+),其中包含每天的度量(数百万个度量)
对于每个指标,我必须根据预定义的一组时间段(例如10、50、100、365天)计算一些值
计算每天进行,时段不变,每次计算所有时段
结果可以重复使用(从10个系列可以重复使用50个,以此类推,从50个系列可以重复使用100个,以此类推)
文件中的记录未排序
我想知道是否有一些Spark模式可以应用于一次性读取文件、缓存#2的结果等。我不太确定您的实现,但是如果您想缓存巨大的数据集并在Spark作业之间共享,您可以查看
简
数据帧架构:
root
|-- ID: decimal(15,0) (nullable = true)
|-- COL1: array (nullable = true)
| |-- element: string (containsNull = true)
|-- COL2: array (nullable = true)
| |-- element: string (containsNull = true)
|-- COL3: array (nullable = tr
我有一个非常大的数据测试。包含文本。我想把一切都简化我这样做了:
df1=df。选择(“*”,下(列('name'))
但它创建了一个名为lower(name)的新列。我不想保留上一栏。因此,我删除了以下内容:
df2=df1.drop(*'title\u split')
但删除它需要很多时间。我怎样才能使它更快?我可以把它改成小写而不保留前一个吗?您可以使用with column替换旧列:
df1 = df.withColumn('name2', lower(col('name'))).dro
我想创建一个新列,它是其他一些列的JSON表示。列表中的键、值对
资料来源:
起源
目的地
计数
多伦多
渥太华
5.
蒙特利尔
温哥华
10
解决这一问题的方法:
import pyspark.sql.functions as F
df2 = df.withColumn(
'json',
F.array(
F.to_json(F.struct('origin')),
F.to_json(F.struct('destination')),
我在pyspark
df = spark.createDataFrame(
[(1,'Y'),
(2,'Y'),
(3,'N'),
(4,'N'),
(5,'N'),
(6,'Y'),
(7,'N')
],
('id', 'status')
)
df.show()
+---+------+
| id|status|
+---+------+
| 1| Y|
| 2| Y|
| 3| N|
| 4| N|
| 5| N|
| 6|
当尝试在windows上使用Spark的MLLib(1.4)中的ALS来训练机器学习模型时,Pyspark总是以StackOverflowerError终止。我尝试按中所述添加检查点——似乎没有帮助(尽管每次运行都会创建一个新目录,但它总是空的)
以下是培训代码和堆栈跟踪:
ranks = [8, 12]
lambdas = [0.1, 10.0]
numIters = [10, 20]
bestModel = None
bestValidationRmse = float("inf")
bes
我正在学习Spark和Kafka,并偶然发现了这个项目,它似乎能有效地使用来自Kafka的信息。这个项目需要配置一些卡夫卡和zookeeper属性,这正是我努力的地方。我的意思是这个属性是什么意思?对不起,如果这是一个基本问题的话
我已在单节点中配置kafka,并具有以下属性
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
和动物园管理员一样
zookeeper.connect=localhost:2181/brokers
zookeeper.c
我想自己运行一个OOTB(1.6版),并以此为基础进行构建。我能够按原样编译和运行该示例,并将其与其他代码示例捆绑在一起。
即:
然而,我不能这样做(相同的代码)在我自己的项目。
有什么帮助吗
build.sbt:
import sbtassembly.AssemblyKeys
name := "stream-test"
version := "1.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
鉴于以下案例类别:
case class User(name:String, age:Int)
从用户实例的列表创建RDD
下面的代码过滤RDD以删除50岁以上的用户
trait Process {
def test {
val rdd = ... // create RDD
rdd.filter(_.age>50)
}
}
为了添加日志记录,将创建一个单独的验证函数并将其传递给过滤器,如下所示:
trait Process {
def validate(u
我已经尝试将SPARK_LOCAL_IP设置为“127.0.0.1”,并检查端口是否已被占用。以下是完整的错误文本:
Launching java with spark-submit command /usr/hdp/2.4.0.0-
169/spark/bin/spark-submit "sparkr-shell" /tmp/RtmpZo44il/backend_port998540c56917
/usr/hdp/2.4.0.0-169/spark/bin/load-spar
我有一个具体的应用程序,我试图验证我正在读取的许多时间序列数据之间的强正相关关系。我要详细说明:
我有很多演员是分布式的,他们产生了一些
大量的时间序列数据流。人数
演员*时间序列流非常大,所以按顺序使用它们
对于我来说,具体的回归分析是非常昂贵的。所以我选择了
采样,我得到了稳健的估计
问题是,;我需要验证这个想法,为了
验证它,我想做随机变量之间的“相关系数”
对这些时间序列进行采样,并为其创建高斯分布
并将相关性的平均值和标准差分配给
演员们。显示哪些演员正在制作更多相关时间
应用程序域
我已经准备好了
spark.executor.extraJavaOptions to –XX:+UseG1GC
spark.storage.memoryFraction to 0.3
我可以在环境页面中看到它们。但在executors页中,存储内存仍在分配默认数量。我没有看到在启动执行器时使用Java选项。我使用SparkConf.set()在应用程序中设置这些环境
Map params=newhashmap();
参数put(“spark.kryoserializer.buffer”,
我有一个用例,在这个用例中,我在应用服务器内部启动本地spark(嵌入式),而不是使用spark rest作业服务器或内核。因为前者(embedded spark)的延迟非常低。我对……感兴趣
这种方法的缺点(如果有)
在生产中可以使用同样的方法吗
另外,这里优先考虑低延迟
编辑:大多数情况下正在处理的数据大小将小于100mb。我认为这根本不是一个缺点。如果您看一下Spark项目本身中的实现,那么在这个过程中,他们还管理SQLContext等。如果数据量很小且驾驶员可以轻松处理,则情况尤其如
我有一个Spark Streaming应用程序,它有多个数据流(DStream),写入同一个Cassandra表。当在大量随机数据上测试我的应用程序时,我从Spark Cassandra Connector收到一个错误,该错误几乎没有对调试有用的信息。错误如下所示:
java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.InvalidQueryException: Key may not be e
根据我所阅读的文件和资料,Spark Streaming的foreachRDD(someFunction)将只在驱动程序进程中执行someFunction本身,尽管如果在RDD上执行操作,那么它们将在RDD所在的执行器上执行
所有这些对我来说都很有用,尽管我注意到,如果我打开检查点,那么spark似乎正在尝试序列化foreachRDD(someFunction)中的所有内容并发送到某个地方——这对我来说是一个问题,因为使用的一个对象是不可序列化的(即schemaRegistryClient)。我
我正试图准确地确定哪些记录器负责apache spark日志记录中的以下类型的行:
[Stage 5:======> (24 + 1) / 200]
[Stage 5:==========> (38 + 1) / 200]
[Stage 5:==============>
我编写了一个spark流应用程序,通过使用KafkaUtils从Kafka接收数据,我想做的是打印出我从Kafka接收到的数据。以下是我的代码(我使用spark submit执行spark流媒体作业):
当我运行这个时,它运行得很好。如果Kafka producer中的输入为a、b、c,我可以从Spark streaming中获得如下结果:
Time: 1476481700000 ms
-------------------------------------------
(null,a)
(
我使用以下代码来规范化PySpark数据帧
来自pyspark.ml.feature导入StandardScaler、VectorAssembler
从pyspark.ml导入管道
cols=[“a”、“b”、“c”]
df=spark.createDataFrame([(1,0,3),(2,3,2),(1,3,1),(3,0,3)],cols)
管道(阶段)=[
矢量汇编程序(inputCols=cols,outputCol='features'),
StandardScaler(withMe
首先,如果这是一个垃圾问题,很抱歉,我对Spark有点陌生
我试图在Spark中执行一些组操作,并试图在修改RDD的键时避免额外的洗牌
原始RDD是json字符串
简化逻辑我的代码如下所示:
case class Key1 (a: String, b: String)
val grouped1: RDD[(Key1, String)] = rdd1.keyBy(generateKey1(_))
val grouped2: RDD[(Key1, String)] = rdd2.keyBy(gen
在Spark standalone模式下,当我使用start-slave.sh启动工作程序时spark://master:7077我可以指定工作人员监听的主机--host[host]。如果一个人有多个工人,这是不实际的
我如何才能实现Worker以指定的主机名出现在master WebUI中,而不逐个启动它们?
它是否有环境变量?
或者我可以通过某种方式将特定于worker的参数传递给starter脚本(start slaves.sh)
例如:
形态/从属:
worker1
worker2
wo
我正在使用Spark wholeTextFiles API从源文件夹读取文件并将其加载到配置单元表
文件从远程服务器到达源文件夹。文件的大小非常大,如1GB-3GB。文件的SCP需要相当长的时间
如果我启动spark作业,并且文件被SCPd发送到源文件夹,并且进程进行到一半,spark会选择该文件吗
如果spark在中途选择文件,这将是一个问题,因为它将忽略文件的其余内容。如果将文件插入源文件夹;然后spark从文件夹中读取数据;由于SCP可能需要一些时间来复制,所以spark可能会选择写了一半
您好,我正在使用java编写一些sparksql代码。我有一个类,如下所示:
public class Item_Meta {
private String itemId;
private String category;
private String description;
private String properties;}
然后,我通过下面的语句从项目列表创建一个名为Dataset的数据集:
Dataset<Row> dataset = sparkSession.sql
我使用Spark 2.2上的Spark结构化流媒体将文件从HDFS目录流式传输到Kafka主题。我想为我正在撰写的主题数据捕捉卡夫卡偏移量
我正在使用
val write = jsonDF
.writeStream.format("kafka")
.option("checkpointLocation", Config().getString(domain + ".kafkaCheckpoint"))
.option("kafka.bootstrap.servers", Config().get
为什么当我签入配置单元数据仓库时,数据框是空的,而表是在配置单元中创建的
hive> describe emp12;
OK
name string
即使从Spark加载数据,数据也不会进入配置单元表。sql方法将查询结果作为DataFrame返回,因此它仅适用于实际返回任何数据的sql语句CREATE TABLE并不是其中之一,它是一个SQL(逻辑)命令,仅针对其副作用执行,即在目录中注册一个表
如果要获取已发布的表,请发出单独的查询:
hc.sql
我在集群的一个节点上使用带有Pyspark内核的Jupyter笔记本,问题是我的/tmp文件夹总是满的。我已经更新了参数:
SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=172800"
问题是这个文件夹只有200GB,当我在Jupyter中关闭内核时,有没有办法说spark clean?或者我应该将Dspark.worker.cleanup.appDataTtl设
我在EMR5.14(hadoop 2.8.3)上运行spark作业
我可以使用hadoop aws的高级版本(例如2.9或3.1)从s3a协议的最新优化中获益吗?您需要坚持EMR提供的任何功能。他们的s3://连接器是AWS开发的,可能是您最安全的选择
FWIW,s3a,从2.8.3中开始,用于输入性能。与更高版本相比没有太大变化,除了在3.1中,如果将fs.s3a.experimental.fadvise保留为normal,它会在第一次反向搜索时自动从顺序IO优化切换为随机IO(列数据)。如果
我是大数据(spark)的初学者,现在我已经安装了spark2.4,所以我想知道应该选择哪个版本最好。因为我想避免冲突,这太糟糕了。ApacheHadoop2.7.7是一个可以使用的稳定版本。但是,我已经安装了ApacheHadoop3.1.1并运行了Spark2.4,没有发现任何问题
如果您这样做是为了更好地实践,请使用3.1.1版本,对于生产,我建议使用更稳定的版本
谢谢,
NaveenApacheHadoop2.7.7是一个稳定的版本。但是,我已经安装了ApacheHadoop3.1.1并
Spark RDD有一个称为treeReduce的reduce变体,它非常有效,因为它通过将reduce作为一个层次结构来改进并行性
Spark数据集没有这种变化。数据集的reduce实现是否已经足够有效,或者是否有其他方法实现相同的行为
我试图使用Pyspark(v1.6.3)读取elasticsearch索引,但出现以下错误
我正在使用以下代码段读取/加载索引
es_reader = sql_context.read.format("org.elasticsearch.spark.sql")
.option("es.nodes", "x.x.x.x,y.y.y.y,z.z.z.z")
.option("es.port", "9200").option("e
我有一个数据集
col1、col2、col3、时间戳
8,XXXX,XXXX,时间
12,XXXX,XXXX,时间
15,XXXX,XXXX,时间
18,XXXX,XXXX,时间
(排序顺序是此处的时间)
我正在尝试基于上一行创建一个新列。我的方法是
w=Window.orderBy('timestamp'))
df.select('*',when())
此处通过引用前一行的滞后来进行时间和其他逻辑
这里的问题是spark如何处理这个问题??,因为数据集的大小非常大,超过100亿行,我只想了
我对spark是个新手,我刚开始学习它。我遇到了一堵墙,我想在那里找到每小时的点击次数。鉴于此表:
到目前为止,我将时间戳转换为如下:
timestamp_only = adclicks.selectExpr(["to_timestamp(timestamp) as timestamp"])
click_count_by_hour = adclicks.select("timestamp")
click_count_by_hours.show(24)
我被困住了,下一步该怎么办?是否有我
1 2 3 4 5 6 ...
下一页 最后一页 共 271 页