Apache spark 无法编写语句

我将spark与cassandra一起使用,我想将数据写入我的cassandra表: CREATE TABLE IF NOT EXISTS MyTable( user TEXT, date TIMESTAMP, event TEXT, PRIMARY KEY((user ),date , event) ); 但我有一个错误: java.io.IOException: Failed to write statements to KeySpace.MyTable. at co

Apache spark 为了运行SparkR,我是否需要在纱线集群中的每个工作节点中安装R?

我用的是amplab extras/SparkR软件包。测试并通过本地机器的试运行。我将在纱线簇(cdh5.4)上运行它。我是否需要在每个数据节点中安装R 是的。每个工人都必须能够访问本地R口译员 就我个人而言,我建议不要使用旧的SparkR。忽略本文中描述的问题,低级别的RDDAPI不太可能回到SparkR中来。每个工人都必须能够访问本地R口译员 就我个人而言,我建议不要使用旧的SparkR。忽略中描述的问题,低级别RDDAPI不太可能回到SparkR中我觉得使用低级别RDD的旧SparkR非

Apache spark spark提交执行器内存/批处理失败

关于spark streaming,我有两个问题: 我有一个spark streaming应用程序在20秒内运行并收集数据,在4000个批次中,有18个批次因异常而失败: 无法计算拆分,未找到块输入-0-146477410087 我假设此时的数据大小大于可用内存,并且应用程序StorageLevel仅为memory\u 请建议如何修复此问题 同样在我下面使用的命令中,我使用executor memory 20G(数据节点上的总RAM为140G),这是否意味着所有内存都已为该应用程序完全保留,如果

Apache spark SparkSQL— ;collect\u set和sort\u数组未正确排序整型列

我想在SparkSQL中生成一个已排序、已收集的集合,如下所示: spark.sql("SELECT id, col_2, sort_array(collect_set(value)) AS collected FROM my_table GROUP BY id, col_2").show() 其中,值是一个整数 但它无法按正确的数字顺序对数组进行排序——并且做了一些特别的事情(改为在值中第一个数字的开头进行排序?排序数组是否在字符串上运行?) 因此,不是: +----+

Apache spark Pyspark-如何进行不区分大小写的数据帧连接?

Pyspark中有没有好看的代码来执行不区分大小写的join? 比如: df3=df1.join(df2, [“col1”、“col2”、“col3”], “左外”, “不区分大小写”) 或者您的工作解决方案是什么?我认为实现这一点的最佳方法是将每个键列转换为大写或小写(可能创建新列或仅对其应用该转换),然后应用联接。这并不十分优雅,但是,您可以创建这些列的新小写版本,只用于连接 import pyspark.sql.functions as F df1_l = df1 \ .with

Apache spark Spark MatrixFactoryModel在Recommends Products Forser调用时崩溃

我在Apache Spark 1.6.0中有一个隐式MatrixFactorizationModel,超过300万用户和3万项。现在,我想用如下代码计算所有用户的前10个推荐项目: val model = MatrixFactorizationModel.load(sc, "/hdfs/path/to/model") model.userFeatures.cache model.productFeatures.cache val recommendations: RDD[(Int, Array[

Apache spark spark广播变量的缺点是什么?

我已经在SO中阅读了spark doc和其他相关Q&A,但我仍然不清楚有关spark广播变量的一些细节,特别是粗体的声明: Spark操作通过一组阶段执行,由分布式“洗牌”操作分隔Spark自动广播每个阶段中任务所需的公共数据。以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前进行反序列化。这意味着只有当跨多个阶段的任务需要相同的数据时,或者当以反序列化形式缓存数据很重要时,显式创建广播变量才有用 什么是“公共数据” 如果变量仅在1个阶段中使用,是否意味着无论其内存占用情况如何,它都没有

Apache spark 如何使用RESTAPI请求applicationID

尝试使用HORTONWORKS 2.5中的Pi执行spark程序 我关注了以下链接: 我面临的问题是: 在第5步中:从纱线请求应用程序ID curl -ikvu "knoxuser:knoxpwd" -X POST "https://$KNOX_SERVER:8443/gateway/default/resourcemanager/v1/cluster/apps/new-application" 错误: 即将连接()到$KNOX_服务器端口8443(#0)*正在尝试53.244.194.23

Apache spark 没有分区的当前分配:<;主题划分>;在重新加入期间;在运行时期间添加新分区不起作用

我有一个使用的流媒体应用程序。我从一个分区、一个消费者实例和三个代理开始。 我想按顺序做以下两件事: 1) 在运行时向主题添加新分区,我使用: bin/kafka-topics.sh --zookeeper 192.168.101.164:2181 --alter --topic topic10 --partitions 2 2) 将新的使用者实例添加到特定的使用者组中,我通过将相同的使用者代码作为单独的进程来执行此操作 但是,在执行1时:我没有看到(唯一)消费者实例使用新添加的分区。它只使用

Apache spark 如何在Pyspark中的循环中使用相同的spark上下文

我有一个shell脚本,它调用pyspark作业并在配置单元中创建表。剧本写得很好。现在,我们希望减少脚本在调用ascron作业时运行所需的时间,以获得100多个表 下面是import.sh脚本 #!/bin/bash source /home/$USER/source.sh [ $# -ne 1 ] && { echo "Usage : $0 table "; exit 1; } table=$1 TIMESTAMP=`date "+%Y-%m-%d"` touch /

Apache spark Kafka直接API批输入大小

根据Kafka Direct API,输入记录的数量计算如下 maxInputSize = maxRatePerPartition * #numOfPartitions# * #BATCH_DURATION_SECONDS# 我真的不明白为什么输入大小是这样确定的。假设我的作业在5分钟内处理100个文件 如果我设置maxRatePerPartition=1,主题中的numOfPartitions为6,那么批处理持续时间应该是多少,因为如果我将批处理持续时间秒设置为300,我将获取1800个文件

Apache spark 如何在Spark UDF中编写多个If语句

我的要求是为年龄创建类别。我试图在UDF中编写多个if条件,但它采用else条件。我的代码如下 我的数据 1,Ashok,23,asd 2,Joi,27,dfs 3,Sam,30,dft 4,Bob,37,dat 我的代码 val sqlContext = new org.apache.spark.sql.SQLContext(sc) import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ impo

Apache spark 在备用名称节点上访问spark作为kubernetics容器运行的UI

我们将所有spark作业作为k8es集群内的kubernetics(k8es)容器发布。我们还为每个作业创建一个服务,并为spark UI进行端口转发(容器的4040映射到SvcPort,比如31123)。 同一组节点也承载一个纱线簇。 容器的entry命令是在客户端模式下调用spark submit to Thread 现在,spark容器可以在任何名称节点(主节点或备用节点)上启动。存在分配给活动名称节点的VIP 当spark容器在活动名称节点上启动时,可以使用VIP:SvcPort从任何位

Apache spark 获取PredictionIO应用程序中的总事件时出错

我对使用v0.11版本孵化(spark-2.6.1,Hbase-1.2.6,ElasticSearch-5.2.1)的PredictionIO还不熟悉。使用/pio start all启动预测服务器,并选中pio status这些都工作正常。然后我创建了一个应用程序“testApp”,并将一些事件导入到该PredictionIO应用程序中。现在,为了验证导入事件的计数,我运行了以下命令: pio外壳——带火花 import org.apache.predictionio.data.store.P

Apache spark 修复spark结构化流中的检查点

当spark无法从\u spark\u元数据文件夹中找到文件时,我在生产中遇到了检查点问题 18/05/04 16:59:55 INFO FileStreamSinkLog: Set the compact interval to 10 [defaultCompactInterval: 10] 18/05/04 16:59:55 INFO DelegatingS3FileSystem: Getting file status for 's3u://data-bucket-prod/data/in

Apache spark 用于执行列更新的Spark map函数

这里是Scala 2.11。我有以下inputDB表: [input] === id BIGINT UNSIGNED NOT NULL, name VARCHAR(50) NOT NULL, rank INT NOT NULL 我把一些input记录读到SparkDataFrame中,如下所示: val inputDf = sqlContext().read .format("blah whatever") .option("url", "jdbc://blah://whate

Apache spark 通过Spark计算共享数据集

我有一个巨大的数据文件(200Gb+),其中包含每天的度量(数百万个度量) 对于每个指标,我必须根据预定义的一组时间段(例如10、50、100、365天)计算一些值 计算每天进行,时段不变,每次计算所有时段 结果可以重复使用(从10个系列可以重复使用50个,以此类推,从50个系列可以重复使用100个,以此类推) 文件中的记录未排序 我想知道是否有一些Spark模式可以应用于一次性读取文件、缓存#2的结果等。我不太确定您的实现,但是如果您想缓存巨大的数据集并在Spark作业之间共享,您可以查看 简

Apache spark 如何在pyspark中使用小写并删除原始列?

我有一个非常大的数据测试。包含文本。我想把一切都简化我这样做了: df1=df。选择(“*”,下(列('name')) 但它创建了一个名为lower(name)的新列。我不想保留上一栏。因此,我删除了以下内容: df2=df1.drop(*'title\u split') 但删除它需要很多时间。我怎样才能使它更快?我可以把它改成小写而不保留前一个吗?您可以使用with column替换旧列: df1 = df.withColumn('name2', lower(col('name'))).dro

Apache spark Pyspark将列合并到键、值对列表中(无UDF)

我想创建一个新列,它是其他一些列的JSON表示。列表中的键、值对 资料来源: 起源 目的地 计数 多伦多 渥太华 5. 蒙特利尔 温哥华 10 解决这一问题的方法: import pyspark.sql.functions as F df2 = df.withColumn( 'json', F.array( F.to_json(F.struct('origin')), F.to_json(F.struct('destination')),

Apache spark 使用PySpark的ALS训练抛出StackOverflower错误

当尝试在windows上使用Spark的MLLib(1.4)中的ALS来训练机器学习模型时,Pyspark总是以StackOverflowerError终止。我尝试按中所述添加检查点——似乎没有帮助(尽管每次运行都会创建一个新目录,但它总是空的) 以下是培训代码和堆栈跟踪: ranks = [8, 12] lambdas = [0.1, 10.0] numIters = [10, 20] bestModel = None bestValidationRmse = float("inf") bes

Apache spark 什么是zookeeper.broker.path

我正在学习Spark和Kafka,并偶然发现了这个项目,它似乎能有效地使用来自Kafka的信息。这个项目需要配置一些卡夫卡和zookeeper属性,这正是我努力的地方。我的意思是这个属性是什么意思?对不起,如果这是一个基本问题的话 我已在单节点中配置kafka,并具有以下属性 broker.id=1 port=9093 log.dir=/tmp/kafka-logs-1 和动物园管理员一样 zookeeper.connect=localhost:2181/brokers zookeeper.c

Apache spark 运行时错误:ClassNotFound异常与Spark

我想自己运行一个OOTB(1.6版),并以此为基础进行构建。我能够按原样编译和运行该示例,并将其与其他代码示例捆绑在一起。 即: 然而,我不能这样做(相同的代码)在我自己的项目。 有什么帮助吗 build.sbt: import sbtassembly.AssemblyKeys name := "stream-test" version := "1.0" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"

Apache spark 序列化是否会降低Spark性能?

鉴于以下案例类别: case class User(name:String, age:Int) 从用户实例的列表创建RDD 下面的代码过滤RDD以删除50岁以上的用户 trait Process { def test { val rdd = ... // create RDD rdd.filter(_.age>50) } } 为了添加日志记录,将创建一个单独的验证函数并将其传递给过滤器,如下所示: trait Process { def validate(u

Apache spark 火花相关系数

我有一个具体的应用程序,我试图验证我正在读取的许多时间序列数据之间的强正相关关系。我要详细说明: 我有很多演员是分布式的,他们产生了一些 大量的时间序列数据流。人数 演员*时间序列流非常大,所以按顺序使用它们 对于我来说,具体的回归分析是非常昂贵的。所以我选择了 采样,我得到了稳健的估计 问题是,;我需要验证这个想法,为了 验证它,我想做随机变量之间的“相关系数” 对这些时间序列进行采样,并为其创建高斯分布 并将相关性的平均值和标准差分配给 演员们。显示哪些演员正在制作更多相关时间 应用程序域

Apache spark 火花设置在纱线中不起作用

我已经准备好了 spark.executor.extraJavaOptions to –XX:+UseG1GC spark.storage.memoryFraction to 0.3 我可以在环境页面中看到它们。但在executors页中,存储内存仍在分配默认数量。我没有看到在启动执行器时使用Java选项。我使用SparkConf.set()在应用程序中设置这些环境 Map params=newhashmap(); 参数put(“spark.kryoserializer.buffer”,

Apache spark 应用中使用嵌入式Spark的缺点

我有一个用例,在这个用例中,我在应用服务器内部启动本地spark(嵌入式),而不是使用spark rest作业服务器或内核。因为前者(embedded spark)的延迟非常低。我对……感兴趣 这种方法的缺点(如果有) 在生产中可以使用同样的方法吗 另外,这里优先考虑低延迟 编辑:大多数情况下正在处理的数据大小将小于100mb。我认为这根本不是一个缺点。如果您看一下Spark项目本身中的实现,那么在这个过程中,他们还管理SQLContext等。如果数据量很小且驾驶员可以轻松处理,则情况尤其如

Apache spark Spark Cassandra连接器中的错误查询错误处理

我有一个Spark Streaming应用程序,它有多个数据流(DStream),写入同一个Cassandra表。当在大量随机数据上测试我的应用程序时,我从Spark Cassandra Connector收到一个错误,该错误几乎没有对调试有用的信息。错误如下所示: java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.InvalidQueryException: Key may not be e

Apache spark 检查点设置时foreachRDD()中使用的对象的序列化

根据我所阅读的文件和资料,Spark Streaming的foreachRDD(someFunction)将只在驱动程序进程中执行someFunction本身,尽管如果在RDD上执行操作,那么它们将在RDD所在的执行器上执行 所有这些对我来说都很有用,尽管我注意到,如果我打开检查点,那么spark似乎正在尝试序列化foreachRDD(someFunction)中的所有内容并发送到某个地方——这对我来说是一个问题,因为使用的一个对象是不可序列化的(即schemaRegistryClient)。我

Apache spark 在spark streaming中将RDD打印到控制台

我编写了一个spark流应用程序,通过使用KafkaUtils从Kafka接收数据,我想做的是打印出我从Kafka接收到的数据。以下是我的代码(我使用spark submit执行spark流媒体作业): 当我运行这个时,它运行得很好。如果Kafka producer中的输入为a、b、c,我可以从Spark streaming中获得如下结果: Time: 1476481700000 ms ------------------------------------------- (null,a) (

Apache spark 使用StandardScaler时SparseVector与DenseVector的比较

我使用以下代码来规范化PySpark数据帧 来自pyspark.ml.feature导入StandardScaler、VectorAssembler 从pyspark.ml导入管道 cols=[“a”、“b”、“c”] df=spark.createDataFrame([(1,0,3),(2,3,2),(1,3,1),(3,0,3)],cols) 管道(阶段)=[ 矢量汇编程序(inputCols=cols,outputCol='features'), StandardScaler(withMe

Apache spark 火花。修改密钥后保留分区器

首先,如果这是一个垃圾问题,很抱歉,我对Spark有点陌生 我试图在Spark中执行一些组操作,并试图在修改RDD的键时避免额外的洗牌 原始RDD是json字符串 简化逻辑我的代码如下所示: case class Key1 (a: String, b: String) val grouped1: RDD[(Key1, String)] = rdd1.keyBy(generateKey1(_)) val grouped2: RDD[(Key1, String)] = rdd2.keyBy(gen

Apache spark 在Apache Spark中,如何使用Spark env变量或start-slaves.sh在多个worker上实现--host[host]行为

在Spark standalone模式下,当我使用start-slave.sh启动工作程序时spark://master:7077我可以指定工作人员监听的主机--host[host]。如果一个人有多个工人,这是不实际的 我如何才能实现Worker以指定的主机名出现在master WebUI中,而不逐个启动它们? 它是否有环境变量? 或者我可以通过某种方式将特定于worker的参数传递给starter脚本(start slaves.sh) 例如: 形态/从属: worker1 worker2 wo

Apache spark spark wholetextfiles会拾取部分创建的文件吗?

我正在使用Spark wholeTextFiles API从源文件夹读取文件并将其加载到配置单元表 文件从远程服务器到达源文件夹。文件的大小非常大,如1GB-3GB。文件的SCP需要相当长的时间 如果我启动spark作业,并且文件被SCPd发送到源文件夹,并且进程进行到一半,spark会选择该文件吗 如果spark在中途选择文件,这将是一个问题,因为它将忽略文件的其余内容。如果将文件插入源文件夹;然后spark从文件夹中读取数据;由于SCP可能需要一些时间来复制,所以spark可能会选择写了一半

Apache spark Spark sql将数据帧保存到配置单元

您好,我正在使用java编写一些sparksql代码。我有一个类,如下所示: public class Item_Meta { private String itemId; private String category; private String description; private String properties;} 然后,我通过下面的语句从项目列表创建一个名为Dataset的数据集: Dataset<Row> dataset = sparkSession.sql

Apache spark “为什么?”;“创建表格”;产生一个空的数据帧? hc.sql(“创建表emp12(名称字符串)”; res13:org.apache.spark.sql.DataFrame=[] scala>res13.printSchema 根

为什么当我签入配置单元数据仓库时,数据框是空的,而表是在配置单元中创建的 hive> describe emp12; OK name string 即使从Spark加载数据,数据也不会进入配置单元表。sql方法将查询结果作为DataFrame返回,因此它仅适用于实际返回任何数据的sql语句CREATE TABLE并不是其中之一,它是一个SQL(逻辑)命令,仅针对其副作用执行,即在目录中注册一个表 如果要获取已发布的表,请发出单独的查询: hc.sql

Apache spark Spark Standalone-Tmp文件夹

我在集群的一个节点上使用带有Pyspark内核的Jupyter笔记本,问题是我的/tmp文件夹总是满的。我已经更新了参数: SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=172800" 问题是这个文件夹只有200GB,当我在Jupyter中关闭内核时,有没有办法说spark clean?或者我应该将Dspark.worker.cleanup.appDataTtl设

Apache spark 我应该使用哪个版本的hadoop aws

我在EMR5.14(hadoop 2.8.3)上运行spark作业 我可以使用hadoop aws的高级版本(例如2.9或3.1)从s3a协议的最新优化中获益吗?您需要坚持EMR提供的任何功能。他们的s3://连接器是AWS开发的,可能是您最安全的选择 FWIW,s3a,从2.8.3中开始,用于输入性能。与更高版本相比没有太大变化,除了在3.1中,如果将fs.s3a.experimental.fadvise保留为normal,它会在第一次反向搜索时自动从顺序IO优化切换为随机IO(列数据)。如果

Apache spark Hadoop的spark 2.4最佳版本

我是大数据(spark)的初学者,现在我已经安装了spark2.4,所以我想知道应该选择哪个版本最好。因为我想避免冲突,这太糟糕了。ApacheHadoop2.7.7是一个可以使用的稳定版本。但是,我已经安装了ApacheHadoop3.1.1并运行了Spark2.4,没有发现任何问题 如果您这样做是为了更好地实践,请使用3.1.1版本,对于生产,我建议使用更稳定的版本 谢谢, NaveenApacheHadoop2.7.7是一个稳定的版本。但是,我已经安装了ApacheHadoop3.1.1并

Apache spark Spark RDD treeReduce与Dataset reduce

Spark RDD有一个称为treeReduce的reduce变体,它非常有效,因为它通过将reduce作为一个层次结构来改进并行性 Spark数据集没有这种变化。数据集的reduce实现是否已经足够有效,或者是否有其他方法实现相同的行为

Apache spark 使用Pyspark读取elasticsearch索引

我试图使用Pyspark(v1.6.3)读取elasticsearch索引,但出现以下错误 我正在使用以下代码段读取/加载索引 es_reader = sql_context.read.format("org.elasticsearch.spark.sql") .option("es.nodes", "x.x.x.x,y.y.y.y,z.z.z.z") .option("es.port", "9200").option("e

Apache spark 在大型数据集上处理无分区窗口

我有一个数据集 col1、col2、col3、时间戳 8,XXXX,XXXX,时间 12,XXXX,XXXX,时间 15,XXXX,XXXX,时间 18,XXXX,XXXX,时间 (排序顺序是此处的时间) 我正在尝试基于上一行创建一个新列。我的方法是 w=Window.orderBy('timestamp')) df.select('*',when()) 此处通过引用前一行的滞后来进行时间和其他逻辑 这里的问题是spark如何处理这个问题??,因为数据集的大小非常大,超过100亿行,我只想了

Apache spark 查找每小时的广告点击次数

我对spark是个新手,我刚开始学习它。我遇到了一堵墙,我想在那里找到每小时的点击次数。鉴于此表: 到目前为止,我将时间戳转换为如下: timestamp_only = adclicks.selectExpr(["to_timestamp(timestamp) as timestamp"]) click_count_by_hour = adclicks.select("timestamp") click_count_by_hours.show(24) 我被困住了,下一步该怎么办?是否有我

  1    2   3   4   5   6  ... 下一页 最后一页 共 271 页