
Incomplete RDD to DataFrame conversion in PySpark

2018-06-21
1445

Using PySpark 1.6.3, I am trying to convert an RDD to a DataFrame. This is test code running in a Zeppelin notebook. The RDD of interest is rdd_ret.

>>> from pyspark.sql import Row
>>> rdd_ret.count()
9301
>>> rddofrows = rdd_ret.map(lambda x: Row(**x))
>>> things = rddofrows.take(10000)
>>> len(things)
9301
>>> [type(x) for x in things if type(x) != Row]
[]
>>> [len(x) for x in things if len(x) != 117]
[]

So we can see that there are definitely 9301 rows, that they are all Row objects, and that every row has the same length. Now I want to convert to a DataFrame:

>>> outdf = rddofrows.toDF(sampleRatio=0.1)
>>> outdf.count()

This raises an error: TypeError: 'NoneType' object is not iterable. The full stack trace is below.

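The output DataFrame object is created, but any operation I try to run on it (.show(), .count(), .filter()) produces the same stack trace shown below. I don't understand what could be NoneType in this case. Sure, some of the values inside the Row objects may be wrong, but to count or show you should only need to iterate over the rows of the DataFrame, and the rows are all there.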

What is going on here?

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5665146503764823323.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark-client/python/pyspark/sql/dataframe.py", line 269, in count
    return int(self._jdf.count())
  File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark-client/python/pyspark/sql/utils.py", line 45, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o2282.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 21 in stage 1256.0 failed 4 times, most recent failure: Lost task 21.3 in stage 1256.0 (TID 62913, usg-kov-e1b-slv005.c.pg-us-n-app-053847.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/current/spark-client/python/pyspark/worker.py", line 111, in main
    process()
  File "/usr/hdp/current/spark-client/python/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/current/spark-client/python/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/hdp/current/spark-client/python/pyspark/sql/types.py", line 924, in convert_struct
    return tuple(conv(v) for v, conv in zip(obj, converters))
  File "/usr/hdp/current/spark-client/python/pyspark/sql/types.py", line 924, in <genexpr>
    return tuple(conv(v) for v, conv in zip(obj, converters))
  File "/usr/hdp/current/spark-client/python/pyspark/sql/types.py", line 900, in <lambda>
    return lambda row: [conv(v) for v in row]
TypeError: 'NoneType' object is not iterable
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1831)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:933)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:166)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1499)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1506)
    at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1516)
    at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1515)
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2100)
    at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1515)
    at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)

As requested, here is what one of the rows looks like:

Row(accountType='individual', added='2018-06-05T01:52:34.257+0000', assignment='null', author='noahmagel', authorCity='null', authorCityCode='null', 
authorContinent='North America', authorContinentCode='n-a', authorCountry='United States', authorCountryCode='us', authorCounty='null', 
authorCountyCode='null', authorLocation='n-a,us,,,', authorState='null', authorStateCode='null', avatarUrl='https://pbs.twimg.com/profile_images/613069089263718401/P1BWMsFG_normal.jpg', 
averageDurationOfVisit=20.0, averageVisits=6.0, backlinks=49850734.0, blogComments=0.0, checked=False, city='null', cityCode='null', continent='North America', 
continentCode='n-a', country='United States', countryCode='us', county='null', countyCode='null', date='2017-12-11T10:58:36.000+0000', 
displayUrls=[], domain='twitter.com', engagement=0.0, expandedUrls=[], facebookAuthorId='null', facebookComments=0.0, facebookLikes=0.0, 
facebookRole='null', facebookShares=0.0, facebookSubtype='null', forumPosts=0.0, forumViews=0.0, fullText='@oli_braun @elonmusk @SpaceX Take my money 💰',
fullname='noah', gender='male', id=167783541878.0, imageMd5s=None, impact=34.0, importanceAmplification=28.0, importanceReach=40.0, 
impressions=208.0, influence=502.0, insightsHashtag=[], insightsMentioned=['@elonmusk', '@spacex', '@oli_braun'], instagramCommentCount=0.0, 
instagramFollowerCount=0.0, instagramFollowingCount=0.0, instagramInteractionsCount=0.0, instagramLikeCount=0.0, instagramPostCount=0.0, 
interest=['Fine arts', 'Business', 'Technology'], language='en', lastAssignmentDate='null', latitude=0.0, lemmatize=['money'], 
locationName='null', logoImages=None, longitude=0.0, matchPositions=[], mediaFilter='null', mediaUrls=[], monthlyVisitors=6000000000.0, mozRank=9.6, 
originalUrl='http://twitter.com/noahmagel/statuses/940173969935818752', outreach=0.0, pageType='twitter', pagesPerVisit=22.0, percentFemaleVisitors=46.0, 
percentMaleVisitors=54.0, priority='null', professions=[], queryId=1999376256.0, queryName='Braun_English', reach=502.0, 
replyTo='http://twitter.com/oli_braun/statuses/940171345115144192', resourceId=167783541878.0, resourceType='page', retweetOf='null', 
sentiment='neutral', shortUrls=[], snippet='@oli_braun @elonmusk @SpaceX Take my money 💰', starred=False, state='null', stateCode='null', status='null', 
subtype='null', tags=[], textlen=44, threadAuthor='oli_braun', threadCreated='null', threadEntryType='reply', threadId='0', threadURL='null',
title='noah (@noahmagel): @oli_braun @elonmusk @Spac ...', trackedLinkClicks=0.0, trackedLinks='null', twitterAuthorId='2246429194', 
twitterFollowers=208.0, twitterFollowing=513.0, twitterPostCount=381.0, twitterReplyCount=0.0, twitterRetweets=0.0, twitterRole='null', 
twitterVerified=False, updated='2018-06-05T01:52:34.257+0000', url='http://twitter.com/noahmagel/statuses/940173969935818752', wordCount='null')

2 Answers

I reproduced this issue with the following:

sc = spark.sparkContext
json_rows = ['{"key1": [{"foo": 1}, {"bar": 2}]}', 
             '{"key2": 1}']
rows = sc.parallelize(json_rows)
df = spark.read.json(rows)
rdd = df.rdd
new_df = spark.createDataFrame(rdd, samplingRatio=1)
new_df.head(2)

which gives the same error:

File "/usr/hdp/current/spark-client/python/pyspark/sql/types.py", line 900, in <lambda>
    return lambda row: [conv(v) for v in row]
TypeError: 'NoneType' object is not iterable

Note that these rows work fine:

json_rows = ['{"key1": [1, 2]}', 
             '{"key2": 1}']

The problem occurs when you have a list/ArrayType whose elements are of type StructType/Row. StructType/Row values need to be converted; see this excerpt from the source code:

def _need_converter(dataType):
    if isinstance(dataType, StructType):
        return True
    elif isinstance(dataType, ArrayType):
        return _need_converter(dataType.elementType)

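So it will now try to convert the elements inside the array ( lambda row: [conv(v) for v in row] ). If the value for that key is ever None for some row (either the key is absent from that row or it is explicitly None), this raises the error. Honestly, this looks like a bug in the Spark code? Wouldn't you expect it to check that the array is not None before trying to call conv() on its elements?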
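
To make the failure mode concrete, here is a minimal pure-Python sketch that needs no Spark session. It mimics the shape of the lambda from the traceback and contrasts it with a None-guarded variant. The names make_array_converter, make_safe_array_converter and struct_conv are made up for illustration and are not part of PySpark; the guarded version is only an assumption about what such a check could look like, not the actual fix in Spark.

```python
def struct_conv(struct):
    # Hypothetical stand-in for a struct converter: dict -> tuple of values.
    return tuple(struct.values())

def make_array_converter(conv):
    # Same shape as the lambda in the traceback: iterates unconditionally.
    return lambda row: [conv(v) for v in row]

def make_safe_array_converter(conv):
    # Assumed None-guarded variant: a missing array passes through as None.
    return lambda row: None if row is None else [conv(v) for v in row]

unsafe = make_array_converter(struct_conv)
safe = make_safe_array_converter(struct_conv)

present = [{"foo": 1}, {"bar": 2}]   # row where key1 holds an array of structs
missing = None                       # row where key1 is absent

converted = unsafe(present)          # works: the array is iterable
try:
    unsafe(missing)
    error_message = None
except TypeError as exc:
    error_message = str(exc)         # the same kind of TypeError as in the trace

guarded = safe(missing)              # None instead of an exception
```

The unguarded converter only fails on rows where the array column is None, which would explain why the driver-side checks on the RDD pass while the job fails on the executors.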

My solution was to flatten the nested rows/structs with a map, so that the values become plain str literals and no conversion is needed.

import json
from pyspark.sql import Row

def flatten(x):
    # Serialize each nested Row (struct) value to a JSON string so that
    # createDataFrame only sees literals and needs no converter.
    x_dict = x.asDict()
    for k, v in x_dict.items():
        if isinstance(v, Row):
            x_dict[k] = json.dumps(v.asDict())
    return x_dict

sc = spark.sparkContext
rows = sc.parallelize(json_rows)  # json_rows as defined in the reproduction above
df = spark.read.json(rows)
flat_rdd = df.rdd.map(flatten)
flat_df = spark.createDataFrame(flat_rdd, samplingRatio=1)
flat_df.head(2)

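Note that for my use case, all of my nested objects are of type Row, and I could flatten the entire Row because it was going to Redshift anyway. YMMV. For my list example above, you would instead check for type list. I think it is perfectly fine to keep nested lists as long as their elements are literals, so you could flatten the elements of the list rather than the list itself.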
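
The list case mentioned above might look roughly like the sketch below. It is an assumption-laden variant, not the code I ran: it duck-types on asDict() instead of importing Row so that it can be tried without a cluster, FakeRow is a hypothetical stand-in for pyspark.sql.Row, and to_plain and flatten_nested are illustrative names. Unlike the description above, it serializes the whole list to one JSON string when it contains structs; lists of literals are left alone.

```python
import json

def to_plain(value):
    # Recursively turn Row-like objects (anything with asDict) into dicts.
    if hasattr(value, "asDict"):
        return {k: to_plain(v) for k, v in value.asDict().items()}
    if isinstance(value, (list, tuple)):
        return [to_plain(v) for v in value]
    return value

def flatten_nested(row):
    # Serialize structs and arrays of structs to JSON strings.
    out = {}
    for key, value in row.asDict().items():
        is_struct = hasattr(value, "asDict")
        is_struct_list = isinstance(value, list) and any(
            hasattr(v, "asDict") for v in value
        )
        if is_struct or is_struct_list:
            out[key] = json.dumps(to_plain(value))
        else:
            out[key] = value  # literals, None and lists of literals pass through
    return out

class FakeRow:
    # Hypothetical stand-in for pyspark.sql.Row, only for this demo.
    def __init__(self, **fields):
        self._fields = fields

    def asDict(self):
        return dict(self._fields)

row = FakeRow(
    key1=[FakeRow(foo=1, bar=None), FakeRow(foo=None, bar=2)],
    key2=None,
    tags=["a", "b"],
)
flat = flatten_nested(row)
```

With a real DataFrame this would be applied as df.rdd.map(flatten_nested), in the same place as flatten above.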

fizloki
2020-06-10

Python's "**" unpacking works on dict objects. I use x.asDict() to convert a Row to a dict, which can then be passed as **x.asDict().
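
As a small illustration of why the conversion to a dict matters: "**" unpacking only accepts mappings, so anything else fails with a TypeError before the callee ever runs. The sketch below uses a hypothetical make_record function in place of Row so that it runs without Spark; the field values are borrowed from the sample row in the question.

```python
def make_record(**fields):
    # Hypothetical stand-in for Row(**fields): collects keyword arguments.
    return dict(fields)

as_mapping = {"author": "noahmagel", "textlen": 44}
as_pairs = [("author", "noahmagel"), ("textlen", 44)]

record = make_record(**as_mapping)   # a dict unpacks into keyword arguments

try:
    make_record(**as_pairs)          # a list is not a mapping
    unpack_error = None
except TypeError as exc:
    unpack_error = str(exc)

record_from_pairs = make_record(**dict(as_pairs))  # convert first, then unpack
```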

hamza tuna
2018-06-21