Massively Parallel Processing
GreenPlum
01.Greenplum MPP 架构
02.Primary/Mirror 同步机制
03.Greenplum 计算能力估算
04.segment mirrir mode
05.最佳配置
06.Install Greenplum for CentOS 8.4
07.greenplum adds mirror to master and segment
08.Greenplum Command Center v6.22.0
09.Greenplum平台扩展框架(PXF)
11.management tool
12.管理数据库
13.分布式数据库存储及查询处理
14.角色权限及客户端认证管理
系统配置和参数
Greenplum to Greenplum modules
18.gptransfer and gpcopy
22.Apache MADlib
21.gpbackup and gprestore
20.备份与恢复
gpfdist 原理解析
Greenplum Stream Server(GPSS)
17.Loading and unloading data
16.数据库性能
15.Database objects
Greenplum mapreduce
Apache Doris
安装 Apache Doris
本文档使用 MrDoc 发布
-
+
home page
Greenplum mapreduce
# 1.MapReduce MapReduce是由Google开发的用于在一系列商品服务器上处理和生成大型数据集的编程模型。 Greenplum 设计初衷是面向大规模数据分析的,能轻松扩展到 Petabyte 级别,通过 Greenplum 的并行数据流引擎能够让程序员玩 MapReduce, DBA 跑 SQL ,可谓两全其美。 类似的思路已经给数据仓库市场带来了一场革命,Greenplum 的间接竞争对手其实应该是 Hadoop 。 GreenplumMapReduce允许熟悉MapReduce模型的编程人员编写map和reduce函数,并将其提交给Greenplum数据库并行引擎进行处理。 为了使Greenplum能够处理MapReduce函数,在文档中定义函数,然后将文档传递给Greenplum MapReduce程序gpmapreduce,以供Greenplum数据库并行引擎执行。 Greenplum数据库系统分配输入数据,在一组机器上执行程序,处理机器故障,并管理所需的机器间通信。 # 2.Greenplum MapReduce文档格式 Greenplum使用YAML 1.1文档格式,然后实现自己的模式,以定义MapReduce作业的各个步骤。 所有Greenplum MapReduce文件必须首先声明它们正在使用的YAML规范的版本。 之后,三个破折号(-—)表示文档的开头,三个点(…)表示文档的结尾而不启动新文档。 注释行以井号(#)为前缀。 可以在同一个文件中声明多个Greenplum MapReduce文档: ``` %YAML 1.1 --- # Begin Document 1 # ... --- # Begin Document 2 # ... ``` 在Greenplum MapReduce文档中,有三种基本类型的数据结构或节点:标量,序列和映射。 ## 标量 是由空格缩进的基本文本字符串。 如果您有跨越多行的标量输入,则前面的管道(|)表示文字样式,其中所有换行符都很重要。 或者,前一个尖括号(>)将单个换行符折叠到具有相同缩进级别的后续行的空格。 如果字符串包含具有保留含义的字符,则必须引用该字符串,或者必须使用反斜杠(\)转义特殊字符。 ``` # Read each new line literally somekey: | this value contains two lines and each line is read literally # Treat each new line as a space anotherkey: > this value contains two lines but is treated as one continuous line # This quoted string contains a special character ThirdKey: "This is a string: not a mapping" ``` ## 序列 是列表,列表中的每个条目都在其自己的行上,用短划线和空格(- )表示。 或者,您可以将内联序列指定为方括号内的逗号分隔列表。 序列提供一组数据并为其提供订单。 将列表加载到Greenplum MapReduce程序时,将保留订单。 ``` # list sequence - this - is - a list - with - five scalar values # inline sequence [this, is, a list, with, five scalar values] ``` ## 映射 用于将数据值与称为键的标识符配对。 映射对每个key: value对使用冒号和空格(: ),或者也可以内联指定为花括号内的逗号分隔列表。 该密钥用作从映射中检索数据的索引。 ``` # a mapping of items title: War and Peace author: Leo Tolstoy date: 1865 # same mapping written inline {title: War and Peace, author: Leo Tolstoy, date: 1865} ``` ## 密钥 用于将元信息与每个节点相关联,并指定预期的节点类型(标量,序列或映射)。 Greenplum MapReduce程序按顺序处理文档的节点,并使用缩进(空格)来确定文档层次结构和节点之间的关系。 使用空白区域非常重要。 不应仅将白色空间用于格式化目的,并且根本不应使用制表符。 # 3.Greenplum MapReduce文档模式 Greenplum MapReduce使用YAML文档框架并实现自己的YAML模式。 Greenplum MapReduce文档的基本结构是: ## 3.1.数据库信息 ``` %YAML 1.1 --- VERSION: 1.0.0.2 DATABASE: dbname USER: db_username HOST: master_hostname PORT: master_port ``` - VERSION 必须。Greenplum MapReduce YAML规范的版本。当前版本为1.0.0.1。 - DATABASE 可选。指定Greenplum中要连接的数据库。如果未指定,则默认为默认数据库或$PGDATABASE(如果已设置)。 - USER 可选。指定要用于连接的数据库角色。 如果未指定,则默认为当前用户或$PGUSER(如果已设置)。 您必须是Greenplum超级用户才能运行用不受信任的Python和Perl编写的函数。 常规数据库用户可以运行用可信Perl编写的函数。 您还必须是数据库超级用户才能运行包含FILE, GPFDIST和EXEC输入类型的MapReduce作业。 - HOST 可选。指定Greenplum master主机名。如果未指定,则默认为localhost或$PGHOST(如果已设置)。 - PORT 可选。指定Greenplum主端口。如果未指定,则默认为5432或$PGPORT(如果已设置)。 ## 3.2.INPUT 必须。定义输入数据。每个MapReduce文档必须至少定义一个输入。 文档中允许多个输入定义,但每个输入定义只能指定其中一种访问类型:文件,gpfdist文件分发程序,数据库中的表,SQL命令或操作系统命令。 ``` DEFINE: - INPUT: NAME: input_name FILE: - hostname:/path/to/file GPFDIST: - hostname:port/file_pattern TABLE: table_name QUERY: SELECT_statement EXEC: command_string COLUMNS: - field_name data_type FORMAT: TEXT | CSV DELIMITER: delimiter_character ESCAPE: escape_character NULL: null_string QUOTE: csv_quote_character ERROR_LIMIT: integer ENCODING: database_encoding ``` DEFINE 必须。此MapReduce文档的一系列定义。DEFINE部分必须至少有一个INPUT定义。 - NAME 此输入的名称。关于此MapReduce作业中其他对象的名称(例如map函数,task,reduce函数和输出名称),名称必须是唯一的。 此外,名称不能与数据库中的现有对象(例如表,函数或视图)冲突。 - FILE 一个或多个输入文件的序列,格式为:seghostname:/path/to/filename。 您必须是Greenplum数据库超级用户才能使用FILE输入运行MapReduce作业。 文件必须是text或csv。该文件必须位于Greenplum segment主机上。 - GPFDIST 一个或多个运行gpfdist文件分发程序的序列,格式为:hostname\[:port\]/file\_pattern。 除非服务器配置参数[服务器配置参数]($8618fe5efcd3ee4d.md#topic1)设置为on, 否则您必须是Greenplum数据库超级用户才能使用GPFDIST输入运行MapReduce作业。 - TABLE 数据库中现有表的名称。 - QUERY 要在数据库中运行的SQL SELECT命令。 - EXEC 要在Greenplum segment主机上运行的操作系统命令。 默认情况下,该命令由系统中的所有segment实例运行。 例如,如果每个segment主机有四个segment实例,则该命令将在每个主机上运行四次。 您必须是Greenplum数据库超级用户才能使用EXEC输入运行MapReduce作业, 并且服务器配置参数[服务器配置参数]($8618fe5efcd3ee4d.md#topic1)设置为on。 - COLUMNS 可选。列指定为:column\_name \[data\_type\]。 如果未指定,则默认值为value text。 [DELIMITER](#topic3__DELIMITER)字符用于分隔两个数据值字段(列)。 行由换行符(0x0a)确定。 - FORMAT 可选。指定数据的格式 - 分隔文本(TEXT)或逗号分隔值(CSV)格式。 如果未指定数据格式,则默认为TEXT。 - DELIMITER 可选[FILE](#topic3__FILE), [GPFDIST](#topic3__GPFDIST)和[EXEC](#topic3__EXEC)输入。 指定用于分隔数据值的单个字符。 默认值为TEXT模式下的制表符,CSV模式下为逗号。 分隔符字符只能出现在任意两个数据值字段之间。 不要在行的开头或结尾放置分隔符。 - ESCAPE 对于[FILE](#topic3__FILE), [GPFDIST](#topic3__GPFDIST)和[EXEC](#topic3__EXEC)输入可选。 指定用于C转义序列的单个字符(例如\\n,\\t,\\100等)以及转义可能以行或列分隔符形式取出的数据字符。 确保选择实际列数据中未使用的转义字符。 默认转义字符是文本格式文件的\\(反斜杠)和csv格式文件的"(双引号),但是可以指定另一个字符来表示转义。 也可以通过指定禁用转义值'OFF'作为转义值。 这对于诸如文本格式的Web日志数据之类的数据非常有用,这些数据具有许多不打算转义的嵌入式反斜杠。 - NULL 对于[FILE](#topic3__FILE), [GPFDIST](#topic3__GPFDIST)和[EXEC](#topic3__EXEC)输入可选。 指定表示空值的字符串。 默认值为TEXT格式的\\N,以及CSV格式没有引号的空值。 如果您不想将空值与空字符串区分开来,即使在TEXT模式下,您可能更喜欢空字符串。 与此字符串匹配的任何输入数据项都将被视为空值。 - QUOTE 对于[FILE](#topic3__FILE), [GPFDIST](#topic3__GPFDIST)和[EXEC](#topic3__EXEC)输入可选。 指定CSV格式文件的引用字符。 默认值为双引号(")。 在CSV格式的文件中,如果数据值字段包含任何逗号或嵌入的新行,则必须用双引号括起来。 包含双引号字符的字段必须用双引号括起来,并且嵌入双引号必须由一对连续的双引号表示。 始终正确打开和关闭引号以便正确解析数据行非常重要。 - ERROR\_LIMIT 如果输入行具有格式错误,则只要在输入处理期间未在任何Greenplum segment实例上达到错误限制计数,它们将被丢弃。 如果未达到错误限制,则将处理所有正常行并丢弃任何错误行。 - ENCODING 用于数据的字符集编码。 指定字符串常量(例如'SQL\_ASCII'),整数编码号或DEFAULT以使用默认客户端编码。 有关更多信息,请参阅[字符集支持]($8b969ad83b61690e.md#topic1)。 ## 3.3.OUTPUT 可选 定义输出此MapReduce作业的格式化数据的位置。 如果未定义输出,则默认为STDOUT(客户端的标准输出)。 您可以将输出发送到客户端主机上的文件或数据库中的现有表。 ``` - OUTPUT: NAME: output_name FILE: file_path_on_client TABLE: table_name KEYS: - column_name MODE: REPLACE | APPEND ``` - NAME 此输出的名称。默认输出名称为STDOUT。 关于MapReduce作业中其他对象的名称(例如map函数,task,reduce函数和输入名称),名称必须是唯一的。 此外,名称不能与数据库中的现有对象(例如表,函数或视图)冲突。 - FILE 指定MapReduce客户端计算机上的文件位置,以如下格式输出数据:/path/to/filename。 - TABLE 指定数据库中用于输出数据的表的名称。 如果在运行MapReduce作业之前该表不存在,则将使用[KEYS](#topic3__KEYS)指定的分发策略创建该表。 - KEYS [TABLE](#topic3__OUTPUTTABLE)输出的可选项。 指定要用作Greenplum数据库分发键的列。 如果[EXECUTE](#topic3__EXECUTE)任务包含[REDUCE](#topic3__REDUCE)定义, 则默认情况下REDUCE键将用作表分发键。 否则,表的第一列将用作分发键。 - MODE [TABLE](#topic3__OUTPUTTABLE)输出的可选项。 如果未指定,则默认为创建表(如果该表尚不存在),但如果表存在则输出错误。 声明APPEND将输出数据添加到现有表(前提是表模式与输出格式匹配),而不删除任何现有数据。 如果表存在,则声明REPLACE将删除该表,然后重新创建它。 如果不存在,APPEND和REPLACE都将创建一个新表。 ## 3.4.MAP 每个MAP函数采用以(key, value)对构造的数据, 处理每对,并生成零个或多个输出(key, value)对。 然后,Greenplum MapReduce框架从所有输出列表中收集具有相同密钥的所有对,并将它们组合在一起。 然后将此输出传递给[REDUCE](#topic3__TASKREDUCE)任务, 该任务由[TRANSITION | CONSOLIDATE | FINALIZE](#topic3__TCF)函数组成。 有一个名为IDENTITY的预定义MAP函数,它返回的(key, value)对不变。 虽然(key, value)是默认参数,但您可以根据需要指定其他原型。 ``` - MAP: NAME: function_name FUNCTION: function_definition LANGUAGE: perl | python | c LIBRARY: /path/filename.so PARAMETERS: - nametype RETURNS: - nametype OPTIMIZE: STRICT IMMUTABLE MODE: SINGLE | MULTI ``` - NAME 必须。函数的名称。关于此MapReduce作业中其他对象的名称(例如函数,任务,输入和输出名称),名称必须是唯一的。 您还可以指定Greenplum数据库内置函数的名称。 如果使用内置函数,请不要提供[LANGUAGE](#topic3__LANGUAGE)或[FUNCTION](#topic3__FUNCTION)正文。 - FUNCTION 可选。使用指定的[LANGUAGE](#topic3__LANGUAGE)指定函数的完整主体。 如果未指定FUNCTION, 则使用与[NAME](#topic3__TCFNAME)对应的内置数据库函数。 - LANGUAGE 使用[FUNCTION](#topic3__FUNCTION)时需要。指定用于解释函数的实现语言。 此版本具有对perl,python和C的语言支持。 如果调用内置数据库函数,则不应指定LANGUAGE。 - LIBRARY [LANGUAGE](#topic3__LANGUAGE)为C时必需(不允许用于其他语言函数)。 要使用此属性,[VERSION](#topic3__VERSION)必须为1.0.0.2。 必须在运行MapReduce作业之前安装指定的库文件,并且该文件必须存在于所有Greenplum主机(master和segment)上的相同文件系统位置。 - PARAMETERS 可选。函数输入参数。默认类型是text。 MAP default - key text, value text TRANSITION default - state text, value text CONSOLIDATE default - state1 text, state2 text (必须具有相同数据类型的两个输入参数) FINALIZE default - state text (仅限单个参数) - RETURNS 可选。默认返回类型是text。 MAP default - key text, value text TRANSITION default - state text (仅限单个参数) CONSOLIDATE default - state text (仅限单个参数) FINALIZE default - value text - OPTIMIZE 该函数的可选优化参数: STRICT - 函数不受NULL值的影响 IMMUTABLE - 函数将始终返回给定输入的相同值 - MODE 可选。指定函数返回的行数。 MULTI - 每个输入记录返回0行或更多行。 函数的返回值必须是要返回的行数组,或者必须使用Python中的yield或Perl中的return\_next将函数写为迭代器。 MULTI是MAP和FINALIZE函数的默认模式。 SINGLE - 每个输入记录只返回一行。 SINGLE是TRANSITION和CONSOLIDATE函数支持的唯一模式。 当与MAP和FINALIZE函数一起使用时,SINGLE模式可以提供适度的性能改进。 ## 3.5.TRANSITION ``` - TRANSITION | CONSOLIDATE | FINALIZE: NAME: function_name FUNCTION: function_definition LANGUAGE: perl | python | c LIBRARY: /path/filename.so PARAMETERS: - nametype RETURNS: - nametype OPTIMIZE: STRICT IMMUTABLE MODE: SINGLE | MULTI ``` - TRANSITION | CONSOLIDATE | FINALIZE TRANSITION,CONSOLIDATE和FINALIZE都是[REDUCE](#topic3__REDUCE)的组成部分。 需要TRANSITION函数。 CONSOLIDATE和FINALIZE函数是可选的。 默认情况下,所有将state作为其输入[PARAMETERS](#topic3__PARAMETERS)的第一个,但也可以定义其他原型。 TRANSITION函数遍历给定键的每个值,并在state变量中累积值。 当在键的第一个值上调用转换函数时,state将设置为[REDUCE](#topic3__REDUCE)作业的[INITIALIZE](#topic3__INITIALIZE)指定的值(或数据类型的默认状态值)。 转换需要两个参数作为输入; 密钥减少的当前状态和下一个值,然后产生一个新state。 如果指定了CONSOLIDATE函数,则在segment级别执行TRANSITION处理, 然后在Greenplum互连上重新分配密钥以进行最终聚合(两阶段聚合)。 仅重新分配给定密钥的结果state值,从而导致更低的互连流量和更高的并行度。 CONSOLIDATE像TRANSITION一样处理,除了(state + value)=> state, 它是(state + state)=> state。 如果指定了FINALIZE函数,它将采用CONSOLIDATE(如果存在)或TRANSITION生成的最终state, 并在发出最终结果之前进行任何最终处理。 TRANSITION和CONSOLIDATE函数不能返回一组值。 如果需要REDUCE作业来返回一个集合,则需要FINALIZE将最终状态转换为一组输出值。 ## 3.6.REDUCE 必须。REDUCE定义命名[TRANSITION | CONSOLIDATE | FINALIZE](#topic3__TCF)函数,包括将(key,value)对缩减到最终结果集。 ``` - REDUCE: NAME: reduce_job_name TRANSITION: transition_function_name CONSOLIDATE: consolidate_function_name FINALIZE: finalize_function_name INITIALIZE: value KEYS: - key_name ``` 您还可以执行几个预定义的REDUCE作业,这些作业都在名为value的列上运行: IDENTITY - 返回(键,值)对不变 SUM - 计算数值数据的总和 AVG - 计算数字数据的平均值 COUNT - 计算输入数据的计数 MIN - 计算数值数据的最小值 MAX - 计算数值数据的最大值 - NAME 必须。这个REDUCE工作的名称。 关于此MapReduce作业中的其他对象的名称(函数,任务,输入和输出名称),名称必须是唯一的。 此外,名称不能与数据库中的现有对象(例如表,函数或视图)冲突。 - TRANSITION 必须。TRANSITION函数名称。 - CONSOLIDATE 可选。CONSOLIDATE函数名称。 - FINALIZE 可选。FINALIZE函数名称。 - INITIALIZE text和float数据类型的可选项。 所有其他数据类型都需要。 文本的默认值为''。 float的默认值为0.0。 设置TRANSITION函数的初始state值。 - KEYS 可选。默认为\[key, \*\]。 使用多列缩减时,可能需要指定哪些列是键列,哪些列是值列。 默认情况下,未传递给TRANSITION函数的任何输入列都是键列, 名为key的列始终是键列,即使它传递给TRANSITION函数也是如此。 特殊指示符\*表示未传递给TRANSITION函数的所有列。 如果该指示符不存在于键列表中,则丢弃任何不匹配的列。 ## 3.7.TASK 可选。TASK在Greenplum MapReduce作业管道中定义了完整的端到端INPUT/MAP/REDUCE阶段。 它与[EXECUTE](#topic3__EXECUTE)类似,但不会立即执行。 ``` - TASK: NAME: task_name SOURCE: input_name MAP: map_function_name REDUCE: reduce_function_name ``` 可以被称为[INPUT](#topic3__INPUT)的任务对象进入进一步处理阶段。 - NAME 必须。此任务的名称。 关于此MapReduce作业中其他对象的名称(例如map函数,reduce函数,输入和输出名称),名称必须是唯一的。 此外,名称不能与数据库中的现有对象(例如表,函数或视图)冲突。 - SOURCE [INPUT](#topic3__INPUT)或其他TASK的名称。 - MAP 可选。[MAP](#topic3__MAP)函数的名称。 如果未指定,则默认为IDENTITY。 - REDUCE 可选。[REDUCE](#topic3__REDUCE)函数的名称。 如果未指定,则默认为IDENTITY。 ## 3.8.EXECUTE 必须。EXECUTE定义Greenplum MapReduce作业管道中的最终INPUT/MAP/REDUCE阶段。 ``` EXECUTE - RUN: SOURCE: input_or_task_name TARGET: output_name MAP: map_function_name REDUCE: reduce_function_name... ``` - run 运行任务 - SOURCE 必须。[INPUT](#topic3__INPUT)或[TASK](#topic3__TASK)的名称。 - TARGET 可选。[OUTPUT](#topic3__OUTPUT)的名称。默认值为STDOUT。 - MAP 可选。[MAP](#topic3__MAP)函数名称。 如果未指定,默认为IDENTITY。 - REDUCE 可选。[REDUCE](#topic3__REDUCE)函数的名称。默认为IDENTITY。 # 4.示例Greenplum MapReduce文档 ``` # This example MapReduce job processes documents and looks for keywords in them. # It takes two database tables as input: # - documents (doc_id integer, url text, data text) # - keywords (keyword_id integer, keyword text)# # The documents data is searched for occurrences of keywords and returns results of # url, data and keyword (a keyword can be multiple words, such as "high performance # computing") %YAML 1.1 --- VERSION:1.0.0.1 # Connect to Greenplum Database using this database and role DATABASE:webdata USER:jsmith # Begin definition section DEFINE: # Declare the input, which selects all columns and rows from the 'documents' and 'keywords' tables. - INPUT: NAME:doc TABLE:documents - INPUT: NAME:kw TABLE:keywords # Define the map functions to extract terms from documents and keyword # This example simply splits on white space, but it would be possible # to make use of a python library like nltk (the natural language toolkit) # to perform more complex tokenization and word stemming. - MAP: NAME:doc_map LANGUAGE:python FUNCTION:| i = 0 # the index of a word within the document terms = {}# a hash of terms and their indexes within the document # Lower-case and split the text string on space for term in data.lower().split(): i = i + 1# increment i (the index) # Check for the term in the terms list: # if stem word already exists, append the i value to the array entry # corresponding to the term. This counts multiple occurrences of the word. # If stem word does not exist, add it to the dictionary with position i. # For example: # data: "a computer is a machine that manipulates data" # "a" [1, 4] # "computer" [2] # "machine" [3] # … if term in terms: terms[term] += ','+str(i) else: terms[term] = str(i) # Return multiple lines for each document. Each line consists of # the doc_id, a term and the positions in the data where the term appeared. # For example: # (doc_id => 100, term => "a", [1,4] # (doc_id => 100, term => "computer", [2] # … for term in terms: yield([doc_id, term, terms[term]]) OPTIMIZE:STRICT IMMUTABLE PARAMETERS: - doc_id integer - data text RETURNS: - doc_id integer - term text - positions text # The map function for keywords is almost identical to the one for documents # but it also counts of the number of terms in the keyword. - MAP: NAME:kw_map LANGUAGE:python FUNCTION:| i = 0 terms = {} for term in keyword.lower().split(): i = i + 1 if term in terms: terms[term] += ','+str(i) else: terms[term] = str(i) # output 4 values including i (the total count for term in terms): yield([keyword_id, i, term, terms[term]]) OPTIMIZE:STRICT IMMUTABLE PARAMETERS: - keyword_id integer - keyword text RETURNS: - keyword_id integer - nterms integer - term text - positions text # A TASK is an object that defines an entire INPUT/MAP/REDUCE stage # within a Greenplum MapReduce pipeline. It is like EXECUTION, but it is # executed only when called as input to other processing stages. # Identify a task called 'doc_prep' which takes in the 'doc' INPUT defined earlier # and runs the 'doc_map' MAP function which returns doc_id, term, [term_position] - TASK: NAME:doc_prep SOURCE:doc MAP:doc_map # Identify a task called 'kw_prep' which takes in the 'kw' INPUT defined earlier # and runs the kw_map MAP function which returns kw_id, term, [term_position] - TASK: NAME:kw_prep SOURCE:kw MAP:kw_map # One advantage of Greenplum MapReduce is that MapReduce tasks can be # used as input to SQL operations and SQL can be used to process a MapReduce task. # This INPUT defines a SQL query that joins the output of the 'doc_prep' # TASK to that of the 'kw_prep' TASK. Matching terms are output to the 'candidate' # list (any keyword that shares at least one term with the document). - INPUT: NAME: term_join QUERY: | SELECT doc.doc_id, kw.keyword_id, kw.term, kw.nterms, doc.positions as doc_positions, kw.positions as kw_positions FROM doc_prep doc INNER JOIN kw_prep kw ON (doc.term = kw.term) # In Greenplum MapReduce, a REDUCE function is comprised of one or more functions. # A REDUCE has an initial 'state' variable defined for each grouping key. that is # A TRANSITION function adjusts the state for every value in a key grouping. # If present, an optional CONSOLIDATE function combines multiple # 'state' variables. This allows the TRANSITION function to be executed locally at # the segment-level and only redistribute the accumulated 'state' over # the network. If present, an optional FINALIZE function can be used to perform # final computation on a state and emit one or more rows of output from the state. # # This REDUCE function is called 'term_reducer' with a TRANSITION function # called 'term_transition' and a FINALIZE function called 'term_finalizer' - REDUCE: NAME:term_reducer TRANSITION:term_transition FINALIZE:term_finalizer - TRANSITION: NAME:term_transition LANGUAGE:python PARAMETERS: - state text - term text - nterms integer - doc_positions text - kw_positions text FUNCTION: | # 'state' has an initial value of '' and is a colon delimited set # of keyword positions. keyword positions are comma delimited sets of # integers. For example, '1,3,2:4:' # If there is an existing state, split it into the set of keyword positions # otherwise construct a set of 'nterms' keyword positions - all empty if state: kw_split = state.split(':') else: kw_split = [] for i in range(0,nterms): kw_split.append('') # 'kw_positions' is a comma delimited field of integers indicating what # position a single term occurs within a given keyword. # Splitting based on ',' converts the string into a python list. # add doc_positions for the current term for kw_p in kw_positions.split(','): kw_split[int(kw_p)-1] = doc_positions # This section takes each element in the 'kw_split' array and strings # them together placing a ':' in between each element from the array. # For example: for the keyword "computer software computer hardware", # the 'kw_split' array matched up to the document data of # "in the business of computer software software engineers" # would look like: ['5', '6,7', '5', ''] # and the outstate would look like: 5:6,7:5: outstate = kw_split[0] for s in kw_split[1:]: outstate = outstate + ':' + s return outstate - FINALIZE: NAME: term_finalizer LANGUAGE: python RETURNS: - count integer MODE:MULTI FUNCTION:| if not state: return 0 kw_split = state.split(':') # This function does the following: # 1) Splits 'kw_split' on ':' # for example, 1,5,7:2,8 creates '1,5,7' and '2,8' # 2) For each group of positions in 'kw_split', splits the set on ',' # to create ['1','5','7'] from Set 0: 1,5,7 and # eventually ['2', '8'] from Set 1: 2,8 # 3) Checks for empty strings # 4) Adjusts the split sets by subtracting the position of the set # in the 'kw_split' array # ['1','5','7'] - 0 from each element = ['1','5','7'] # ['2', '8'] - 1 from each element = ['1', '7'] # 5) Resulting arrays after subtracting the offset in step 4 are # intersected and their overlapping values kept: # ['1','5','7'].intersect['1', '7'] = [1,7] # 6) Determines the length of the intersection, which is the number of # times that an entire keyword (with all its pieces) matches in the # document data. previous = None for i in range(0,len(kw_split)): isplit = kw_split[i].split(',') if any(map(lambda(x): x == '', isplit)): return 0 adjusted = set(map(lambda(x): int(x)-i, isplit)) if (previous): previous = adjusted.intersection(previous) else: previous = adjusted # return the final count if previous: return len(previous) # Define the 'term_match' task which is then executed as part # of the 'final_output' query. It takes the INPUT 'term_join' defined # earlier and uses the REDUCE function 'term_reducer' defined earlier - TASK: NAME:term_match SOURCE:term_join REDUCE:term_reducer - INPUT: NAME:final_output QUERY:| SELECT doc.*, kw.*, tm.count FROM documents doc, keywords kw, term_match tm WHERE doc.doc_id = tm.doc_id AND kw.keyword_id = tm.keyword_id AND tm.count > 0 # Execute this MapReduce job and send output to STDOUT EXECUTE: - RUN: SOURCE:final_output TARGET:STDOUT ``` MapReduce示例的流程图  # 5.提交 gpmapreduce 作业 按照YAML规范文档中的定义运行Greenplum的MapReduce作业 在运行此程序之前,需要执行以下操作: - 用户必须在YAML文件中定义用户的MapReduce作业,YAML不支持TAB对齐,只能用空格对齐。 - 用户必须是Greenplum数据库超级用户才能运行使用不可信Perl或Python 编写的MapReduce作业。 - 用户必须是Greenplum数据库超级用户才能运行带有EXEC 和FILE输入的MapReduce作业。 - 用户必须是Greenplum数据库超级用户才能运行带有GPFDIST 输入的MapReduce作业,除非用户具有适当的权限。 step 1.在数据库中创建解析mapreduce的语言,必须由超级用户创建 在执行Map和Reduce函数时,GP MapReduce会使用过程语言将这些函数构建到数据库中 GP需要提前在数据库中创建这些语言,使用CREATE LANGUAGE命令在数据库中为要执行的MapReduce作业创建语言 ``` psql -c 'CREATE LANGUAGE plpythonu;' psql -c 'CREATE LANGUAGE plperl;' ``` 对于用python写代码遇到莫名错误的问题,可以在函数中添加异常处理信息,打印plpy.warning(e)可以提高调试速度。 step 2/通过gpmapreduce客户端程序提交该程序到GP 提供连接信息,如数据库、用户名、主机名和端口等 ``` gpmapreduce -h gpmasterhost -p 5432 -f my_gpmr_spec.yml mydatabase ``` 或 运行my_yaml.txt中定义的MapReduce作业,并连接到数据库 mydatabase: ``` gpmapreduce -f my_yaml.txt mydatabase ``` ## 安装库文件(自定义C函数) 函数定义的库文件必须安装到GP所有主机相同路径 例如:在MapReduce的YAML定义中存在一个C函数的定义: ``` DEFINE: - TRANSITION: NAME: int4_accum PARAMETERS: [state int8, value int4] RETURNS: [state int8] LANGUAGE: C LIBRARY: $libdir/gpmr.so FUNCTION: int4_accum ``` LIBRARY指定的$libdir/gpmr.so文件必须在所有主机的$GPHOME/lib目录下存在 ## MapReduce作业故障诊断 ### 语言不存在 1. 症状 ``` ERROR: language "pl*" does not exist HINT: Use CREATE LANGUAGE to load the language into the database. ``` 2. 解释 在执行Map和Reduce函数时,GP MapReduce会使用过程语言将这些函数构建到数据库中。GP需提前创建好这些语言 3. 解决方案 创建相应的语言,如: ``` psql database_name –c ‘CREATE LANGUAGE plperl;’ ``` ### 通用Python迭代器错误 1. 症状 ``` ERROR: plpython: function “function_name” error fetching next item from iterator DETAIL: <type 'exceptions.IOError'>: [Errno 2] No such file or directory: '/tmp/file/doesnt/exist' (, line 39) ``` 2. 解释 当Python的迭代函数中存在错误的时候,就会返回该错误; 这通常标识着你的Python程序存在bug 3. 解决方案 最简单的调试办法是在函数体外包装一个错误捕获结构。例如: ``` FUNCTION: | try: ... user code ... except Exception, e: plpy.warning('my function name:' + str(e)) ``` ### 函数定义使用了错误的模式 症状 ``` ERROR: set-returning PERL function must return reference to array or use return_next ----------------------------------------------------------------- ERROR: composite-returning PERL function must return reference to hash ----------------------------------------------------------------- ERROR: returned sequence's length must be same as tuple's length ----------------------------------------------------------------- ERROR: no attribute named "key" HINT: to return null in specific column, let returned object to have attribute named after column with value None ----------------------------------------------------------------- ``` 2. 解释 在GP MapReduce函数中有两种主要模式: MODE: SINGLE 每行接收返回单行 MODE: MULTI 每行接收可能返回0~N的任意行 Transition函数和Consolidate函数仅支持SINGLE模式。因为他们属于有限状态函数,且必须返回下一个状态 Map函数和Finalize函数支持两种模式且缺省为Multi模式 3. 解决方案 在SINGLE模式的函数中必须使用return方法来返回单行数据 在MULTI模式中,最好作为通用函数来编写,在Python中使用yield,在Perl中使用return_next 例如 ``` - MAP: NAME: perl_single MODE: SINGLE PARAMETERS: [key text, value text] RETURNS: [key text, value text] LANGUAGE: perl FUNCTION: | my ($key, $value) = @_; return {'key' => $key, 'value' => $value} return undef - MAP: NAME: perl_multi MODE: MULTI PARAMETERS: [key text, value text] RETURNS: [key text, value text] LANGUAGE: perl FUNCTION: | my ($key, $value) = @_; for my $i (0..10) { return_next {'key' => $key, 'value' => $value} } return undef ```
Seven
Oct. 19, 2022, 9:02 p.m.
转发文档
Collection documents
Last
Next
手机扫码
Copy link
手机扫一扫转发分享
Copy link
Markdown文件
share
link
type
password
Update password